Fix(quality): Refine Stream Validation Logic (#9150)

This commit is contained in:
Sandy Tao
2025-09-22 14:40:09 -07:00
committed by GitHub
parent fcffcfbacb
commit 9c4d1594ac
2 changed files with 192 additions and 81 deletions

View File

@@ -69,15 +69,12 @@ vi.mock('../fallback/handler.js', () => ({
handleFallback: mockHandleFallback,
}));
const { mockLogInvalidChunk, mockLogContentRetry, mockLogContentRetryFailure } =
vi.hoisted(() => ({
mockLogInvalidChunk: vi.fn(),
mockLogContentRetry: vi.fn(),
mockLogContentRetryFailure: vi.fn(),
}));
const { mockLogContentRetry, mockLogContentRetryFailure } = vi.hoisted(() => ({
mockLogContentRetry: vi.fn(),
mockLogContentRetryFailure: vi.fn(),
}));
vi.mock('../telemetry/loggers.js', () => ({
logInvalidChunk: mockLogInvalidChunk,
logContentRetry: mockLogContentRetry,
logContentRetryFailure: mockLogContentRetryFailure,
}));
@@ -454,7 +451,7 @@ describe('GeminiChat', () => {
'This is the visible text that should not be lost.',
);
});
it('should add a placeholder model turn when a tool call is followed by an empty stream response', async () => {
it('should throw an error when a tool call is followed by an empty stream response', async () => {
// 1. Setup: A history where the model has just made a function call.
const initialHistory: Content[] = [
{
@@ -503,23 +500,164 @@ describe('GeminiChat', () => {
},
'prompt-id-stream-1',
);
for await (const _ of stream) {
// This loop consumes the stream to trigger the internal logic.
}
// 4. Assert: The history should now have four valid, alternating turns.
const history = chat.getHistory();
expect(history.length).toBe(4);
// 4. Assert: The stream processing should throw an EmptyStreamError.
await expect(
(async () => {
for await (const _ of stream) {
// This loop consumes the stream to trigger the internal logic.
}
})(),
).rejects.toThrow(EmptyStreamError);
});
// The final turn must be the empty model placeholder.
const lastTurn = history[3]!;
expect(lastTurn.role).toBe('model');
expect(lastTurn?.parts?.length).toBe(0);
it('should succeed when there is a tool call without finish reason', async () => {
// Setup: Stream with tool call but no finish reason
const streamWithToolCall = (async function* () {
yield {
candidates: [
{
content: {
role: 'model',
parts: [
{
functionCall: {
name: 'test_function',
args: { param: 'value' },
},
},
],
},
// No finishReason
},
],
} as unknown as GenerateContentResponse;
})();
// The second-to-last turn must be the function response we sent.
const secondToLastTurn = history[2]!;
expect(secondToLastTurn.role).toBe('user');
expect(secondToLastTurn?.parts![0]!.functionResponse).toBeDefined();
vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
streamWithToolCall,
);
const stream = await chat.sendMessageStream(
'test-model',
{ message: 'test' },
'prompt-id-1',
);
// Should not throw an error
await expect(
(async () => {
for await (const _ of stream) {
// consume stream
}
})(),
).resolves.not.toThrow();
});
// A stream that carries plain text but never reports a finish reason (and
// contains no tool call) is expected to reject with EmptyStreamError once it
// is consumed.
it('should throw EmptyStreamError when no tool call and no finish reason', async () => {
  // Arrange: a single text chunk whose candidate has no finishReason.
  async function* textChunkWithoutFinishReason(): AsyncGenerator<GenerateContentResponse> {
    const chunk = {
      candidates: [
        {
          content: {
            role: 'model',
            parts: [{ text: 'some response' }],
          },
          // finishReason intentionally left out
        },
      ],
    };
    yield chunk as unknown as GenerateContentResponse;
  }

  vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
    textChunkWithoutFinishReason(),
  );

  // Act: open the stream through the chat under test.
  const responseStream = await chat.sendMessageStream(
    'test-model',
    { message: 'test' },
    'prompt-id-1',
  );

  // Iterating the stream is what surfaces the error, so drain it fully.
  const drain = async (): Promise<void> => {
    for await (const _ of responseStream) {
      // nothing to do with individual chunks
    }
  };

  // Assert
  await expect(drain()).rejects.toThrow(EmptyStreamError);
});
// A stream that finishes with a finish reason but yields only a thought part
// (no tool call, no visible text) is expected to reject with EmptyStreamError
// once it is consumed.
it('should throw EmptyStreamError when no tool call and empty response text', async () => {
  // Arrange: one chunk with finishReason 'STOP' whose only part is a thought.
  // NOTE(review): `thought` is a string here and this fixture has no `text`
  // field; confirm that this matches the shape the SDK's Part type expects.
  async function* thoughtOnlyChunk(): AsyncGenerator<GenerateContentResponse> {
    const chunk = {
      candidates: [
        {
          content: {
            role: 'model',
            parts: [{ thought: 'thinking...' }],
          },
          finishReason: 'STOP',
        },
      ],
    };
    yield chunk as unknown as GenerateContentResponse;
  }

  vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
    thoughtOnlyChunk(),
  );

  // Act: open the stream through the chat under test.
  const responseStream = await chat.sendMessageStream(
    'test-model',
    { message: 'test' },
    'prompt-id-1',
  );

  // Iterating the stream is what surfaces the error, so drain it fully.
  const drain = async (): Promise<void> => {
    for await (const _ of responseStream) {
      // nothing to do with individual chunks
    }
  };

  // Assert
  await expect(drain()).rejects.toThrow(EmptyStreamError);
});
// Happy path: a chunk that carries both visible text and a finish reason is
// expected to be consumed to completion without the stream rejecting.
it('should succeed when there is finish reason and response text', async () => {
  // Arrange: one chunk with text content and finishReason 'STOP'.
  async function* textChunkWithFinishReason(): AsyncGenerator<GenerateContentResponse> {
    const chunk = {
      candidates: [
        {
          content: {
            role: 'model',
            parts: [{ text: 'valid response' }],
          },
          finishReason: 'STOP',
        },
      ],
    };
    yield chunk as unknown as GenerateContentResponse;
  }

  vi.mocked(mockContentGenerator.generateContentStream).mockResolvedValue(
    textChunkWithFinishReason(),
  );

  // Act: open the stream through the chat under test.
  const responseStream = await chat.sendMessageStream(
    'test-model',
    { message: 'test' },
    'prompt-id-1',
  );

  // Drain every chunk; the promise settles only when the stream is exhausted.
  const drain = async (): Promise<void> => {
    for await (const _ of responseStream) {
      // nothing to do with individual chunks
    }
  };

  // Assert: draining must resolve rather than reject.
  await expect(drain()).resolves.not.toThrow();
});
it('should call generateContentStream with the correct parameters', async () => {
@@ -690,7 +828,6 @@ describe('GeminiChat', () => {
}
// Assertions
expect(mockLogInvalidChunk).toHaveBeenCalledTimes(1);
expect(mockLogContentRetry).toHaveBeenCalledTimes(1);
expect(mockLogContentRetryFailure).not.toHaveBeenCalled();
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
@@ -758,7 +895,6 @@ describe('GeminiChat', () => {
expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(
3,
);
expect(mockLogInvalidChunk).toHaveBeenCalledTimes(3);
expect(mockLogContentRetry).toHaveBeenCalledTimes(2);
expect(mockLogContentRetryFailure).toHaveBeenCalledTimes(1);

View File

@@ -29,13 +29,11 @@ import type { StructuredError } from './turn.js';
import {
logContentRetry,
logContentRetryFailure,
logInvalidChunk,
} from '../telemetry/loggers.js';
import { ChatRecordingService } from '../services/chatRecordingService.js';
import {
ContentRetryEvent,
ContentRetryFailureEvent,
InvalidChunkEvent,
} from '../telemetry/types.js';
import { handleFallback } from '../fallback/handler.js';
import { isFunctionResponse } from '../utils/messageInspectors.js';
@@ -491,19 +489,14 @@ export class GeminiChat {
streamResponse: AsyncGenerator<GenerateContentResponse>,
): AsyncGenerator<GenerateContentResponse> {
const modelResponseParts: Part[] = [];
let hasReceivedAnyChunk = false;
let hasReceivedValidChunk = false;
let hasToolCall = false;
let lastChunk: GenerateContentResponse | null = null;
let lastChunkIsInvalid = false;
let hasFinishReason = false;
for await (const chunk of this.stopBeforeSecondMutator(streamResponse)) {
hasReceivedAnyChunk = true;
lastChunk = chunk;
hasFinishReason =
chunk?.candidates?.some((candidate) => candidate.finishReason) ?? false;
if (isValidResponse(chunk)) {
hasReceivedValidChunk = true;
lastChunkIsInvalid = false;
const content = chunk.candidates?.[0]?.content;
if (content?.parts) {
if (content.parts.some((part) => part.thought)) {
@@ -518,12 +511,6 @@ export class GeminiChat {
...content.parts.filter((part) => !part.thought),
);
}
} else {
logInvalidChunk(
this.config,
new InvalidChunkEvent('Invalid chunk received from stream.'),
);
lastChunkIsInvalid = true;
}
// Record token usage if this chunk has usageMetadata
@@ -539,46 +526,6 @@ export class GeminiChat {
yield chunk; // Yield every chunk to the UI immediately.
}
if (!hasReceivedAnyChunk) {
throw new EmptyStreamError('Model stream completed without any chunks.');
}
const hasFinishReason = lastChunk?.candidates?.some(
(candidate) => candidate.finishReason,
);
// Stream validation logic: A stream is considered successful if:
// 1. There's a tool call (tool calls can end without explicit finish reasons), OR
// 2. There's a finish reason AND the last chunk is valid (or we haven't received any valid chunks)
//
// We throw an error only when there's no tool call AND:
// - No finish reason, OR
// - Last chunk is invalid after receiving valid content
if (
!hasToolCall &&
(!hasFinishReason || (lastChunkIsInvalid && !hasReceivedValidChunk))
) {
throw new EmptyStreamError(
'Model stream ended with an invalid chunk or missing finish reason.',
);
}
// Record model response text from the collected parts
if (modelResponseParts.length > 0) {
const responseText = modelResponseParts
.filter((part) => part.text)
.map((part) => part.text)
.join('');
if (responseText.trim()) {
this.chatRecordingService.recordMessage({
model,
type: 'gemini',
content: responseText,
});
}
}
// Strip thoughts and consolidate text parts.
const consolidatedParts: Part[] = [];
for (const part of modelResponseParts) {
@@ -594,6 +541,34 @@ export class GeminiChat {
}
}
const responseText = consolidatedParts
.filter((part) => part.text)
.map((part) => part.text)
.join('')
.trim();
// Record model response text from the collected parts
if (responseText) {
this.chatRecordingService.recordMessage({
model,
type: 'gemini',
content: responseText,
});
}
// Stream validation logic: A stream is considered successful if:
// 1. There's a tool call (tool calls can end without explicit finish reasons), OR
// 2. There's a finish reason AND we have non-empty response text
//
// We throw an error only when there's no tool call AND:
// - No finish reason, OR
// - Empty response text (e.g., only thoughts with no actual content)
if (!hasToolCall && (!hasFinishReason || !responseText)) {
throw new EmptyStreamError(
'Model stream ended with an invalid chunk or missing finish reason.',
);
}
this.history.push({ role: 'model', parts: consolidatedParts });
}