fix(core): prevent OOM by truncating massive OpenTelemetry attributes and fixing span leaks

- Implements `truncateForTelemetry` to prevent massive strings (like LLM prompts) from staying in memory.
- Fixes `runInDevTraceSpan` leaking unclosed spans on aborted or errored async iterators by automatically capturing generator returns.
This commit is contained in:
Spencer
2026-03-20 20:46:25 +00:00
parent b459e1a108
commit ab355ac7f0
4 changed files with 127 additions and 49 deletions
@@ -438,7 +438,6 @@ export class LoggingContentGenerator implements ContentGenerator {
return runInDevTraceSpan(
{
operation: GeminiCliOperation.LLMCall,
noAutoEnd: true,
attributes: {
[GEN_AI_REQUEST_MODEL]: req.model,
[GEN_AI_PROMPT_NAME]: userPromptId,
@@ -448,7 +447,7 @@ export class LoggingContentGenerator implements ContentGenerator {
[GEN_AI_TOOL_DEFINITIONS]: safeJsonStringify(req.config?.tools ?? []),
},
},
async ({ metadata: spanMetadata, endSpan }) => {
async ({ metadata: spanMetadata }) => {
spanMetadata.input = req.contents;
const startTime = Date.now();
@@ -504,7 +503,6 @@ export class LoggingContentGenerator implements ContentGenerator {
userPromptId,
role,
spanMetadata,
endSpan,
);
},
);
@@ -517,7 +515,6 @@ export class LoggingContentGenerator implements ContentGenerator {
userPromptId: string,
role: LlmRole,
spanMetadata: SpanMetadata,
endSpan: () => void,
): AsyncGenerator<GenerateContentResponse> {
const responses: GenerateContentResponse[] = [];
@@ -581,8 +578,6 @@ export class LoggingContentGenerator implements ContentGenerator {
serverDetails,
);
throw error;
} finally {
endSpan();
}
}
+31 -20
View File
@@ -133,33 +133,44 @@ describe('runInDevTraceSpan', () => {
expect(mockSpan.end).toHaveBeenCalled();
});
it('should respect noAutoEnd option', async () => {
let capturedEndSpan: () => void = () => {};
const result = await runInDevTraceSpan(
{ operation: GeminiCliOperation.LLMCall, noAutoEnd: true },
async ({ endSpan }) => {
capturedEndSpan = endSpan;
return 'streaming';
},
);
it('should auto-wrap AsyncGenerators to end span only on completion', async () => {
const asyncGen = async function* () {
yield 'part1';
yield 'part2';
};
const result = (await runInDevTraceSpan(
{ operation: GeminiCliOperation.LLMCall },
async () => asyncGen(),
)) as unknown as AsyncGenerator<string>;
expect(result).toBe('streaming');
expect(mockSpan.end).not.toHaveBeenCalled();
capturedEndSpan();
const parts = [];
for await (const part of result) {
parts.push(part);
}
expect(parts).toEqual(['part1', 'part2']);
expect(mockSpan.end).toHaveBeenCalled();
});
it('should automatically end span on error even if noAutoEnd is true', async () => {
it('should automatically end span on error even if it is an AsyncGenerator', async () => {
const error = new Error('streaming error');
await expect(
runInDevTraceSpan(
{ operation: GeminiCliOperation.LLMCall, noAutoEnd: true },
async () => {
throw error;
},
),
).rejects.toThrow(error);
const asyncGen = async function* () {
yield 'part1';
throw error;
};
const result = (await runInDevTraceSpan(
{ operation: GeminiCliOperation.LLMCall },
async () => asyncGen(),
)) as unknown as AsyncGenerator<string>;
await expect(async () => {
for await (const _ of result) {
// Wait for throw
}
}).rejects.toThrow(error);
expect(mockSpan.end).toHaveBeenCalled();
});
+47 -23
View File
@@ -11,6 +11,7 @@ import {
type AttributeValue,
type SpanOptions,
} from '@opentelemetry/api';
import { truncateForTelemetry } from './truncate.js';
import { safeJsonStringify } from '../utils/safeJsonStringify.js';
import {
type GeminiCliOperation,
@@ -63,7 +64,7 @@ export interface SpanMetadata {
* @returns The result of the function.
*/
export async function runInDevTraceSpan<R>(
opts: SpanOptions & { operation: GeminiCliOperation; noAutoEnd?: boolean },
opts: SpanOptions & { operation: GeminiCliOperation },
fn: ({
metadata,
}: {
@@ -71,7 +72,7 @@ export async function runInDevTraceSpan<R>(
endSpan: () => void;
}) => Promise<R>,
): Promise<R> {
const { operation, noAutoEnd, ...restOfSpanOpts } = opts;
const { operation, ...restOfSpanOpts } = opts;
const tracer = trace.getTracer(TRACER_NAME, TRACER_VERSION);
return tracer.startActiveSpan(operation, restOfSpanOpts, async (span) => {
@@ -84,22 +85,28 @@ export async function runInDevTraceSpan<R>(
[GEN_AI_CONVERSATION_ID]: sessionId,
},
};
let spanEnded = false;
const endSpan = () => {
if (spanEnded) return;
spanEnded = true;
try {
if (meta.input !== undefined) {
span.setAttribute(
GEN_AI_INPUT_MESSAGES,
safeJsonStringify(meta.input),
);
const truncatedInput = truncateForTelemetry(meta.input);
if (truncatedInput !== undefined) {
span.setAttribute(GEN_AI_INPUT_MESSAGES, truncatedInput);
}
}
if (meta.output !== undefined) {
span.setAttribute(
GEN_AI_OUTPUT_MESSAGES,
safeJsonStringify(meta.output),
);
const truncatedOutput = truncateForTelemetry(meta.output);
if (truncatedOutput !== undefined) {
span.setAttribute(GEN_AI_OUTPUT_MESSAGES, truncatedOutput);
}
}
for (const [key, value] of Object.entries(meta.attributes)) {
span.setAttribute(key, value);
const truncatedValue = truncateForTelemetry(value);
if (truncatedValue !== undefined) {
span.setAttribute(key, truncatedValue);
}
}
if (meta.error) {
span.setStatus({
@@ -123,23 +130,40 @@ export async function runInDevTraceSpan<R>(
span.end();
}
};
let result: R;
try {
return await fn({ metadata: meta, endSpan });
result = await fn({ metadata: meta, endSpan });
} catch (e) {
meta.error = e;
if (noAutoEnd) {
// For streaming operations, the delegated endSpan call will not be reached
// on an exception, so we must end the span here to prevent a leak.
endSpan();
}
endSpan();
throw e;
} finally {
if (!noAutoEnd) {
// For non-streaming operations, this ensures the span is always closed,
// and if an error occurred, it will be recorded correctly by endSpan.
endSpan();
}
}
// Auto-detect AsyncGenerators and wrap them to ensure endSpan is called
// when iteration finishes or fails.
if (
result != null &&
typeof result === 'object' &&
Symbol.asyncIterator in result
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const asyncIterable = result as unknown as AsyncIterable<unknown>;
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
return (async function* () {
try {
yield* asyncIterable;
} catch (e) {
meta.error = e;
throw e;
} finally {
endSpan();
}
})() as unknown as R;
}
endSpan();
return result;
});
}
+48
View File
@@ -0,0 +1,48 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { safeJsonStringify } from '../utils/safeJsonStringify.js';
import type { AttributeValue } from '@opentelemetry/api';
/**
 * Converts an arbitrary value into a bounded-size telemetry attribute value so
 * that very large payloads (e.g., LLM prompts, tool arguments) are not attached
 * to spans in full.
 *
 * Conversion rules:
 * - `null` / `undefined` produce `undefined`, letting callers skip the attribute.
 * - Numbers and booleans are returned unchanged.
 * - Strings are used as-is, objects (including arrays) are serialized with
 *   `safeJsonStringify`, and every other type goes through `String()`.
 *
 * When the string form is longer than `maxLength` UTF-16 code units it is cut
 * and a `...[TRUNCATED: original length N]` marker is appended. The returned
 * string is therefore longer than `maxLength` by the length of that marker.
 * The cut never separates a surrogate pair, so truncation does not introduce a
 * lone surrogate into the result.
 *
 * @param value The value to convert and, if necessary, truncate.
 * @param maxLength The maximum number of UTF-16 code units of the string
 *   representation to keep before the marker (default: 1000).
 * @returns The attribute value to record, or `undefined` when there is nothing
 *   to record.
 */
export function truncateForTelemetry(
  value: unknown,
  maxLength: number = 1000,
): AttributeValue | undefined {
  if (value === null || value === undefined) {
    return undefined;
  }

  // Numbers and booleans are already small, valid attribute values.
  if (typeof value === 'number' || typeof value === 'boolean') {
    return value;
  }

  let stringValue: string;
  if (typeof value === 'string') {
    stringValue = value;
  } else if (typeof value === 'object') {
    // Arrays are objects too, so this branch covers them.
    try {
      stringValue = safeJsonStringify(value);
    } catch (_e) {
      // Best effort: telemetry must not fail because a value is unserializable.
      stringValue = String(value);
    }
  } else {
    stringValue = String(value);
  }

  if (stringValue && stringValue.length > maxLength) {
    // Clamp to a non-negative integer index, matching what substring() would do.
    let cut = Math.max(0, Math.floor(maxLength));
    if (cut > 0) {
      const lastKept = stringValue.charCodeAt(cut - 1);
      const firstDropped = stringValue.charCodeAt(cut);
      const splitsSurrogatePair =
        lastKept >= 0xd800 &&
        lastKept <= 0xdbff &&
        firstDropped >= 0xdc00 &&
        firstDropped <= 0xdfff;
      if (splitsSurrogatePair) {
        // Drop the orphaned high surrogate instead of emitting half a character.
        cut -= 1;
      }
    }
    return (
      stringValue.substring(0, cut) +
      `...[TRUNCATED: original length ${stringValue.length}]`
    );
  }
  return stringValue;
}