fix(core): prevent unhandled AbortError crash during stream loop detection (#21123)

Co-authored-by: Gaurav <39389231+gsquared94@users.noreply.github.com>
Co-authored-by: ruomeng <ruomeng@google.com>
This commit is contained in:
Juhyuk
2026-03-06 04:12:09 +09:00
committed by GitHub
parent f47cb3c136
commit a830858f91
2 changed files with 74 additions and 13 deletions
+49
View File
@@ -28,6 +28,7 @@ import {
GeminiEventType, GeminiEventType,
Turn, Turn,
type ChatCompressionInfo, type ChatCompressionInfo,
type ServerGeminiStreamEvent,
} from './turn.js'; } from './turn.js';
import { getCoreSystemPrompt } from './prompts.js'; import { getCoreSystemPrompt } from './prompts.js';
import { DEFAULT_GEMINI_MODEL_AUTO } from '../config/models.js'; import { DEFAULT_GEMINI_MODEL_AUTO } from '../config/models.js';
@@ -1118,6 +1119,54 @@ ${JSON.stringify(
// The actual token calculation is unit tested in tokenCalculation.test.ts // The actual token calculation is unit tested in tokenCalculation.test.ts
}); });
it('should cleanly abort and return Turn on LoopDetected without unhandled promise rejections', async () => {
// Arrange
const mockStream = (async function* () {
// Yield an event that will trigger the loop detector
yield { type: 'content', value: 'Looping content' };
})();
mockTurnRunFn.mockReturnValue(mockStream);
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
setTools: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
getLastPromptTokenCount: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
// Mock loop detector to return count > 1 on the first event (loop detected)
vi.spyOn(client['loopDetector'], 'addAndCheck').mockReturnValue({
count: 2,
});
const abortSpy = vi.spyOn(AbortController.prototype, 'abort');
// Act
const stream = client.sendMessageStream(
[{ text: 'Hi' }],
new AbortController().signal,
'prompt-id-1',
);
const events: ServerGeminiStreamEvent[] = [];
let finalResult: Turn | undefined;
while (true) {
const result = await stream.next();
if (result.done) {
finalResult = result.value;
break;
}
events.push(result.value);
}
// Assert
expect(events).toContainEqual({ type: GeminiEventType.LoopDetected });
expect(abortSpy).toHaveBeenCalled();
expect(finalResult).toBeInstanceOf(Turn);
});
it('should return the turn instance after the stream is complete', async () => { it('should return the turn instance after the stream is complete', async () => {
// Arrange // Arrange
const mockStream = (async function* () { const mockStream = (async function* () {
+25 -13
View File
@@ -708,27 +708,22 @@ export class GeminiClient {
let isError = false; let isError = false;
let isInvalidStream = false; let isInvalidStream = false;
let loopDetectedAbort = false;
let loopRecoverResult: { detail?: string } | undefined;
for await (const event of resultStream) { for await (const event of resultStream) {
const loopResult = this.loopDetector.addAndCheck(event); const loopResult = this.loopDetector.addAndCheck(event);
if (loopResult.count > 1) { if (loopResult.count > 1) {
yield { type: GeminiEventType.LoopDetected }; yield { type: GeminiEventType.LoopDetected };
controller.abort(); loopDetectedAbort = true;
return turn; break;
} else if (loopResult.count === 1) { } else if (loopResult.count === 1) {
if (boundedTurns <= 1) { if (boundedTurns <= 1) {
yield { type: GeminiEventType.MaxSessionTurns }; yield { type: GeminiEventType.MaxSessionTurns };
controller.abort(); loopDetectedAbort = true;
return turn; break;
} }
return yield* this._recoverFromLoop( loopRecoverResult = loopResult;
loopResult, break;
signal,
prompt_id,
boundedTurns,
isInvalidStreamRetry,
displayContent,
controller,
);
} }
yield event; yield event;
@@ -742,6 +737,23 @@ export class GeminiClient {
} }
} }
if (loopDetectedAbort) {
controller.abort();
return turn;
}
if (loopRecoverResult) {
return yield* this._recoverFromLoop(
loopRecoverResult,
signal,
prompt_id,
boundedTurns,
isInvalidStreamRetry,
displayContent,
controller,
);
}
if (isError) { if (isError) {
return turn; return turn;
} }