diff --git a/packages/core/src/code_assist/server.test.ts b/packages/core/src/code_assist/server.test.ts index 3ea20be5e2..bb7f4532a3 100644 --- a/packages/core/src/code_assist/server.test.ts +++ b/packages/core/src/code_assist/server.test.ts @@ -10,8 +10,14 @@ import { OAuth2Client } from 'google-auth-library'; import { UserTierId, ActionStatus } from './types.js'; import { FinishReason } from '@google/genai'; import { LlmRole } from '../telemetry/types.js'; +import { logInvalidChunk } from '../telemetry/loggers.js'; +import { makeFakeConfig } from '../test-utils/config.js'; vi.mock('google-auth-library'); +vi.mock('../telemetry/loggers.js', () => ({ + logBillingEvent: vi.fn(), + logInvalidChunk: vi.fn(), +})); function createTestServer(headers: Record = {}) { const mockRequest = vi.fn(); @@ -671,4 +677,242 @@ describe('CodeAssistServer', () => { expect(requestPostSpy).toHaveBeenCalledWith('retrieveUserQuota', req); expect(response).toEqual(mockResponse); }); + + describe('robustness testing', () => { + it('should not crash on random error objects in loadCodeAssist (isVpcScAffectedUser)', async () => { + const { server } = createTestServer(); + const errors = [ + null, + undefined, + 'string error', + 123, + { some: 'object' }, + new Error('standard error'), + { response: {} }, + { response: { data: {} } }, + ]; + + for (const err of errors) { + vi.spyOn(server, 'requestPost').mockRejectedValueOnce(err); + try { + await server.loadCodeAssist({ metadata: {} }); + } catch (e) { + expect(e).toBe(err); + } + } + }); + + it('should handle randomly fragmented SSE streams gracefully', async () => { + const { server, mockRequest } = createTestServer(); + const { Readable } = await import('node:stream'); + + const fragmentedCases = [ + { + chunks: ['d', 'ata: {"foo":', ' "bar"}\n\n'], + expected: [{ foo: 'bar' }], + }, + { + chunks: ['data: {"foo": "bar"}\n', '\n'], + expected: [{ foo: 'bar' }], + }, + { + chunks: ['data: ', '{"foo": "bar"}', '\n\n'], + expected: [{ foo: 'bar' }], + }, + { + chunks: ['data: {"foo": "bar"}\n\n', 'data: {"baz": 1}\n\n'], + expected: [{ foo: 'bar' }, { baz: 1 }], + }, + ]; + + for (const { chunks, expected } of fragmentedCases) { + const mockStream = new Readable({ + read() { + for (const chunk of chunks) { + this.push(chunk); + } + this.push(null); + }, + }); + mockRequest.mockResolvedValueOnce({ data: mockStream }); + + const stream = await server.requestStreamingPost('testStream', {}); + const results = []; + for await (const res of stream) { + results.push(res); + } + expect(results).toEqual(expected); + } + }); + + it('should correctly parse valid JSON split across multiple data lines', async () => { + const { server, mockRequest } = createTestServer(); + const { Readable } = await import('node:stream'); + const jsonObj = { + complex: { structure: [1, 2, 3] }, + bool: true, + str: 'value', + }; + const jsonString = JSON.stringify(jsonObj, null, 2); + const lines = jsonString.split('\n'); + const ssePayload = lines.map((line) => `data: ${line}\n`).join('') + '\n'; + + const mockStream = new Readable({ + read() { + this.push(ssePayload); + this.push(null); + }, + }); + mockRequest.mockResolvedValueOnce({ data: mockStream }); + + const stream = await server.requestStreamingPost('testStream', {}); + const results = []; + for await (const res of stream) { + results.push(res); + } + expect(results).toHaveLength(1); + expect(results[0]).toEqual(jsonObj); + }); + + it('should not crash on objects partially matching VPC SC error structure', async () => { + const { server } = createTestServer(); + const partialErrors = [ + { response: { data: { error: { details: [{ reason: 'OTHER' }] } } } }, + { response: { data: { error: { details: [] } } } }, + { response: { data: { error: {} } } }, + { response: { data: {} } }, + ]; + + for (const err of partialErrors) { + vi.spyOn(server, 'requestPost').mockRejectedValueOnce(err); + try { + await server.loadCodeAssist({ metadata: {} }); + } catch (e) { + expect(e).toBe(err); + } + } + }); + + it('should correctly ignore arbitrary SSE comments and ID lines and empty lines before data', async () => { + const { server, mockRequest } = createTestServer(); + const { Readable } = await import('node:stream'); + const jsonObj = { foo: 'bar' }; + const jsonString = JSON.stringify(jsonObj); + + const ssePayload = `id: 123 +:comment +retry: 100 + +data: ${jsonString} + +`; + + const mockStream = new Readable({ + read() { + this.push(ssePayload); + this.push(null); + }, + }); + mockRequest.mockResolvedValueOnce({ data: mockStream }); + + const stream = await server.requestStreamingPost('testStream', {}); + const results = []; + for await (const res of stream) { + results.push(res); + } + expect(results).toHaveLength(1); + expect(results[0]).toEqual(jsonObj); + }); + + it('should log InvalidChunkEvent when SSE chunk is not valid JSON', async () => { + const config = makeFakeConfig(); + const mockRequest = vi.fn(); + const client = { request: mockRequest } as unknown as OAuth2Client; + const server = new CodeAssistServer( + client, + 'test-project', + {}, + 'test-session', + UserTierId.FREE, + undefined, + undefined, + config, + ); + + const { Readable } = await import('node:stream'); + const mockStream = new Readable({ + read() {}, + }); + + mockRequest.mockResolvedValue({ data: mockStream }); + + const stream = await server.requestStreamingPost('testStream', {}); + + setTimeout(() => { + mockStream.push('data: { "invalid": json }\n\n'); + mockStream.push(null); + }, 0); + + const results = []; + for await (const res of stream) { + results.push(res); + } + + expect(results).toHaveLength(0); + expect(logInvalidChunk).toHaveBeenCalledWith( + config, + expect.objectContaining({ + error_message: 'Malformed JSON chunk', + }), + ); + }); + + it('should safely process random response streams in generateContentStream (consumed/remaining credits)', async () => { + const { mockRequest, client } = createTestServer(); + const testServer = new CodeAssistServer( + client, + 'test-project', + {}, + 'test-session', + UserTierId.FREE, + undefined, + { id: 'test-tier', name: 'tier', availableCredits: [] }, + ); + const { Readable } = await import('node:stream'); + + const streamResponses = [ + { + traceId: '1', + consumedCredits: [{ creditType: 'A', creditAmount: '10' }], + }, + { traceId: '2', remainingCredits: [{ creditType: 'B' }] }, + { traceId: '3' }, + { traceId: '4', consumedCredits: null, remainingCredits: undefined }, + ]; + + const mockStream = new Readable({ + read() { + for (const resp of streamResponses) { + this.push(`data: ${JSON.stringify(resp)}\n\n`); + } + this.push(null); + }, + }); + mockRequest.mockResolvedValueOnce({ data: mockStream }); + vi.spyOn(testServer, 'recordCodeAssistMetrics').mockResolvedValue( + undefined, + ); + + const stream = await testServer.generateContentStream( + { model: 'test-model', contents: [] }, + 'user-prompt-id', + LlmRole.MAIN, + ); + + for await (const _ of stream) { + // Drain stream + } + // Should not crash + }); + }); }); diff --git a/packages/core/src/code_assist/server.ts b/packages/core/src/code_assist/server.ts index 9fbde78d41..114fa60092 100644 --- a/packages/core/src/code_assist/server.ts +++ b/packages/core/src/code_assist/server.ts @@ -47,7 +47,7 @@ import { isOverageEligibleModel, shouldAutoUseCredits, } from '../billing/billing.js'; -import { logBillingEvent } from '../telemetry/loggers.js'; +import { logBillingEvent, logInvalidChunk } from '../telemetry/loggers.js'; import { CreditsUsedEvent } from '../telemetry/billingEvents.js'; import { fromCountTokenResponse, @@ -62,7 +62,7 @@ import { recordConversationOffered, } from './telemetry.js'; import { getClientMetadata } from './experiments/client_metadata.js'; -import type { LlmRole } from '../telemetry/types.js'; +import { InvalidChunkEvent, type LlmRole } from '../telemetry/types.js'; /** HTTP options to be used in each of the requests. */ export interface HttpOptions { /** Additional HTTP headers to be sent with the request. */ @@ -466,7 +466,7 @@ export class CodeAssistServer implements ContentGenerator { retry: false, }); - return (async function* (): AsyncGenerator { + return (async function* (server: CodeAssistServer): AsyncGenerator { const rl = readline.createInterface({ input: Readable.from(res.data), crlfDelay: Infinity, // Recognizes '\r\n' and '\n' as line breaks @@ -480,12 +480,23 @@ export class CodeAssistServer implements ContentGenerator { if (bufferedLines.length === 0) { continue; // no data to yield } - yield JSON.parse(bufferedLines.join('\n')); + const chunk = bufferedLines.join('\n'); + try { + yield JSON.parse(chunk); + } catch (_e) { + if (server.config) { + logInvalidChunk( + server.config, + // Don't include the chunk content in the log for security/privacy reasons. + new InvalidChunkEvent('Malformed JSON chunk'), + ); + } + } bufferedLines = []; // Reset the buffer after yielding } // Ignore other lines like comments or id fields } - })(); + })(this); } private getBaseUrl(): string { diff --git a/packages/core/src/telemetry/loggers.test.ts b/packages/core/src/telemetry/loggers.test.ts index 3d9ed780e6..a3c757f5a7 100644 --- a/packages/core/src/telemetry/loggers.test.ts +++ b/packages/core/src/telemetry/loggers.test.ts @@ -33,6 +33,7 @@ import { logFlashFallback, logChatCompression, logMalformedJsonResponse, + logInvalidChunk, logFileOperation, logRipgrepFallback, logToolOutputTruncated, @@ -68,6 +69,7 @@ import { EVENT_AGENT_START, EVENT_AGENT_FINISH, EVENT_WEB_FETCH_FALLBACK_ATTEMPT, + EVENT_INVALID_CHUNK, ApiErrorEvent, ApiRequestEvent, ApiResponseEvent, @@ -77,6 +79,7 @@ import { FlashFallbackEvent, RipgrepFallbackEvent, MalformedJsonResponseEvent, + InvalidChunkEvent, makeChatCompressionEvent, FileOperationEvent, ToolOutputTruncatedEvent, @@ -1736,6 +1739,39 @@ describe('loggers', () => { }); }); + describe('logInvalidChunk', () => { + beforeEach(() => { + vi.spyOn(ClearcutLogger.prototype, 'logInvalidChunkEvent'); + vi.spyOn(metrics, 'recordInvalidChunk'); + }); + + it('logs the event to Clearcut and OTEL', () => { + const mockConfig = makeFakeConfig(); + const event = new InvalidChunkEvent('Unexpected token'); + + logInvalidChunk(mockConfig, event); + + expect( + ClearcutLogger.prototype.logInvalidChunkEvent, + ).toHaveBeenCalledWith(event); + + expect(mockLogger.emit).toHaveBeenCalledWith({ + body: 'Invalid chunk received from stream.', + attributes: { + 'session.id': 'test-session-id', + 'user.email': 'test-user@example.com', + 'installation.id': 'test-installation-id', + 'event.name': EVENT_INVALID_CHUNK, + 'event.timestamp': '2025-01-01T00:00:00.000Z', + interactive: false, + 'error.message': 'Unexpected token', + }, + }); + + expect(metrics.recordInvalidChunk).toHaveBeenCalledWith(mockConfig); + }); + }); + describe('logFileOperation', () => { const mockConfig = { getSessionId: () => 'test-session-id', diff --git a/packages/core/src/telemetry/loggers.ts b/packages/core/src/telemetry/loggers.ts index 2625f10789..4c3ed55321 100644 --- a/packages/core/src/telemetry/loggers.ts +++ b/packages/core/src/telemetry/loggers.ts @@ -29,6 +29,7 @@ import { type ConversationFinishedEvent, type ChatCompressionEvent, type MalformedJsonResponseEvent, + type InvalidChunkEvent, type ContentRetryEvent, type ContentRetryFailureEvent, type RipgrepFallbackEvent, @@ -75,6 +76,7 @@ import { recordPlanExecution, recordKeychainAvailability, recordTokenStorageInitialization, + recordInvalidChunk, } from './metrics.js'; import { bufferTelemetryEvent } from './sdk.js'; import { uiTelemetryService, type UiEvent } from './uiTelemetry.js'; @@ -467,6 +469,22 @@ export function logMalformedJsonResponse( }); } +export function logInvalidChunk( + config: Config, + event: InvalidChunkEvent, +): void { + ClearcutLogger.getInstance(config)?.logInvalidChunkEvent(event); + bufferTelemetryEvent(() => { + const logger = logs.getLogger(SERVICE_NAME); + const logRecord: LogRecord = { + body: event.toLogBody(), + attributes: event.toOpenTelemetryAttributes(config), + }; + logger.emit(logRecord); + recordInvalidChunk(config); + }); +} + export function logContentRetry( config: Config, event: ContentRetryEvent, diff --git a/packages/core/src/telemetry/metrics.test.ts b/packages/core/src/telemetry/metrics.test.ts index d0254ec678..3b8ae1ea0c 100644 --- a/packages/core/src/telemetry/metrics.test.ts +++ b/packages/core/src/telemetry/metrics.test.ts @@ -105,6 +105,7 @@ describe('Telemetry Metrics', () => { let recordPlanExecutionModule: typeof import('./metrics.js').recordPlanExecution; let recordKeychainAvailabilityModule: typeof import('./metrics.js').recordKeychainAvailability; let recordTokenStorageInitializationModule: typeof import('./metrics.js').recordTokenStorageInitialization; + let recordInvalidChunkModule: typeof import('./metrics.js').recordInvalidChunk; beforeEach(async () => { vi.resetModules(); @@ -154,6 +155,7 @@ describe('Telemetry Metrics', () => { metricsJsModule.recordKeychainAvailability; recordTokenStorageInitializationModule = metricsJsModule.recordTokenStorageInitialization; + recordInvalidChunkModule = metricsJsModule.recordInvalidChunk; const otelApiModule = await import('@opentelemetry/api'); @@ -1555,5 +1557,27 @@ describe('Telemetry Metrics', () => { }); }); }); + + describe('recordInvalidChunk', () => { + it('should not record metrics if not initialized', () => { + const config = makeFakeConfig({}); + recordInvalidChunkModule(config); + expect(mockCounterAddFn).not.toHaveBeenCalled(); + }); + + it('should record invalid chunk when initialized', () => { + const config = makeFakeConfig({}); + initializeMetricsModule(config); + mockCounterAddFn.mockClear(); + + recordInvalidChunkModule(config); + + expect(mockCounterAddFn).toHaveBeenCalledWith(1, { + 'session.id': 'test-session-id', + 'installation.id': 'test-installation-id', + 'user.email': 'test@example.com', + }); + }); + }); }); });