From 88626f37e36647157d4d03bbb2c074635a79b757 Mon Sep 17 00:00:00 2001 From: Adam Weidman <65992621+adamfweidman@users.noreply.github.com> Date: Wed, 29 Apr 2026 16:27:53 -0400 Subject: [PATCH] fix(cli): handle InvalidStream event gracefully without throwing (#26218) --- packages/cli/src/nonInteractiveCli.test.ts | 97 ++++++++++++++++++++-- packages/cli/src/nonInteractiveCli.ts | 27 +++++- 2 files changed, 114 insertions(+), 10 deletions(-) diff --git a/packages/cli/src/nonInteractiveCli.test.ts b/packages/cli/src/nonInteractiveCli.test.ts index 5d0c3d1016..1167bbbce4 100644 --- a/packages/cli/src/nonInteractiveCli.test.ts +++ b/packages/cli/src/nonInteractiveCli.test.ts @@ -703,7 +703,7 @@ describe('runNonInteractive', () => { createStreamFromEvents(events), ); vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); @@ -793,7 +793,7 @@ describe('runNonInteractive', () => { .mockReturnValueOnce(createStreamFromEvents(secondCallEvents)); vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); @@ -836,7 +836,7 @@ describe('runNonInteractive', () => { createStreamFromEvents(events), ); vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); @@ -1530,7 +1530,7 @@ describe('runNonInteractive', () => { vi.mocked(mockConfig.getOutputFormat).mockReturnValue( OutputFormat.STREAM_JSON, ); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); @@ -1692,7 +1692,7 @@ describe('runNonInteractive', () => { vi.mocked(mockConfig.getOutputFormat).mockReturnValue( OutputFormat.STREAM_JSON, ); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); @@ -1867,7 +1867,7 @@ describe('runNonInteractive', () => { it('should write JSON output when a tool call returns STOP_EXECUTION error', async () => { vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); @@ -1931,7 +1931,7 @@ describe('runNonInteractive', () => { vi.mocked(mockConfig.getOutputFormat).mockReturnValue( OutputFormat.STREAM_JSON, ); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); @@ -2037,6 +2037,87 @@ describe('runNonInteractive', () => { expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1); expect(getWrittenOutput()).toBe('Final answer\n'); }); + + it('should handle InvalidStream event gracefully in TEXT mode', async () => { + const events: ServerGeminiStreamEvent[] = [ + { type: GeminiEventType.InvalidStream }, + ]; + mockGeminiClient.sendMessageStream.mockReturnValue( + createStreamFromEvents(events), + ); + + await runNonInteractive({ + config: mockConfig, + settings: mockSettings, + input: 'test invalid stream', + prompt_id: 'prompt-id-invalid', + }); + + expect(processStderrSpy).toHaveBeenCalledWith( + '[ERROR] Invalid stream: The model returned an empty response or malformed tool call.\n', + ); + expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1); + }); + + it('should handle InvalidStream event gracefully in STREAM_JSON mode', async () => { + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( + MOCK_SESSION_METRICS, + ); + vi.spyOn(mockConfig, 'getOutputFormat').mockReturnValue( + OutputFormat.STREAM_JSON, + ); + const events: ServerGeminiStreamEvent[] = [ + { type: GeminiEventType.InvalidStream }, + ]; + mockGeminiClient.sendMessageStream.mockReturnValue( + createStreamFromEvents(events), + ); + + await runNonInteractive({ + config: mockConfig, + settings: mockSettings, + input: 'test invalid stream', + prompt_id: 'prompt-id-invalid', + }); + + const output = getWrittenOutput(); + expect(output).toContain('"type":"error"'); + expect(output).toContain('"severity":"error"'); + expect(output).toContain( + 'Invalid stream: The model returned an empty response or malformed tool call.', + ); + expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1); + }); + + it('should handle InvalidStream event gracefully in JSON mode', async () => { + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( + MOCK_SESSION_METRICS, + ); + vi.spyOn(mockConfig, 'getOutputFormat').mockReturnValue( + OutputFormat.JSON, + ); + const events: ServerGeminiStreamEvent[] = [ + { type: GeminiEventType.InvalidStream }, + ]; + mockGeminiClient.sendMessageStream.mockReturnValue( + createStreamFromEvents(events), + ); + + await runNonInteractive({ + config: mockConfig, + settings: mockSettings, + input: 'test invalid stream', + prompt_id: 'prompt-id-invalid', + }); + + const output = getWrittenOutput(); + expect(output).toContain('"error": {'); + expect(output).toContain('"type": "INVALID_STREAM"'); + expect(output).toContain( + 'Invalid stream: The model returned an empty response or malformed tool call.', + ); + expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1); + }); }); describe('Output Sanitization', () => { @@ -2218,7 +2299,7 @@ describe('runNonInteractive', () => { vi.mocked(mockConfig.getOutputFormat).mockReturnValue( OutputFormat.STREAM_JSON, ); - vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue( MOCK_SESSION_METRICS, ); diff --git a/packages/cli/src/nonInteractiveCli.ts b/packages/cli/src/nonInteractiveCli.ts index 8c1c2ca6a2..8db512f56d 100644 --- a/packages/cli/src/nonInteractiveCli.ts +++ b/packages/cli/src/nonInteractiveCli.ts @@ -295,6 +295,7 @@ export async function runNonInteractive( let currentMessages: Content[] = [{ role: 'user', parts: query }]; let turnCount = 0; + let invalidStreamError: string | undefined; while (true) { turnCount++; if ( @@ -395,6 +396,21 @@ export async function runNonInteractive( if (config.getOutputFormat() === OutputFormat.TEXT) { process.stderr.write(`[WARNING] ${blockMessage}\n`); } + } else if (event.type === GeminiEventType.InvalidStream) { + invalidStreamError = + 'Invalid stream: The model returned an empty response or malformed tool call.'; + if (streamFormatter) { + streamFormatter.emitEvent({ + type: JsonStreamEventType.ERROR, + timestamp: new Date().toISOString(), + severity: 'error', + message: invalidStreamError, + }); + } else if (config.getOutputFormat() === OutputFormat.TEXT) { + process.stderr.write(`[ERROR] ${invalidStreamError}\n`); + } + toolCallRequests.length = 0; + break; } } @@ -508,14 +524,21 @@ export async function runNonInteractive( streamFormatter.emitEvent({ type: JsonStreamEventType.RESULT, timestamp: new Date().toISOString(), - status: 'success', + status: invalidStreamError ? 'error' : 'success', stats: streamFormatter.convertToStreamStats(metrics, durationMs), }); } else if (config.getOutputFormat() === OutputFormat.JSON) { const formatter = new JsonFormatter(); const stats = uiTelemetryService.getMetrics(); textOutput.write( - formatter.format(config.getSessionId(), responseText, stats), + formatter.format( + config.getSessionId(), + responseText, + stats, + invalidStreamError + ? { type: 'INVALID_STREAM', message: invalidStreamError } + : undefined, + ), ); } else { textOutput.ensureTrailingNewline(); // Ensure a final newline