Ignore notifications that don't match schema.

This commit is contained in:
Christian Gunderman
2026-02-09 16:37:12 -08:00
parent 898382ad61
commit 1e589f0692
2 changed files with 58 additions and 1 deletions

View File

@@ -403,6 +403,51 @@ describe('CodeAssistServer', () => {
expect(results[1].candidates?.[0].content?.parts?.[0].text).toBe(' World');
});
it('should ignore SSE messages that do not match the schema', async () => {
const { server, mockRequest } = createTestServer();
const { Readable } = await import('node:stream');
const mockStream = new Readable({
read() {},
});
const mockResponseData1 = {
response: { candidates: [{ content: { parts: [{ text: 'Hello' }] } }] },
};
const mockResponseData2 = {
somethingElse: 'that does not match schema',
};
const mockResponseData3 = {
response: { candidates: [{ content: { parts: [{ text: ' World' }] } }] },
};
mockRequest.mockResolvedValue({ data: mockStream });
const stream = await server.generateContentStream(
{
model: 'test-model',
contents: [{ role: 'user', parts: [{ text: 'request' }] }],
},
'user-prompt-id',
);
setTimeout(() => {
mockStream.push('data: ' + JSON.stringify(mockResponseData1) + '\n\n');
mockStream.push('data: ' + JSON.stringify(mockResponseData2) + '\n\n');
mockStream.push('data: ' + JSON.stringify(mockResponseData3) + '\n\n');
mockStream.push(null);
}, 0);
const results = [];
for await (const res of stream) {
results.push(res);
}
expect(results).toHaveLength(2);
expect(results[0].candidates?.[0].content?.parts?.[0].text).toBe('Hello');
expect(results[1].candidates?.[0].content?.parts?.[0].text).toBe(' World');
});
it('should ignore malformed SSE data', async () => {
const { server, mockRequest } = createTestServer();

View File

@@ -65,6 +65,7 @@ import {
recordConversationOffered,
} from './telemetry.js';
import { getClientMetadata } from './experiments/client_metadata.js';
import { debugLogger } from 'src/utils/debugLogger.js';
/** HTTP options to be used in each of the requests. */
export interface HttpOptions {
/** Additional HTTP headers to be sent with the request. */
@@ -390,6 +391,8 @@ export class CodeAssistServer implements ContentGenerator {
crlfDelay: Infinity, // Recognizes '\r\n' and '\n' as line breaks
});
let seenUnrecognizedNotificationType = false;
let bufferedLines: string[] = [];
for await (const line of rl) {
if (line.startsWith('data: ')) {
@@ -398,7 +401,16 @@ export class CodeAssistServer implements ContentGenerator {
if (bufferedLines.length === 0) {
continue; // no data to yield
}
yield schema.parse(JSON.parse(bufferedLines.join('\n')));
// Ignore notifications that fail to parse.
const result = schema.safeParse(JSON.parse(bufferedLines.join('\n')));
if (result.success) {
yield result.data;
} else if (!seenUnrecognizedNotificationType) {
seenUnrecognizedNotificationType = true;
debugLogger.debug(
`[Client] Unrecognized notification type ${result.data}`,
);
}
bufferedLines = []; // Reset the buffer after yielding
}
// Ignore other lines like comments or id fields