From ab355ac7f08281dd38da2c679244a6ecb57239eb Mon Sep 17 00:00:00 2001 From: Spencer Date: Fri, 20 Mar 2026 20:46:25 +0000 Subject: [PATCH] fix(core): prevent OOM by truncating massive open telemetry attributes and fixing span leaks - Implements `truncateForTelemetry` to prevent massive strings (like LLM prompts) from staying in memory. - Fixes `runInDevTraceSpan` leaking unclosed spans on aborted or errored async iterators by automatically capturing generator returns. --- .../core/src/core/loggingContentGenerator.ts | 7 +- packages/core/src/telemetry/trace.test.ts | 51 ++++++++------ packages/core/src/telemetry/trace.ts | 70 +++++++++++++------ packages/core/src/telemetry/truncate.ts | 48 +++++++++++++ 4 files changed, 127 insertions(+), 49 deletions(-) create mode 100644 packages/core/src/telemetry/truncate.ts diff --git a/packages/core/src/core/loggingContentGenerator.ts b/packages/core/src/core/loggingContentGenerator.ts index 60144740c2..1dc10096f7 100644 --- a/packages/core/src/core/loggingContentGenerator.ts +++ b/packages/core/src/core/loggingContentGenerator.ts @@ -438,7 +438,6 @@ export class LoggingContentGenerator implements ContentGenerator { return runInDevTraceSpan( { operation: GeminiCliOperation.LLMCall, - noAutoEnd: true, attributes: { [GEN_AI_REQUEST_MODEL]: req.model, [GEN_AI_PROMPT_NAME]: userPromptId, @@ -448,7 +447,7 @@ export class LoggingContentGenerator implements ContentGenerator { [GEN_AI_TOOL_DEFINITIONS]: safeJsonStringify(req.config?.tools ?? []), }, }, - async ({ metadata: spanMetadata, endSpan }) => { + async ({ metadata: spanMetadata }) => { spanMetadata.input = req.contents; const startTime = Date.now(); @@ -504,7 +503,6 @@ export class LoggingContentGenerator implements ContentGenerator { userPromptId, role, spanMetadata, - endSpan, ); }, ); @@ -517,7 +515,6 @@ export class LoggingContentGenerator implements ContentGenerator { userPromptId: string, role: LlmRole, spanMetadata: SpanMetadata, - endSpan: () => void, ): AsyncGenerator { const responses: GenerateContentResponse[] = []; @@ -581,8 +578,6 @@ export class LoggingContentGenerator implements ContentGenerator { serverDetails, ); throw error; - } finally { - endSpan(); } } diff --git a/packages/core/src/telemetry/trace.test.ts b/packages/core/src/telemetry/trace.test.ts index 4d9aa0baa8..9a77b6e241 100644 --- a/packages/core/src/telemetry/trace.test.ts +++ b/packages/core/src/telemetry/trace.test.ts @@ -133,33 +133,44 @@ describe('runInDevTraceSpan', () => { expect(mockSpan.end).toHaveBeenCalled(); }); - it('should respect noAutoEnd option', async () => { - let capturedEndSpan: () => void = () => {}; - const result = await runInDevTraceSpan( - { operation: GeminiCliOperation.LLMCall, noAutoEnd: true }, - async ({ endSpan }) => { - capturedEndSpan = endSpan; - return 'streaming'; - }, - ); + it('should auto-wrap AsyncGenerators to end span only on completion', async () => { + const asyncGen = async function* () { + yield 'part1'; + yield 'part2'; + }; + + const result = (await runInDevTraceSpan( + { operation: GeminiCliOperation.LLMCall }, + async () => asyncGen(), + )) as unknown as AsyncGenerator; - expect(result).toBe('streaming'); expect(mockSpan.end).not.toHaveBeenCalled(); - capturedEndSpan(); + const parts = []; + for await (const part of result) { + parts.push(part); + } + expect(parts).toEqual(['part1', 'part2']); expect(mockSpan.end).toHaveBeenCalled(); }); - it('should automatically end span on error even if noAutoEnd is true', async () => { + it('should automatically end span on error even if it is an AsyncGenerator', async () => { const error = new Error('streaming error'); - await expect( - runInDevTraceSpan( - { operation: GeminiCliOperation.LLMCall, noAutoEnd: true }, - async () => { - throw error; - }, - ), - ).rejects.toThrow(error); + const asyncGen = async function* () { + yield 'part1'; + throw error; + }; + + const result = (await runInDevTraceSpan( + { operation: GeminiCliOperation.LLMCall }, + async () => asyncGen(), + )) as unknown as AsyncGenerator; + + await expect(async () => { + for await (const _ of result) { + // Wait for throw + } + }).rejects.toThrow(error); expect(mockSpan.end).toHaveBeenCalled(); }); diff --git a/packages/core/src/telemetry/trace.ts b/packages/core/src/telemetry/trace.ts index 1f4676343a..f662256e55 100644 --- a/packages/core/src/telemetry/trace.ts +++ b/packages/core/src/telemetry/trace.ts @@ -11,6 +11,7 @@ import { type AttributeValue, type SpanOptions, } from '@opentelemetry/api'; +import { truncateForTelemetry } from './truncate.js'; import { safeJsonStringify } from '../utils/safeJsonStringify.js'; import { type GeminiCliOperation, @@ -63,7 +64,7 @@ export interface SpanMetadata { * @returns The result of the function. */ export async function runInDevTraceSpan( - opts: SpanOptions & { operation: GeminiCliOperation; noAutoEnd?: boolean }, + opts: SpanOptions & { operation: GeminiCliOperation }, fn: ({ metadata, }: { @@ -71,7 +72,7 @@ export async function runInDevTraceSpan( endSpan: () => void; }) => Promise, ): Promise { - const { operation, noAutoEnd, ...restOfSpanOpts } = opts; + const { operation, ...restOfSpanOpts } = opts; const tracer = trace.getTracer(TRACER_NAME, TRACER_VERSION); return tracer.startActiveSpan(operation, restOfSpanOpts, async (span) => { @@ -84,22 +85,28 @@ export async function runInDevTraceSpan( [GEN_AI_CONVERSATION_ID]: sessionId, }, }; + let spanEnded = false; const endSpan = () => { + if (spanEnded) return; + spanEnded = true; try { if (meta.input !== undefined) { - span.setAttribute( - GEN_AI_INPUT_MESSAGES, - safeJsonStringify(meta.input), - ); + const truncatedInput = truncateForTelemetry(meta.input); + if (truncatedInput !== undefined) { + span.setAttribute(GEN_AI_INPUT_MESSAGES, truncatedInput); + } } if (meta.output !== undefined) { - span.setAttribute( - GEN_AI_OUTPUT_MESSAGES, - safeJsonStringify(meta.output), - ); + const truncatedOutput = truncateForTelemetry(meta.output); + if (truncatedOutput !== undefined) { + span.setAttribute(GEN_AI_OUTPUT_MESSAGES, truncatedOutput); + } } for (const [key, value] of Object.entries(meta.attributes)) { - span.setAttribute(key, value); + const truncatedValue = truncateForTelemetry(value); + if (truncatedValue !== undefined) { + span.setAttribute(key, truncatedValue); + } } if (meta.error) { span.setStatus({ @@ -123,23 +130,40 @@ export async function runInDevTraceSpan( span.end(); } }; + + let result: R; try { - return await fn({ metadata: meta, endSpan }); + result = await fn({ metadata: meta, endSpan }); } catch (e) { meta.error = e; - if (noAutoEnd) { - // For streaming operations, the delegated endSpan call will not be reached - // on an exception, so we must end the span here to prevent a leak. - endSpan(); - } + endSpan(); throw e; - } finally { - if (!noAutoEnd) { - // For non-streaming operations, this ensures the span is always closed, - // and if an error occurred, it will be recorded correctly by endSpan. - endSpan(); - } } + + // Auto-detect AsyncGenerators and wrap them to ensure endSpan is called + // when iteration finishes or fails. + if ( + result != null && + typeof result === 'object' && + Symbol.asyncIterator in result + ) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const asyncIterable = result as unknown as AsyncIterable; + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + return (async function* () { + try { + yield* asyncIterable; + } catch (e) { + meta.error = e; + throw e; + } finally { + endSpan(); + } + })() as unknown as R; + } + + endSpan(); + return result; }); } diff --git a/packages/core/src/telemetry/truncate.ts b/packages/core/src/telemetry/truncate.ts new file mode 100644 index 0000000000..4afe324e76 --- /dev/null +++ b/packages/core/src/telemetry/truncate.ts @@ -0,0 +1,48 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { safeJsonStringify } from '../utils/safeJsonStringify.js'; +import type { AttributeValue } from '@opentelemetry/api'; + +/** + * Truncates values for telemetry to prevent massive strings (e.g., LLM prompts, tool arguments) + * from being attached to spans and causing memory leaks. + * + * @param value The value to truncate + * @param maxLength The maximum length of the string representation (default: 1000) + */ +export function truncateForTelemetry( + value: unknown, + maxLength: number = 1000, +): AttributeValue | undefined { + if (value === null || value === undefined) { + return undefined; + } + + let stringValue: string; + if (typeof value === 'string') { + stringValue = value; + } else if (typeof value === 'object' || Array.isArray(value)) { + try { + stringValue = safeJsonStringify(value); + } catch (_e) { + stringValue = String(value); + } + } else if (typeof value === 'number' || typeof value === 'boolean') { + return value; + } else { + stringValue = String(value); + } + + if (stringValue && stringValue.length > maxLength) { + return ( + stringValue.substring(0, maxLength) + + `...[TRUNCATED: original length ${stringValue.length}]` + ); + } + + return stringValue; +}