Memory fix for trace's streamWrapper. (#25089)

This commit is contained in:
Mark Griffith
2026-04-10 22:26:07 -07:00
committed by GitHub
parent 0957f7d3e2
commit 0179726222
2 changed files with 102 additions and 25 deletions

View File

@@ -4,32 +4,37 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { trace, SpanStatusCode, diag, type Tracer } from '@opentelemetry/api';
import { runInDevTraceSpan, truncateForTelemetry } from './trace.js';
import { diag, SpanStatusCode, trace } from '@opentelemetry/api';
import type { Tracer } from '@opentelemetry/api';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
GeminiCliOperation,
GEN_AI_CONVERSATION_ID,
GEN_AI_AGENT_DESCRIPTION,
GEN_AI_AGENT_NAME,
GEN_AI_CONVERSATION_ID,
GEN_AI_INPUT_MESSAGES,
GEN_AI_OPERATION_NAME,
GEN_AI_OUTPUT_MESSAGES,
GeminiCliOperation,
SERVICE_DESCRIPTION,
SERVICE_NAME,
} from './constants.js';
import {
runInDevTraceSpan,
spanRegistry,
truncateForTelemetry,
} from './trace.js';
vi.mock('@opentelemetry/api', async (importOriginal) => {
const original = await importOriginal<typeof import('@opentelemetry/api')>();
return {
...original,
const original = await importOriginal();
return Object.assign({}, original, {
trace: {
getTracer: vi.fn(),
},
diag: {
error: vi.fn(),
},
};
});
});
vi.mock('../utils/session.js', () => ({
@@ -207,6 +212,45 @@ describe('runInDevTraceSpan', () => {
expect(mockSpan.end).toHaveBeenCalled();
});
it('should register async generators with spanRegistry', async () => {
const spy = vi.spyOn(spanRegistry, 'register');
async function* testStream() {
yield 1;
}
const resultStream = await runInDevTraceSpan(
{ operation: GeminiCliOperation.LLMCall, sessionId: 'test-session-id' },
async () => testStream(),
);
expect(spy).toHaveBeenCalledWith(resultStream, expect.any(Function));
});
it('should be idempotent and call span.end only once', async () => {
vi.spyOn(spanRegistry, 'register');
async function* testStream() {
yield 1;
}
const resultStream = await runInDevTraceSpan(
{ operation: GeminiCliOperation.LLMCall, sessionId: 'test-session-id' },
async () => testStream(),
);
// Simulate completion
for await (const _ of resultStream) {
// iterate
}
expect(mockSpan.end).toHaveBeenCalledTimes(1);
// Try to end again (simulating registry or double call)
const endSpanFn = vi.mocked(spanRegistry.register).mock
.calls[0][1] as () => void;
endSpanFn();
expect(mockSpan.end).toHaveBeenCalledTimes(1);
});
it('should end span automatically on error in async iterators', async () => {
const error = new Error('streaming error');
async function* errorStream() {

View File

@@ -11,9 +11,11 @@ import {
type AttributeValue,
type SpanOptions,
} from '@opentelemetry/api';
import { debugLogger } from '../utils/debugLogger.js';
import { safeJsonStringify } from '../utils/safeJsonStringify.js';
import { truncateString } from '../utils/textUtils.js';
import {
type GeminiCliOperation,
GEN_AI_AGENT_DESCRIPTION,
GEN_AI_AGENT_NAME,
GEN_AI_CONVERSATION_ID,
@@ -22,23 +24,44 @@ import {
GEN_AI_OUTPUT_MESSAGES,
SERVICE_DESCRIPTION,
SERVICE_NAME,
type GeminiCliOperation,
} from './constants.js';
import { truncateString } from '../utils/textUtils.js';
const TRACER_NAME = 'gemini-cli';
const TRACER_VERSION = 'v1';
/**
* Registry used to ensure that spans are properly ended when their associated
* async objects are garbage collected.
*/
export const spanRegistry = new FinalizationRegistry((endSpan: () => void) => {
try {
endSpan();
} catch (e) {
debugLogger.warn(
'Error in FinalizationRegistry callback for span cleanup',
e,
);
}
});
/**
* Truncates a value for inclusion in telemetry attributes.
*
* @param value The value to truncate.
* @param maxLength The maximum length of the stringified value.
* @returns The truncated value, or undefined if the value type is not supported.
*/
export function truncateForTelemetry(
value: unknown,
maxLength: number = 10000,
maxLength = 10000,
): AttributeValue | undefined {
if (typeof value === 'string') {
return truncateString(
value,
maxLength,
`...[TRUNCATED: original length ${value.length}]`,
);
) as AttributeValue;
}
if (typeof value === 'object' && value !== null) {
const stringified = safeJsonStringify(value);
@@ -46,10 +69,10 @@ export function truncateForTelemetry(
stringified,
maxLength,
`...[TRUNCATED: original length ${stringified.length}]`,
);
) as AttributeValue;
}
if (typeof value === 'number' || typeof value === 'boolean') {
return value;
return value as AttributeValue;
}
return undefined;
}
@@ -82,12 +105,15 @@ export interface SpanMetadata {
*
* @example
* ```typescript
* runInDevTraceSpan({ name: 'my-operation' }, ({ metadata }) => {
* metadata.input = { foo: 'bar' };
* // ... do work ...
* metadata.output = { result: 'baz' };
* metadata.attributes['my.custom.attribute'] = 'some-value';
* });
* await runInDevTraceSpan(
* { operation: GeminiCliOperation.LLMCall, sessionId: 'my-session' },
* async ({ metadata }) => {
* metadata.input = { foo: 'bar' };
* // ... do work ...
* metadata.output = { result: 'baz' };
* metadata.attributes['my.custom.attribute'] = 'some-value';
* }
* );
* ```
*
* @param opts The options for the span.
@@ -115,7 +141,12 @@ export async function runInDevTraceSpan<R>(
[GEN_AI_CONVERSATION_ID]: sessionId,
},
};
let spanEnded = false;
const endSpan = () => {
if (spanEnded) {
return;
}
spanEnded = true;
try {
if (logPrompts !== false) {
if (meta.input !== undefined) {
@@ -169,7 +200,7 @@ export async function runInDevTraceSpan<R>(
const streamWrapper = (async function* () {
try {
yield* result;
} catch (e) {
} catch (e: unknown) {
meta.error = e;
throw e;
} finally {
@@ -177,10 +208,12 @@ export async function runInDevTraceSpan<R>(
}
})();
return Object.assign(streamWrapper, result);
const finalResult = Object.assign(streamWrapper, result);
spanRegistry.register(finalResult, endSpan);
return finalResult;
}
return result;
} catch (e) {
} catch (e: unknown) {
meta.error = e;
throw e;
} finally {