From 1e589f06928b3b740d0795f6c112e98fafee5338 Mon Sep 17 00:00:00 2001 From: Christian Gunderman Date: Mon, 9 Feb 2026 16:37:12 -0800 Subject: [PATCH] Ignore notifications that don't match schema. --- packages/core/src/code_assist/server.test.ts | 45 ++++++++++++++++++++ packages/core/src/code_assist/server.ts | 14 +++++- 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/packages/core/src/code_assist/server.test.ts b/packages/core/src/code_assist/server.test.ts index 42c1d89922..090fdb5c7c 100644 --- a/packages/core/src/code_assist/server.test.ts +++ b/packages/core/src/code_assist/server.test.ts @@ -403,6 +403,51 @@ describe('CodeAssistServer', () => { expect(results[1].candidates?.[0].content?.parts?.[0].text).toBe(' World'); }); + it('should ignore SSE messages that do not match the schema', async () => { + const { server, mockRequest } = createTestServer(); + + const { Readable } = await import('node:stream'); + const mockStream = new Readable({ + read() {}, + }); + + const mockResponseData1 = { + response: { candidates: [{ content: { parts: [{ text: 'Hello' }] } }] }, + }; + const mockResponseData2 = { + somethingElse: 'that does not match schema', + }; + const mockResponseData3 = { + response: { candidates: [{ content: { parts: [{ text: ' World' }] } }] }, + }; + + mockRequest.mockResolvedValue({ data: mockStream }); + + const stream = await server.generateContentStream( + { + model: 'test-model', + contents: [{ role: 'user', parts: [{ text: 'request' }] }], + }, + 'user-prompt-id', + ); + + setTimeout(() => { + mockStream.push('data: ' + JSON.stringify(mockResponseData1) + '\n\n'); + mockStream.push('data: ' + JSON.stringify(mockResponseData2) + '\n\n'); + mockStream.push('data: ' + JSON.stringify(mockResponseData3) + '\n\n'); + mockStream.push(null); + }, 0); + + const results = []; + for await (const res of stream) { + results.push(res); + } + + expect(results).toHaveLength(2); + expect(results[0].candidates?.[0].content?.parts?.[0].text).toBe('Hello'); + expect(results[1].candidates?.[0].content?.parts?.[0].text).toBe(' World'); + }); + it('should ignore malformed SSE data', async () => { const { server, mockRequest } = createTestServer(); diff --git a/packages/core/src/code_assist/server.ts b/packages/core/src/code_assist/server.ts index c85788e665..1d02840c04 100644 --- a/packages/core/src/code_assist/server.ts +++ b/packages/core/src/code_assist/server.ts @@ -65,6 +65,7 @@ import { recordConversationOffered, } from './telemetry.js'; import { getClientMetadata } from './experiments/client_metadata.js'; +import { debugLogger } from 'src/utils/debugLogger.js'; /** HTTP options to be used in each of the requests. */ export interface HttpOptions { /** Additional HTTP headers to be sent with the request. */ @@ -390,6 +391,8 @@ export class CodeAssistServer implements ContentGenerator { crlfDelay: Infinity, // Recognizes '\r\n' and '\n' as line breaks }); + let seenUnrecognizedNotificationType = false; + let bufferedLines: string[] = []; for await (const line of rl) { if (line.startsWith('data: ')) { @@ -398,7 +401,16 @@ export class CodeAssistServer implements ContentGenerator { if (bufferedLines.length === 0) { continue; // no data to yield } - yield schema.parse(JSON.parse(bufferedLines.join('\n'))); + // Ignore notifications that fail to parse. + const result = schema.safeParse(JSON.parse(bufferedLines.join('\n'))); + if (result.success) { + yield result.data; + } else if (!seenUnrecognizedNotificationType) { + seenUnrecognizedNotificationType = true; + debugLogger.debug( + `[Client] Unrecognized notification type ${result.data}`, + ); + } bufferedLines = []; // Reset the buffer after yielding } // Ignore other lines like comments or id fields