From 968e93894963690e09e34af07aaa050719b3df1d Mon Sep 17 00:00:00 2001 From: Sandy Tao Date: Fri, 5 Sep 2025 16:22:54 -0700 Subject: [PATCH] Fix(core): Fix stream validation logic (#7832) --- packages/core/src/core/geminiChat.test.ts | 71 +++++++++++++++-------- packages/core/src/core/geminiChat.ts | 49 ++++++---------- 2 files changed, 65 insertions(+), 55 deletions(-) diff --git a/packages/core/src/core/geminiChat.test.ts b/packages/core/src/core/geminiChat.test.ts index 8592c56091..ee1e53f831 100644 --- a/packages/core/src/core/geminiChat.test.ts +++ b/packages/core/src/core/geminiChat.test.ts @@ -416,9 +416,9 @@ describe('GeminiChat', () => { expect(modelTurn?.parts![0]!.functionCall).toBeDefined(); }); - it('should succeed if the stream ends with an empty part but has a valid finishReason', async () => { - // 1. Mock a stream that ends with an invalid part but has a 'STOP' finish reason. - const streamWithValidFinish = (async function* () { + it('should fail if the stream ends with an empty part and has no finishReason', async () => { + // 1. Mock a stream that ends with an invalid part and has no finish reason. + const streamWithNoFinish = (async function* () { yield { candidates: [ { @@ -429,7 +429,7 @@ describe('GeminiChat', () => { }, ], } as unknown as GenerateContentResponse; - // This second chunk is invalid, but the finishReason should save it from retrying. + // This second chunk is invalid and has no finishReason, so it should fail. yield { candidates: [ { @@ -437,21 +437,19 @@ describe('GeminiChat', () => { role: 'model', parts: [{ text: '' }], }, - finishReason: 'STOP', }, ], } as unknown as GenerateContentResponse; })(); vi.mocked(mockModelsModule.generateContentStream).mockResolvedValue( - streamWithValidFinish, + streamWithNoFinish, ); - // 2. Action & Assert: The stream should complete successfully because the valid - // finishReason overrides the invalid final chunk. + // 2. Action & Assert: The stream should fail because there's no finish reason. const stream = await chat.sendMessageStream( { message: 'test message' }, - 'prompt-id-valid-finish-empty-end', + 'prompt-id-no-finish-empty-end', ); await expect( (async () => { @@ -459,14 +457,7 @@ describe('GeminiChat', () => { /* consume stream */ } })(), - ).resolves.not.toThrow(); - - // 3. Verify history was recorded correctly - const history = chat.getHistory(); - expect(history.length).toBe(2); - const modelTurn = history[1]!; - expect(modelTurn?.parts?.length).toBe(1); // The empty part is discarded - expect(modelTurn?.parts![0]!.text).toBe('Initial content...'); + ).rejects.toThrow(EmptyStreamError); }); it('should not consolidate text into a part that also contains a functionCall', async () => { // 1. Mock the API to stream a malformed part followed by a valid text part. @@ -542,7 +533,10 @@ describe('GeminiChat', () => { // as the important part is consolidating what comes after. yield { candidates: [ - { content: { role: 'model', parts: [{ text: ' World!' }] } }, + { + content: { role: 'model', parts: [{ text: ' World!' }] }, + finishReason: 'STOP', + }, ], } as unknown as GenerateContentResponse; })(); @@ -645,6 +639,7 @@ describe('GeminiChat', () => { { text: 'This is the visible text that should not be lost.' }, ], }, + finishReason: 'STOP', }, ], } as unknown as GenerateContentResponse; @@ -705,7 +700,10 @@ describe('GeminiChat', () => { const emptyStreamResponse = (async function* () { yield { candidates: [ - { content: { role: 'model', parts: [{ thought: true }] } }, + { + content: { role: 'model', parts: [{ thought: true }] }, + finishReason: 'STOP', + }, ], } as unknown as GenerateContentResponse; })(); @@ -975,7 +973,12 @@ describe('GeminiChat', () => { // Second attempt (the retry): A minimal valid stream. (async function* () { yield { - candidates: [{ content: { parts: [{ text: 'Success' }] } }], + candidates: [ + { + content: { parts: [{ text: 'Success' }] }, + finishReason: 'STOP', + }, + ], } as unknown as GenerateContentResponse; })(), ); @@ -1012,7 +1015,10 @@ describe('GeminiChat', () => { (async function* () { yield { candidates: [ - { content: { parts: [{ text: 'Successful response' }] } }, + { + content: { parts: [{ text: 'Successful response' }] }, + finishReason: 'STOP', + }, ], } as unknown as GenerateContentResponse; })(), @@ -1123,7 +1129,12 @@ describe('GeminiChat', () => { // Second attempt succeeds (async function* () { yield { - candidates: [{ content: { parts: [{ text: 'Second answer' }] } }], + candidates: [ + { + content: { parts: [{ text: 'Second answer' }] }, + finishReason: 'STOP', + }, + ], } as unknown as GenerateContentResponse; })(), ); @@ -1272,6 +1283,7 @@ describe('GeminiChat', () => { content: { parts: [{ text: 'Successful response after empty' }], }, + finishReason: 'STOP', }, ], } as unknown as GenerateContentResponse; @@ -1333,13 +1345,23 @@ describe('GeminiChat', () => { } as unknown as GenerateContentResponse; await firstStreamContinuePromise; // Pause the stream yield { - candidates: [{ content: { parts: [{ text: ' part 2' }] } }], + candidates: [ + { + content: { parts: [{ text: ' part 2' }] }, + finishReason: 'STOP', + }, + ], } as unknown as GenerateContentResponse; })(); const secondStreamGenerator = (async function* () { yield { - candidates: [{ content: { parts: [{ text: 'second response' }] } }], + candidates: [ + { + content: { parts: [{ text: 'second response' }] }, + finishReason: 'STOP', + }, + ], } as unknown as GenerateContentResponse; })(); @@ -1424,6 +1446,7 @@ describe('GeminiChat', () => { content: { parts: [{ text: 'Successful final response' }], }, + finishReason: 'STOP', }, ], } as unknown as GenerateContentResponse; diff --git a/packages/core/src/core/geminiChat.ts b/packages/core/src/core/geminiChat.ts index 17da50078f..575fd8a6c1 100644 --- a/packages/core/src/core/geminiChat.ts +++ b/packages/core/src/core/geminiChat.ts @@ -614,21 +614,14 @@ export class GeminiChat { let hasReceivedAnyChunk = false; let hasToolCall = false; let lastChunk: GenerateContentResponse | null = null; - - let isStreamInvalid = false; - let firstInvalidChunkEncountered = false; - let validChunkAfterInvalidEncountered = false; + let lastChunkIsInvalid = false; for await (const chunk of streamResponse) { hasReceivedAnyChunk = true; lastChunk = chunk; if (isValidResponse(chunk)) { - if (firstInvalidChunkEncountered) { - // A valid chunk appeared *after* an invalid one. - validChunkAfterInvalidEncountered = true; - } - + lastChunkIsInvalid = false; const content = chunk.candidates?.[0]?.content; if (content?.parts) { if (content.parts.some((part) => part.thought)) { @@ -646,8 +639,7 @@ export class GeminiChat { this.config, new InvalidChunkEvent('Invalid chunk received from stream.'), ); - isStreamInvalid = true; - firstInvalidChunkEncountered = true; + lastChunkIsInvalid = true; } // Record token usage if this chunk has usageMetadata @@ -662,27 +654,22 @@ export class GeminiChat { throw new EmptyStreamError('Model stream completed without any chunks.'); } - // --- FIX: The entire validation block was restructured for clarity and correctness --- - // Only apply complex validation if an invalid chunk was actually found. - if (isStreamInvalid) { - // Fail immediately if an invalid chunk was not the absolute last chunk. - if (validChunkAfterInvalidEncountered) { - throw new EmptyStreamError( - 'Model stream had invalid intermediate chunks without a tool call.', - ); - } + const hasFinishReason = lastChunk?.candidates?.some( + (candidate) => candidate.finishReason, + ); - if (!hasToolCall) { - // If the *only* invalid part was the last chunk, we still check its finish reason. - const finishReason = lastChunk?.candidates?.[0]?.finishReason; - const isSuccessfulFinish = - finishReason === 'STOP' || finishReason === 'MAX_TOKENS'; - if (!isSuccessfulFinish) { - throw new EmptyStreamError( - 'Model stream ended with an invalid chunk and a failed finish reason.', - ); - } - } + // --- FIX: The entire validation block was restructured for clarity and correctness --- + // Stream validation logic: A stream is considered successful if: + // 1. There's a tool call (tool calls can end without explicit finish reasons), OR + // 2. Both conditions are met: last chunk is valid AND any candidate has a finish reason + // + // We throw an error only when there's no tool call AND either: + // - The last chunk is invalid, OR + // - No candidate in the last chunk has a finish reason + if (!hasToolCall && (lastChunkIsInvalid || !hasFinishReason)) { + throw new EmptyStreamError( + 'Model stream ended with an invalid chunk or missing finish reason.', + ); } // Record model response text from the collected parts