fix(cli): handle InvalidStream event gracefully without throwing (#26218)

This commit is contained in:
Adam Weidman
2026-04-29 16:27:53 -04:00
committed by GitHub
parent 3aedbbc067
commit 88626f37e3
2 changed files with 114 additions and 10 deletions
+89 -8
View File
@@ -703,7 +703,7 @@ describe('runNonInteractive', () => {
createStreamFromEvents(events),
);
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
@@ -793,7 +793,7 @@ describe('runNonInteractive', () => {
.mockReturnValueOnce(createStreamFromEvents(secondCallEvents));
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
@@ -836,7 +836,7 @@ describe('runNonInteractive', () => {
createStreamFromEvents(events),
);
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
@@ -1530,7 +1530,7 @@ describe('runNonInteractive', () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(
OutputFormat.STREAM_JSON,
);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
@@ -1692,7 +1692,7 @@ describe('runNonInteractive', () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(
OutputFormat.STREAM_JSON,
);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
@@ -1867,7 +1867,7 @@ describe('runNonInteractive', () => {
it('should write JSON output when a tool call returns STOP_EXECUTION error', async () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
@@ -1931,7 +1931,7 @@ describe('runNonInteractive', () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(
OutputFormat.STREAM_JSON,
);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
@@ -2037,6 +2037,87 @@ describe('runNonInteractive', () => {
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
expect(getWrittenOutput()).toBe('Final answer\n');
});
it('should handle InvalidStream event gracefully in TEXT mode', async () => {
const events: ServerGeminiStreamEvent[] = [
{ type: GeminiEventType.InvalidStream },
];
mockGeminiClient.sendMessageStream.mockReturnValue(
createStreamFromEvents(events),
);
await runNonInteractive({
config: mockConfig,
settings: mockSettings,
input: 'test invalid stream',
prompt_id: 'prompt-id-invalid',
});
expect(processStderrSpy).toHaveBeenCalledWith(
'[ERROR] Invalid stream: The model returned an empty response or malformed tool call.\n',
);
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
});
it('should handle InvalidStream event gracefully in STREAM_JSON mode', async () => {
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
vi.spyOn(mockConfig, 'getOutputFormat').mockReturnValue(
OutputFormat.STREAM_JSON,
);
const events: ServerGeminiStreamEvent[] = [
{ type: GeminiEventType.InvalidStream },
];
mockGeminiClient.sendMessageStream.mockReturnValue(
createStreamFromEvents(events),
);
await runNonInteractive({
config: mockConfig,
settings: mockSettings,
input: 'test invalid stream',
prompt_id: 'prompt-id-invalid',
});
const output = getWrittenOutput();
expect(output).toContain('"type":"error"');
expect(output).toContain('"severity":"error"');
expect(output).toContain(
'Invalid stream: The model returned an empty response or malformed tool call.',
);
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
});
it('should handle InvalidStream event gracefully in JSON mode', async () => {
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
vi.spyOn(mockConfig, 'getOutputFormat').mockReturnValue(
OutputFormat.JSON,
);
const events: ServerGeminiStreamEvent[] = [
{ type: GeminiEventType.InvalidStream },
];
mockGeminiClient.sendMessageStream.mockReturnValue(
createStreamFromEvents(events),
);
await runNonInteractive({
config: mockConfig,
settings: mockSettings,
input: 'test invalid stream',
prompt_id: 'prompt-id-invalid',
});
const output = getWrittenOutput();
expect(output).toContain('"error": {');
expect(output).toContain('"type": "INVALID_STREAM"');
expect(output).toContain(
'Invalid stream: The model returned an empty response or malformed tool call.',
);
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
});
});
describe('Output Sanitization', () => {
@@ -2218,7 +2299,7 @@ describe('runNonInteractive', () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(
OutputFormat.STREAM_JSON,
);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
vi.spyOn(uiTelemetryService, 'getMetrics').mockReturnValue(
MOCK_SESSION_METRICS,
);
+25 -2
View File
@@ -295,6 +295,7 @@ export async function runNonInteractive(
let currentMessages: Content[] = [{ role: 'user', parts: query }];
let turnCount = 0;
let invalidStreamError: string | undefined;
while (true) {
turnCount++;
if (
@@ -395,6 +396,21 @@ export async function runNonInteractive(
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`[WARNING] ${blockMessage}\n`);
}
} else if (event.type === GeminiEventType.InvalidStream) {
invalidStreamError =
'Invalid stream: The model returned an empty response or malformed tool call.';
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'error',
message: invalidStreamError,
});
} else if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`[ERROR] ${invalidStreamError}\n`);
}
toolCallRequests.length = 0;
break;
}
}
@@ -508,14 +524,21 @@ export async function runNonInteractive(
streamFormatter.emitEvent({
type: JsonStreamEventType.RESULT,
timestamp: new Date().toISOString(),
status: 'success',
status: invalidStreamError ? 'error' : 'success',
stats: streamFormatter.convertToStreamStats(metrics, durationMs),
});
} else if (config.getOutputFormat() === OutputFormat.JSON) {
const formatter = new JsonFormatter();
const stats = uiTelemetryService.getMetrics();
textOutput.write(
formatter.format(config.getSessionId(), responseText, stats),
formatter.format(
config.getSessionId(),
responseText,
stats,
invalidStreamError
? { type: 'INVALID_STREAM', message: invalidStreamError }
: undefined,
),
);
} else {
textOutput.ensureTrailingNewline(); // Ensure a final newline