!feat(cli): harden non-interactive agent session handling

This commit is contained in:
Adam Weidman
2026-03-20 14:33:57 -04:00
committed by Adam Weidman
parent 24a4a0da1e
commit 7c7150f487
4 changed files with 300 additions and 52 deletions
@@ -8,14 +8,6 @@ exports[`runNonInteractive > should emit appropriate error event in streaming JS
"
`;
exports[`runNonInteractive > should emit appropriate error event in streaming JSON mode: 'max session turns' 1`] = `
"{"type":"init","timestamp":"<TIMESTAMP>","session_id":"test-session-id","model":"test-model"}
{"type":"message","timestamp":"<TIMESTAMP>","role":"user","content":"Max turns test"}
{"type":"error","timestamp":"<TIMESTAMP>","severity":"error","message":"Maximum session turns exceeded"}
{"type":"result","timestamp":"<TIMESTAMP>","status":"success","stats":{"total_tokens":0,"input_tokens":0,"output_tokens":0,"cached":0,"input":0,"duration_ms":<DURATION>,"tool_calls":0,"models":{}}}
"
`;
exports[`runNonInteractive > should emit appropriate events for streaming JSON output 1`] = `
"{"type":"init","timestamp":"<TIMESTAMP>","session_id":"test-session-id","model":"test-model"}
{"type":"message","timestamp":"<TIMESTAMP>","role":"user","content":"Stream test"}
+240 -26
View File
@@ -295,6 +295,30 @@ describe('runNonInteractive', () => {
expect(streamSpy).toHaveBeenCalledWith({ streamId: expect.any(String) });
});
// Verifies that runNonInteractive rejects when LegacyAgentSession.send()
// resolves with `streamId: null`, and that it never tries to consume a stream
// in that case.
it('fails fast if the session acknowledges a message send without a stream', async () => {
  const { LegacyAgentSession } = await import('@google/gemini-cli-core');
  const sendSpy = vi
    .spyOn(LegacyAgentSession.prototype, 'send')
    .mockResolvedValue({ streamId: null });
  const streamSpy = vi.spyOn(LegacyAgentSession.prototype, 'stream');

  try {
    await expect(
      runNonInteractive({
        config: mockConfig,
        settings: mockSettings,
        input: 'Test input',
        prompt_id: 'prompt-id-null-stream',
      }),
    ).rejects.toThrow(
      'LegacyAgentSession.send() unexpectedly returned no stream for a message send.',
    );

    expect(streamSpy).not.toHaveBeenCalled();
  } finally {
    // Restore in `finally` so a failing assertion above cannot leave the
    // prototype-level mocks (send() resolving to a null stream) in place for
    // later tests.
    sendSpy.mockRestore();
    streamSpy.mockRestore();
  }
});
it('should register activity logger when GEMINI_CLI_ACTIVITY_LOG_TARGET is set', async () => {
vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_TARGET', '/tmp/test.jsonl');
const events: ServerGeminiStreamEvent[] = [
@@ -655,6 +679,21 @@ describe('runNonInteractive', () => {
).rejects.toThrow('process.exit(53) called');
});
// A model stream consisting solely of a MaxSessionTurns event must make
// runNonInteractive reject with the exit-code-53 error.
it('should exit when the session reports max turns through agent_end', async () => {
  const maxTurnsOnlyStream = createStreamFromEvents([
    { type: GeminiEventType.MaxSessionTurns },
  ]);
  mockGeminiClient.sendMessageStream.mockReturnValue(maxTurnsOnlyStream);

  const run = runNonInteractive({
    config: mockConfig,
    settings: mockSettings,
    input: 'Trigger max turns event',
    prompt_id: 'prompt-id-max-turns-event',
  });

  await expect(run).rejects.toThrow('process.exit(53) called');
});
it('should preprocess @include commands before sending to the model', async () => {
// 1. Mock the imported atCommandProcessor
const { handleAtCommand } = await import(
@@ -837,6 +876,79 @@ describe('runNonInteractive', () => {
);
});
// In JSON output mode, text the assistant produced before a tool call must not
// appear in the final `response`; only the text from the turn after the tool
// ran is expected.
it('should keep only the final post-tool assistant text in JSON output', async () => {
  const toolCallEvent: ServerGeminiStreamEvent = {
    type: GeminiEventType.ToolCallRequest,
    value: {
      callId: 'tool-1',
      name: 'testTool',
      args: { arg1: 'value1' },
      isClientInitiated: false,
      prompt_id: 'prompt-id-json-tool-text',
    },
  };

  // The scheduler reports the requested tool as having completed successfully.
  mockSchedulerSchedule.mockResolvedValue([
    {
      status: CoreToolCallStatus.Success,
      request: toolCallEvent.value,
      tool: {} as AnyDeclarativeTool,
      invocation: {} as AnyToolInvocation,
      response: {
        responseParts: [{ text: 'Tool executed successfully' }],
        callId: 'tool-1',
        error: undefined,
        errorType: undefined,
        contentLength: undefined,
      },
    },
  ]);

  // First model turn: some preliminary text, then the tool request.
  const preToolStream = createStreamFromEvents([
    { type: GeminiEventType.Content, value: 'Let me check that...' },
    toolCallEvent,
    {
      type: GeminiEventType.Finished,
      value: { reason: undefined, usageMetadata: { totalTokenCount: 5 } },
    },
  ]);
  // Second model turn: the answer produced after the tool result.
  const postToolStream = createStreamFromEvents([
    { type: GeminiEventType.Content, value: 'Final answer' },
    {
      type: GeminiEventType.Finished,
      value: { reason: undefined, usageMetadata: { totalTokenCount: 3 } },
    },
  ]);
  mockGeminiClient.sendMessageStream
    .mockReturnValueOnce(preToolStream)
    .mockReturnValueOnce(postToolStream);

  vi.mocked(mockConfig.getOutputFormat).mockReturnValue(OutputFormat.JSON);
  vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
    MOCK_SESSION_METRICS,
  );

  await runNonInteractive({
    config: mockConfig,
    settings: mockSettings,
    input: 'Use a tool',
    prompt_id: 'prompt-id-json-tool-text',
  });

  const expectedPayload = {
    session_id: 'test-session-id',
    response: 'Final answer',
    stats: MOCK_SESSION_METRICS,
  };
  const expectedStdout = JSON.stringify(expectedPayload, null, 2);
  expect(processStdoutSpy).toHaveBeenCalledWith(expectedStdout);
});
it('should write JSON output with stats for empty response commands', async () => {
// Test the scenario where a command completes but produces no content at all
const events: ServerGeminiStreamEvent[] = [
@@ -953,7 +1065,7 @@ describe('runNonInteractive', () => {
{
session_id: 'test-session-id',
error: {
type: 'Error',
type: 'FatalInputError',
message: 'Invalid command syntax provided',
code: 42,
},
@@ -1074,10 +1186,11 @@ describe('runNonInteractive', () => {
// Spy on handleCancellationError to verify it's called
const errors = await import('./utils/errors.js');
const cancellationSentinel = new Error('Cancelled');
const handleCancellationErrorSpy = vi
.spyOn(errors, 'handleCancellationError')
.mockImplementation(() => {
throw new Error('Cancelled');
throw cancellationSentinel;
});
const events: ServerGeminiStreamEvent[] = [
@@ -1093,7 +1206,7 @@ describe('runNonInteractive', () => {
signal.addEventListener('abort', () => {
clearTimeout(timeout);
setTimeout(() => {
reject(new Error('Aborted')); // This will be caught by nonInteractiveCli and passed to handleError
reject(new Error('Aborted'));
}, 300);
});
});
@@ -1126,20 +1239,10 @@ describe('runNonInteractive', () => {
keypressHandler('\u0003', { ctrl: true, name: 'c' });
}
// The promise should reject with 'Aborted' because our mock stream throws it,
// and nonInteractiveCli catches it and calls handleError, which doesn't necessarily throw.
// Wait, if handleError is called, we should check that.
// But here we want to check if Ctrl+C works.
// In our current setup, Ctrl+C aborts the signal. The stream throws 'Aborted'.
// nonInteractiveCli catches 'Aborted' and calls handleError.
// If we want to test that handleCancellationError is called, we need the loop to detect abortion.
// But our stream throws before the loop can detect it.
// Let's just check that the promise rejects with 'Aborted' for now,
// which proves the abortion signal reached the stream.
await expect(runPromise).rejects.toThrow('Aborted');
// The Ctrl+C path should route through handleCancellationError rather than
// surfacing the raw stream abort.
await expect(runPromise).rejects.toBe(cancellationSentinel);
expect(handleCancellationErrorSpy).toHaveBeenCalledTimes(1);
expect(
processStderrSpy.mock.calls.some(
@@ -1166,6 +1269,79 @@ describe('runNonInteractive', () => {
// but we can also do it manually if needed.
});
// Simulates Ctrl+C arriving as soon as the keypress listener is registered on
// stdin, i.e. before session.send() is reached, and verifies that
// runNonInteractive routes through handleCancellationError without ever
// calling send().
it('should honor cancellation that happens before session.send()', async () => {
  const originalIsTTY = process.stdin.isTTY;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const originalSetRawMode = (process.stdin as any).setRawMode;
  // Every spy created below is tracked here so `finally` can undo all of them.
  const spiesToRestore: Array<{ mockRestore: () => void }> = [];

  try {
    Object.defineProperty(process.stdin, 'isTTY', {
      value: true,
      configurable: true,
    });
    if (!originalSetRawMode) {
      // Give spyOn something to wrap when stdin has no setRawMode.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      (process.stdin as any).setRawMode = vi.fn();
    }

    // Fire a Ctrl+C keypress synchronously the moment a keypress listener is
    // attached, so the abort happens before the message is sent.
    const stdinOnSpy = vi.spyOn(process.stdin, 'on').mockImplementation(
      (
        event: string | symbol,
        listener: (...args: unknown[]) => void,
      ) => {
        if (event === 'keypress') {
          listener('\u0003', { ctrl: true, name: 'c' });
        }
        return process.stdin;
      },
    );
    spiesToRestore.push(stdinOnSpy);

    spiesToRestore.push(
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      vi.spyOn(process.stdin as any, 'setRawMode').mockImplementation(() => true),
      vi.spyOn(process.stdin, 'resume').mockImplementation(() => process.stdin),
      vi.spyOn(process.stdin, 'pause').mockImplementation(() => process.stdin),
      vi.spyOn(process.stdin, 'removeAllListeners').mockImplementation(
        () => process.stdin,
      ),
    );

    const errors = await import('./utils/errors.js');
    const cancellationSentinel = new Error('Cancelled before send');
    const handleCancellationErrorSpy = vi
      .spyOn(errors, 'handleCancellationError')
      .mockImplementation(() => {
        throw cancellationSentinel;
      });
    spiesToRestore.push(handleCancellationErrorSpy);

    const { LegacyAgentSession } = await import('@google/gemini-cli-core');
    const sendSpy = vi.spyOn(LegacyAgentSession.prototype, 'send');
    spiesToRestore.push(sendSpy);

    await expect(
      runNonInteractive({
        config: mockConfig,
        settings: mockSettings,
        input: 'Cancelled query',
        prompt_id: 'prompt-id-pre-send-cancel',
      }),
    ).rejects.toBe(cancellationSentinel);

    expect(handleCancellationErrorSpy).toHaveBeenCalledTimes(1);
    expect(sendSpy).not.toHaveBeenCalled();
    expect(stdinOnSpy).toHaveBeenCalled();
  } finally {
    // Undo the spies first (most recent first), then the raw property
    // overrides. Doing this in `finally` keeps a failed assertion from leaving
    // stdin in a fake-TTY state with an auto-cancelling `on` for later tests.
    for (const spy of spiesToRestore.reverse()) {
      spy.mockRestore();
    }
    Object.defineProperty(process.stdin, 'isTTY', {
      value: originalIsTTY,
      configurable: true,
    });
    if (originalSetRawMode) {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      (process.stdin as any).setRawMode = originalSetRawMode;
    } else {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      delete (process.stdin as any).setRawMode;
    }
  }
});
it('should throw FatalInputError if a command requires confirmation', async () => {
const mockCommand = {
name: 'confirm',
@@ -1679,17 +1855,9 @@ describe('runNonInteractive', () => {
input: 'Loop test',
promptId: 'prompt-id-loop',
},
{
name: 'max session turns',
events: [
{ type: GeminiEventType.MaxSessionTurns },
] as ServerGeminiStreamEvent[],
input: 'Max turns test',
promptId: 'prompt-id-max-turns',
},
])(
'should emit appropriate error event in streaming JSON mode: $name',
async ({ events, input, promptId }) => {
async ({ name, events, input, promptId }) => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(
OutputFormat.STREAM_JSON,
);
@@ -1727,6 +1895,52 @@ describe('runNonInteractive', () => {
},
);
// In streaming JSON mode, hitting the max-turns limit must end the output with
// a `result` event whose status is `error` and which carries the
// FatalTurnLimitedError details.
it('should emit a terminal max-turns error event in streaming JSON mode', async () => {
  vi.mocked(mockConfig.getOutputFormat).mockReturnValue(
    OutputFormat.STREAM_JSON,
  );
  vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
    MOCK_SESSION_METRICS,
  );

  const maxTurnsStream = createStreamFromEvents([
    { type: GeminiEventType.MaxSessionTurns },
    {
      type: GeminiEventType.Finished,
      value: { reason: undefined, usageMetadata: { totalTokenCount: 0 } },
    },
  ]);
  mockGeminiClient.sendMessageStream.mockReturnValue(maxTurnsStream);

  try {
    await runNonInteractive({
      config: mockConfig,
      settings: mockSettings,
      input: 'Max turns test',
      prompt_id: 'prompt-id-max-turns',
    });
  } catch (_error) {
    // Expected exit
  }

  // Each written line is one JSON-encoded stream event.
  const writtenLines = getWrittenOutput().trim().split('\n');
  const streamEvents = writtenLines.map(
    (line) => JSON.parse(line) as Record<string, unknown>,
  );

  expect(streamEvents).toHaveLength(3);

  const [, , terminalEvent] = streamEvents;
  expect(terminalEvent).toMatchObject({
    type: 'result',
    status: 'error',
    error: {
      type: 'FatalTurnLimitedError',
      message:
        'Reached max session turns for this session. Increase the number of turns by specifying maxSessionTurns in settings.json.',
    },
  });
});
it('should log error when tool recording fails', async () => {
const toolCallEvent: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallRequest,
+58 -17
View File
@@ -181,6 +181,8 @@ export async function runNonInteractive({
};
let errorToHandle: unknown | undefined;
let terminalProcessExitHandled = false;
let abortSession = () => {};
try {
consolePatcher.patch();
@@ -289,14 +291,23 @@ export async function runNonInteractive({
});
// Wire Ctrl+C to session abort
abortController.signal.addEventListener('abort', () => {
abortSession = () => {
void session.abort();
});
};
abortController.signal.addEventListener('abort', abortSession);
if (abortController.signal.aborted) {
return handleCancellationError(config);
}
// Start the agentic loop (runs in background)
const { streamId } = await session.send({
message: geminiPartsToContentParts(query),
});
if (streamId === null) {
throw new Error(
'LegacyAgentSession.send() unexpectedly returned no stream for a message send.',
);
}
const getTextContent = (parts?: ContentPart[]): string | undefined => {
const text = parts
@@ -347,11 +358,23 @@ export async function runNonInteractive({
enumerable: true,
});
}
if (errorMeta?.['status'] !== undefined) {
Object.defineProperty(errToThrow, 'status', {
value: errorMeta['status'],
enumerable: true,
});
}
return errToThrow;
};
const runTerminalExitHandler = (handler: () => never): never => {
terminalProcessExitHandled = true;
return handler();
};
// Consume AgentEvents for output formatting
let responseText = '';
let preToolResponseText: string | undefined;
let streamEnded = false;
for await (const event of session.stream({ streamId })) {
if (streamEnded) break;
@@ -384,6 +407,12 @@ export async function runNonInteractive({
break;
}
case 'tool_request': {
if (config.getOutputFormat() === OutputFormat.JSON) {
// Final JSON output should reflect the last assistant answer after
// any tool orchestration, not intermediate pre-tool text.
preToolResponseText = responseText || preToolResponseText;
responseText = '';
}
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_USE,
@@ -422,12 +451,33 @@ export async function runNonInteractive({
const errorMsg = getTextContent(event.content) ?? 'Tool error';
if (event.data?.['errorType'] === ToolErrorType.STOP_EXECUTION) {
if (
config.getOutputFormat() === OutputFormat.JSON &&
!responseText &&
preToolResponseText
) {
responseText = preToolResponseText;
}
const stopMessage = `Agent execution stopped: ${errorMsg}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`${stopMessage}\n`);
}
}
if (event.data?.['errorType'] === ToolErrorType.NO_SPACE_LEFT) {
runTerminalExitHandler(() =>
handleToolError(
event.name,
new Error(errorMsg),
config,
typeof event.data?.['errorType'] === 'string'
? event.data['errorType']
: undefined,
displayText,
),
);
break;
}
handleToolError(
event.name,
new Error(errorMsg),
@@ -471,22 +521,9 @@ export async function runNonInteractive({
}
case 'agent_end': {
if (event.reason === 'aborted') {
handleCancellationError(config);
runTerminalExitHandler(() => handleCancellationError(config));
} else if (event.reason === 'max_turns') {
const isSessionLimit =
typeof event.data?.['maxTurns'] === 'number' &&
typeof event.data?.['turnCount'] === 'number';
if (isSessionLimit) {
handleMaxTurnsExceededError(config);
}
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'error',
message: 'Maximum session turns exceeded',
});
}
runTerminalExitHandler(() => handleMaxTurnsExceededError(config));
}
const stopMessage =
@@ -523,12 +560,16 @@ export async function runNonInteractive({
} finally {
// Cleanup stdin cancellation before other cleanup
cleanupStdinCancellation();
abortController.signal.removeEventListener('abort', abortSession);
consolePatcher.cleanup();
coreEvents.off(CoreEvent.UserFeedback, handleUserFeedback);
}
if (errorToHandle) {
if (terminalProcessExitHandled) {
throw errorToHandle;
}
handleError(errorToHandle, config);
}
});
+2 -1
View File
@@ -6,6 +6,7 @@
import stripAnsi from 'strip-ansi';
import type { SessionMetrics } from '../telemetry/uiTelemetry.js';
import { getErrorType } from '../utils/errors.js';
import type { JsonError, JsonOutput } from './types.js';
export class JsonFormatter {
@@ -42,7 +43,7 @@ export class JsonFormatter {
sessionId?: string,
): string {
const jsonError: JsonError = {
type: error.constructor.name,
type: getErrorType(error),
message: stripAnsi(error.message),
...(code && { code }),
};