From 0179726222f62db8d2c495d73505768166646912 Mon Sep 17 00:00:00 2001 From: Mark Griffith Date: Fri, 10 Apr 2026 22:26:07 -0700 Subject: [PATCH] Memory fix for trace's streamWrapper. (#25089) --- packages/core/src/telemetry/trace.test.ts | 62 +++++++++++++++++---- packages/core/src/telemetry/trace.ts | 65 +++++++++++++++++------ 2 files changed, 102 insertions(+), 25 deletions(-) diff --git a/packages/core/src/telemetry/trace.test.ts b/packages/core/src/telemetry/trace.test.ts index 9cb1e8796f..87a1419080 100644 --- a/packages/core/src/telemetry/trace.test.ts +++ b/packages/core/src/telemetry/trace.test.ts @@ -4,32 +4,37 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { trace, SpanStatusCode, diag, type Tracer } from '@opentelemetry/api'; -import { runInDevTraceSpan, truncateForTelemetry } from './trace.js'; +import { diag, SpanStatusCode, trace } from '@opentelemetry/api'; +import type { Tracer } from '@opentelemetry/api'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + import { - GeminiCliOperation, - GEN_AI_CONVERSATION_ID, GEN_AI_AGENT_DESCRIPTION, GEN_AI_AGENT_NAME, + GEN_AI_CONVERSATION_ID, GEN_AI_INPUT_MESSAGES, GEN_AI_OPERATION_NAME, GEN_AI_OUTPUT_MESSAGES, + GeminiCliOperation, SERVICE_DESCRIPTION, SERVICE_NAME, } from './constants.js'; +import { + runInDevTraceSpan, + spanRegistry, + truncateForTelemetry, +} from './trace.js'; vi.mock('@opentelemetry/api', async (importOriginal) => { - const original = await importOriginal(); - return { - ...original, + const original = await importOriginal(); + return Object.assign({}, original, { trace: { getTracer: vi.fn(), }, diag: { error: vi.fn(), }, - }; + }); }); vi.mock('../utils/session.js', () => ({ @@ -207,6 +212,45 @@ describe('runInDevTraceSpan', () => { expect(mockSpan.end).toHaveBeenCalled(); }); + it('should register async generators with spanRegistry', async () => { + const spy = vi.spyOn(spanRegistry, 'register'); + async function* testStream() { + yield 1; + } + + const resultStream = await runInDevTraceSpan( + { operation: GeminiCliOperation.LLMCall, sessionId: 'test-session-id' }, + async () => testStream(), + ); + + expect(spy).toHaveBeenCalledWith(resultStream, expect.any(Function)); + }); + + it('should be idempotent and call span.end only once', async () => { + vi.spyOn(spanRegistry, 'register'); + async function* testStream() { + yield 1; + } + + const resultStream = await runInDevTraceSpan( + { operation: GeminiCliOperation.LLMCall, sessionId: 'test-session-id' }, + async () => testStream(), + ); + + // Simulate completion + for await (const _ of resultStream) { + // iterate + } + expect(mockSpan.end).toHaveBeenCalledTimes(1); + + // Try to end again (simulating registry or double call) + const endSpanFn = vi.mocked(spanRegistry.register).mock + .calls[0][1] as () => void; + endSpanFn(); + + expect(mockSpan.end).toHaveBeenCalledTimes(1); + }); + it('should end span automatically on error in async iterators', async () => { const error = new Error('streaming error'); async function* errorStream() { diff --git a/packages/core/src/telemetry/trace.ts b/packages/core/src/telemetry/trace.ts index 86447eb353..fd3082c3cd 100644 --- a/packages/core/src/telemetry/trace.ts +++ b/packages/core/src/telemetry/trace.ts @@ -11,9 +11,11 @@ import { type AttributeValue, type SpanOptions, } from '@opentelemetry/api'; + +import { debugLogger } from '../utils/debugLogger.js'; import { safeJsonStringify } from '../utils/safeJsonStringify.js'; +import { truncateString } from '../utils/textUtils.js'; import { - type GeminiCliOperation, GEN_AI_AGENT_DESCRIPTION, GEN_AI_AGENT_NAME, GEN_AI_CONVERSATION_ID, @@ -22,23 +24,44 @@ import { GEN_AI_OUTPUT_MESSAGES, SERVICE_DESCRIPTION, SERVICE_NAME, + type GeminiCliOperation, } from './constants.js'; -import { truncateString } from '../utils/textUtils.js'; - const TRACER_NAME = 'gemini-cli'; const TRACER_VERSION = 'v1'; +/** + * Registry used to ensure that spans are properly ended when their associated + * async objects are garbage collected. + */ +export const spanRegistry = new FinalizationRegistry((endSpan: () => void) => { + try { + endSpan(); + } catch (e) { + debugLogger.warn( + 'Error in FinalizationRegistry callback for span cleanup', + e, + ); + } +}); + +/** + * Truncates a value for inclusion in telemetry attributes. + * + * @param value The value to truncate. + * @param maxLength The maximum length of the stringified value. + * @returns The truncated value, or undefined if the value type is not supported. + */ export function truncateForTelemetry( value: unknown, - maxLength: number = 10000, + maxLength = 10000, ): AttributeValue | undefined { if (typeof value === 'string') { return truncateString( value, maxLength, `...[TRUNCATED: original length ${value.length}]`, - ); + ) as AttributeValue; } if (typeof value === 'object' && value !== null) { const stringified = safeJsonStringify(value); @@ -46,10 +69,10 @@ export function truncateForTelemetry( stringified, maxLength, `...[TRUNCATED: original length ${stringified.length}]`, - ); + ) as AttributeValue; } if (typeof value === 'number' || typeof value === 'boolean') { - return value; + return value as AttributeValue; } return undefined; } @@ -82,12 +105,15 @@ export interface SpanMetadata { * * @example * ```typescript - * runInDevTraceSpan({ name: 'my-operation' }, ({ metadata }) => { - * metadata.input = { foo: 'bar' }; - * // ... do work ... - * metadata.output = { result: 'baz' }; - * metadata.attributes['my.custom.attribute'] = 'some-value'; - * }); + * await runInDevTraceSpan( + * { operation: GeminiCliOperation.LLMCall, sessionId: 'my-session' }, + * async ({ metadata }) => { + * metadata.input = { foo: 'bar' }; + * // ... do work ... + * metadata.output = { result: 'baz' }; + * metadata.attributes['my.custom.attribute'] = 'some-value'; + * } + * ); * ``` * * @param opts The options for the span. @@ -115,7 +141,12 @@ export async function runInDevTraceSpan( [GEN_AI_CONVERSATION_ID]: sessionId, }, }; + let spanEnded = false; const endSpan = () => { + if (spanEnded) { + return; + } + spanEnded = true; try { if (logPrompts !== false) { if (meta.input !== undefined) { @@ -169,7 +200,7 @@ export async function runInDevTraceSpan( const streamWrapper = (async function* () { try { yield* result; - } catch (e) { + } catch (e: unknown) { meta.error = e; throw e; } finally { @@ -177,10 +208,12 @@ export async function runInDevTraceSpan( } })(); - return Object.assign(streamWrapper, result); + const finalResult = Object.assign(streamWrapper, result); + spanRegistry.register(finalResult, endSpan); + return finalResult; } return result; - } catch (e) { + } catch (e: unknown) { meta.error = e; throw e; } finally {