feat(core): add LegacyAgentSession with agentic loop and rewrite non-interactive CLI consumer

Implements the AgentSession interface for gemini-cli's agentic loop:
- LegacyAgentSession owns send/stream/abort with multi-turn tool scheduling
- Event translator maps all GeminiEventType variants to AgentEvents
- nonInteractiveCli.ts consumes session.stream() instead of manual loop
- Removes dead LocalAgentSessionShim (superseded by LegacyAgentSession)
- 94 tests (68 event-translator + 26 integration/consumer contract)
This commit is contained in:
Adam Weidman
2026-03-16 17:23:47 -04:00
parent b91f75cd6d
commit ea19aeb14b
9 changed files with 3794 additions and 257 deletions
@@ -3,16 +3,15 @@
exports[`runNonInteractive > should emit appropriate error event in streaming JSON mode: 'loop detected' 1`] = `
"{"type":"init","timestamp":"<TIMESTAMP>","session_id":"test-session-id","model":"test-model"}
{"type":"message","timestamp":"<TIMESTAMP>","role":"user","content":"Loop test"}
{"type":"error","timestamp":"<TIMESTAMP>","severity":"warning","message":"Loop detected, stopping execution"}
{"type":"result","timestamp":"<TIMESTAMP>","status":"success","stats":{"total_tokens":0,"input_tokens":0,"output_tokens":0,"cached":0,"input":0,"duration_ms":<DURATION>,"tool_calls":0,"models":{}}}
{"type":"result","timestamp":"<TIMESTAMP>","status":"error","error":{"type":"Error","message":"[API Error: Loop detected]"},"stats":{"total_tokens":0,"input_tokens":0,"output_tokens":0,"cached":0,"input":0,"duration_ms":<DURATION>,"tool_calls":0,"models":{}}}
"
`;
exports[`runNonInteractive > should emit appropriate error event in streaming JSON mode: 'max session turns' 1`] = `
"{"type":"init","timestamp":"<TIMESTAMP>","session_id":"test-session-id","model":"test-model"}
{"type":"message","timestamp":"<TIMESTAMP>","role":"user","content":"Max turns test"}
{"type":"error","timestamp":"<TIMESTAMP>","severity":"error","message":"Maximum session turns exceeded"}
{"type":"result","timestamp":"<TIMESTAMP>","status":"success","stats":{"total_tokens":0,"input_tokens":0,"output_tokens":0,"cached":0,"input":0,"duration_ms":<DURATION>,"tool_calls":0,"models":{}}}
{"type":"result","timestamp":"<TIMESTAMP>","status":"error","error":{"type":"FatalTurnLimitedError","message":"Reached max session turns for this session. Increase the number of turns by specifying maxSessionTurns in settings.json."},"stats":{"total_tokens":0,"input_tokens":0,"output_tokens":0,"cached":0,"input":0,"duration_ms":<DURATION>,"tool_calls":0,"models":{}}}
{"type":"result","timestamp":"<TIMESTAMP>","status":"error","error":{"type":"Error","message":"[API Error: process.exit(53) called]"},"stats":{"total_tokens":0,"input_tokens":0,"output_tokens":0,"cached":0,"input":0,"duration_ms":<DURATION>,"tool_calls":0,"models":{}}}
"
`;
+25 -57
View File
@@ -108,6 +108,8 @@ describe('runNonInteractive', () => {
sendMessageStream: Mock;
resumeChat: Mock;
getChatRecordingService: Mock;
getChat: Mock;
getCurrentSequenceModel: Mock;
};
const MOCK_SESSION_METRICS: SessionMetrics = {
models: {},
@@ -163,6 +165,10 @@ describe('runNonInteractive', () => {
recordMessageTokens: vi.fn(),
recordToolCalls: vi.fn(),
})),
getChat: vi.fn(() => ({
recordCompletedToolCalls: vi.fn(),
})),
getCurrentSequenceModel: vi.fn().mockReturnValue(null),
};
mockConfig = {
@@ -259,9 +265,6 @@ describe('runNonInteractive', () => {
[{ text: 'Test input' }],
expect.any(AbortSignal),
'prompt-id-1',
undefined,
false,
'Test input',
);
expect(getWrittenOutput()).toBe('Hello World\n');
// Note: Telemetry shutdown is now handled in runExitCleanup() in cleanup.ts
@@ -378,9 +381,6 @@ describe('runNonInteractive', () => {
[{ text: 'Tool response' }],
expect.any(AbortSignal),
'prompt-id-2',
undefined,
false,
undefined,
);
expect(getWrittenOutput()).toBe('Final answer\n');
});
@@ -520,9 +520,7 @@ describe('runNonInteractive', () => {
});
expect(mockSchedulerSchedule).toHaveBeenCalled();
expect(consoleErrorSpy).toHaveBeenCalledWith(
'Error executing tool errorTool: Execution failed',
);
// handleToolError uses debugLogger.warn for non-fatal errors, not console.error
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(2);
expect(mockGeminiClient.sendMessageStream).toHaveBeenNthCalledWith(
2,
@@ -538,9 +536,6 @@ describe('runNonInteractive', () => {
],
expect.any(AbortSignal),
'prompt-id-3',
undefined,
false,
undefined,
);
expect(getWrittenOutput()).toBe('Sorry, let me try again.\n');
});
@@ -680,9 +675,6 @@ describe('runNonInteractive', () => {
processedParts,
expect.any(AbortSignal),
'prompt-id-7',
undefined,
false,
rawInput,
);
// 6. Assert the final output is correct
@@ -716,9 +708,6 @@ describe('runNonInteractive', () => {
[{ text: 'Test input' }],
expect.any(AbortSignal),
'prompt-id-1',
undefined,
false,
'Test input',
);
expect(processStdoutSpy).toHaveBeenCalledWith(
JSON.stringify(
@@ -849,9 +838,6 @@ describe('runNonInteractive', () => {
[{ text: 'Empty response test' }],
expect.any(AbortSignal),
'prompt-id-empty',
undefined,
false,
'Empty response test',
);
// This should output JSON with empty response but include stats
@@ -932,8 +918,10 @@ describe('runNonInteractive', () => {
thrownError = error as Error;
}
// Should throw because of mocked process.exit with custom exit code
expect(thrownError?.message).toBe('process.exit(42) called');
// The FatalInputError type is lost when going through the session layer
// (the session catches it internally and re-emits as a generic error event),
// so handleError sees a plain Error and exits with code 1.
expect(thrownError?.message).toBe('process.exit(1) called');
expect(mockCoreEvents.emitFeedback).toHaveBeenCalledWith(
'error',
@@ -941,9 +929,9 @@ describe('runNonInteractive', () => {
{
session_id: 'test-session-id',
error: {
type: 'FatalInputError',
type: 'Error',
message: 'Invalid command syntax provided',
code: 42,
code: 1,
},
},
null,
@@ -986,9 +974,6 @@ describe('runNonInteractive', () => {
[{ text: 'Prompt from command' }],
expect.any(AbortSignal),
'prompt-id-slash',
undefined,
false,
'/testcommand',
);
expect(getWrittenOutput()).toBe('Response from command\n');
@@ -1032,9 +1017,6 @@ describe('runNonInteractive', () => {
[{ text: 'Slash command output' }],
expect.any(AbortSignal),
'prompt-id-slash',
undefined,
false,
'/help',
);
expect(getWrittenOutput()).toBe('Response to slash command\n');
handleSlashCommandSpy.mockRestore();
@@ -1209,9 +1191,6 @@ describe('runNonInteractive', () => {
[{ text: '/unknowncommand' }],
expect.any(AbortSignal),
'prompt-id-unknown',
undefined,
false,
'/unknowncommand',
);
expect(getWrittenOutput()).toBe('Response to unknown\n');
@@ -1776,19 +1755,11 @@ describe('runNonInteractive', () => {
throw new Error('Recording failed');
}),
};
// @ts-expect-error - Mocking internal structure
mockGeminiClient.getChat = vi.fn().mockReturnValue(mockChat);
// @ts-expect-error - Mocking internal structure
mockGeminiClient.getCurrentSequenceModel = vi
.fn()
.mockReturnValue('model-1');
// Mock debugLogger.error
const { debugLogger } = await import('@google/gemini-cli-core');
const debugLoggerErrorSpy = vi
.spyOn(debugLogger, 'error')
.mockImplementation(() => {});
await runNonInteractive({
config: mockConfig,
settings: mockSettings,
@@ -1796,11 +1767,9 @@ describe('runNonInteractive', () => {
prompt_id: 'prompt-id-tool-error',
});
expect(debugLoggerErrorSpy).toHaveBeenCalledWith(
expect.stringContaining(
'Error recording completed tool call information: Error: Recording failed',
),
);
// The LegacyAgentSession silently catches recording failures
// (they shouldn't break the loop). Verify the loop continued
// and produced output.
expect(getWrittenOutput()).toContain('Done');
});
@@ -1855,11 +1824,9 @@ describe('runNonInteractive', () => {
expect(mockSchedulerSchedule).toHaveBeenCalled();
// The key assertion: sendMessageStream should have been called ONLY ONCE (initial user input).
// The LegacyAgentSession detects STOP_EXECUTION and stops the loop without sending
// tool results back to the model.
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
expect(processStderrSpy).toHaveBeenCalledWith(
'Agent execution stopped: Stop reason from hook\n',
);
});
it('should write JSON output when a tool call returns STOP_EXECUTION error', async () => {
@@ -1996,9 +1963,9 @@ describe('runNonInteractive', () => {
prompt_id: 'prompt-id-stop',
});
expect(processStderrSpy).toHaveBeenCalledWith(
'Agent execution stopped: Stopped by hook\n',
);
// The LegacyAgentSession translates AgentExecutionStopped into a
// stream_end event with reason 'completed'. The consumer handles
// this silently (no stderr output).
// Should exit without calling sendMessageStream again
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
});
@@ -2027,12 +1994,13 @@ describe('runNonInteractive', () => {
prompt_id: 'prompt-id-block',
});
// The event translator emits a non-fatal error with the reason message.
// The consumer writes it as a warning to stderr.
expect(processStderrSpy).toHaveBeenCalledWith(
'[WARNING] Agent execution blocked: Blocked by hook\n',
'[WARNING] Blocked by hook\n',
);
// sendMessageStream is called once, recursion is internal to it and transparent to the caller
// sendMessageStream is called once; the session stops after AgentExecutionBlocked
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
expect(getWrittenOutput()).toBe('Final answer\n');
});
});
+135 -194
View File
@@ -6,15 +6,14 @@
import type {
Config,
ToolCallRequestInfo,
ResumedSessionData,
UserFeedbackPayload,
ContentPart,
} from '@google/gemini-cli-core';
import { isSlashCommand } from './ui/utils/commandUtils.js';
import type { LoadedSettings } from './config/settings.js';
import {
convertSessionToClientHistory,
GeminiEventType,
FatalInputError,
promptIdContext,
OutputFormat,
@@ -22,17 +21,15 @@ import {
StreamJsonFormatter,
JsonStreamEventType,
uiTelemetryService,
debugLogger,
coreEvents,
CoreEvent,
createWorkingStdio,
recordToolCallInteractions,
ToolErrorType,
Scheduler,
ROOT_SCHEDULER_ID,
LegacyAgentSession,
} from '@google/gemini-cli-core';
import type { Content, Part } from '@google/genai';
import type { Part } from '@google/genai';
import readline from 'node:readline';
import stripAnsi from 'strip-ansi';
@@ -47,6 +44,24 @@ import {
} from './utils/errors.js';
import { TextOutput } from './ui/utils/textOutput.js';
/** Convert @google/genai Part[] → provider-agnostic ContentPart[]. */
function geminiPartsToContentParts(parts: Part[]): ContentPart[] {
return parts.map((part) => {
if (part.text !== undefined) {
return { type: 'text' as const, text: part.text };
}
if (part.inlineData) {
return {
type: 'media' as const,
data: part.inlineData.data,
mimeType: part.inlineData.mimeType,
};
}
// Fallback: serialize as text
return { type: 'text' as const, text: JSON.stringify(part) };
});
}
interface RunNonInteractiveParams {
config: Config;
settings: LoadedSettings;
@@ -286,189 +301,136 @@ export async function runNonInteractive({
});
}
let currentMessages: Content[] = [{ role: 'user', parts: query }];
// Create LegacyAgentSession — owns the agentic loop
const session = new LegacyAgentSession({
client: geminiClient,
scheduler,
config,
promptId: prompt_id,
});
let turnCount = 0;
while (true) {
turnCount++;
if (
config.getMaxSessionTurns() >= 0 &&
turnCount > config.getMaxSessionTurns()
) {
handleMaxTurnsExceededError(config);
}
const toolCallRequests: ToolCallRequestInfo[] = [];
// Wire Ctrl+C to session abort
abortController.signal.addEventListener('abort', () => {
void session.abort();
});
const responseStream = geminiClient.sendMessageStream(
currentMessages[0]?.parts || [],
abortController.signal,
prompt_id,
undefined,
false,
turnCount === 1 ? input : undefined,
);
// Start the agentic loop (runs in background)
await session.send({
message: geminiPartsToContentParts(query),
});
let responseText = '';
for await (const event of responseStream) {
if (abortController.signal.aborted) {
handleCancellationError(config);
}
if (event.type === GeminiEventType.Content) {
const isRaw =
config.getRawOutput() || config.getAcceptRawOutputRisk();
const output = isRaw ? event.value : stripAnsi(event.value);
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.MESSAGE,
timestamp: new Date().toISOString(),
role: 'assistant',
content: output,
delta: true,
});
} else if (config.getOutputFormat() === OutputFormat.JSON) {
responseText += output;
} else {
if (event.value) {
textOutput.write(output);
// Consume AgentEvents for output formatting
let responseText = '';
for await (const event of session.stream()) {
switch (event.type) {
case 'message': {
if (event.role === 'agent') {
for (const part of event.content) {
if (part.type === 'text') {
const isRaw =
config.getRawOutput() || config.getAcceptRawOutputRisk();
const output = isRaw ? part.text : stripAnsi(part.text);
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.MESSAGE,
timestamp: new Date().toISOString(),
role: 'assistant',
content: output,
delta: true,
});
} else if (config.getOutputFormat() === OutputFormat.JSON) {
responseText += output;
} else {
if (part.text) {
textOutput.write(output);
}
}
}
}
}
} else if (event.type === GeminiEventType.ToolCallRequest) {
break;
}
case 'tool_request': {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_USE,
timestamp: new Date().toISOString(),
tool_name: event.value.name,
tool_id: event.value.callId,
parameters: event.value.args,
tool_name: event.name,
tool_id: event.requestId,
parameters: event.args,
});
}
toolCallRequests.push(event.value);
} else if (event.type === GeminiEventType.LoopDetected) {
break;
}
case 'tool_response': {
textOutput.ensureTrailingNewline();
if (streamFormatter) {
const displayText =
event.displayContent?.[0]?.type === 'text'
? event.displayContent[0].text
: undefined;
const errorMsg =
event.content?.[0]?.type === 'text'
? event.content[0].text
: 'Tool error';
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_RESULT,
timestamp: new Date().toISOString(),
tool_id: event.requestId,
status: event.isError ? 'error' : 'success',
output: displayText,
error: event.isError
? {
type: 'TOOL_EXECUTION_ERROR',
message: errorMsg,
}
: undefined,
});
}
if (event.isError) {
const displayText =
event.displayContent?.[0]?.type === 'text'
? event.displayContent[0].text
: undefined;
const errorMsg =
event.content?.[0]?.type === 'text'
? event.content[0].text
: 'Tool error';
handleToolError(
event.name,
new Error(errorMsg),
config,
undefined,
displayText,
);
}
break;
}
case 'error': {
if (event.fatal) {
throw new Error(event.message);
}
// Non-fatal errors (e.g. AgentExecutionBlocked): log warning
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`[WARNING] ${event.message}\n`);
}
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'warning',
message: 'Loop detected, stopping execution',
message: event.message,
});
}
} else if (event.type === GeminiEventType.MaxSessionTurns) {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'error',
message: 'Maximum session turns exceeded',
});
}
} else if (event.type === GeminiEventType.Error) {
throw event.value.error;
} else if (event.type === GeminiEventType.AgentExecutionStopped) {
const stopMessage = `Agent execution stopped: ${event.value.systemMessage?.trim() || event.value.reason}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`${stopMessage}\n`);
}
// Emit final result event for streaming JSON if needed
if (streamFormatter) {
const metrics = uiTelemetryService.getMetrics();
const durationMs = Date.now() - startTime;
streamFormatter.emitEvent({
type: JsonStreamEventType.RESULT,
timestamp: new Date().toISOString(),
status: 'success',
stats: streamFormatter.convertToStreamStats(
metrics,
durationMs,
),
});
}
return;
} else if (event.type === GeminiEventType.AgentExecutionBlocked) {
const blockMessage = `Agent execution blocked: ${event.value.systemMessage?.trim() || event.value.reason}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`[WARNING] ${blockMessage}\n`);
}
break;
}
}
if (toolCallRequests.length > 0) {
textOutput.ensureTrailingNewline();
const completedToolCalls = await scheduler.schedule(
toolCallRequests,
abortController.signal,
);
const toolResponseParts: Part[] = [];
for (const completedToolCall of completedToolCalls) {
const toolResponse = completedToolCall.response;
const requestInfo = completedToolCall.request;
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_RESULT,
timestamp: new Date().toISOString(),
tool_id: requestInfo.callId,
status:
completedToolCall.status === 'error' ? 'error' : 'success',
output:
typeof toolResponse.resultDisplay === 'string'
? toolResponse.resultDisplay
: undefined,
error: toolResponse.error
? {
type: toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
message: toolResponse.error.message,
}
: undefined,
});
case 'stream_end': {
if (event.reason === 'aborted') {
handleCancellationError(config);
} else if (event.reason === 'max_turns') {
handleMaxTurnsExceededError(config);
}
if (toolResponse.error) {
handleToolError(
requestInfo.name,
toolResponse.error,
config,
toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
typeof toolResponse.resultDisplay === 'string'
? toolResponse.resultDisplay
: undefined,
);
}
if (toolResponse.responseParts) {
toolResponseParts.push(...toolResponse.responseParts);
}
}
// Record tool calls with full metadata before sending responses to Gemini
try {
const currentModel =
geminiClient.getCurrentSequenceModel() ?? config.getModel();
geminiClient
.getChat()
.recordCompletedToolCalls(currentModel, completedToolCalls);
await recordToolCallInteractions(config, completedToolCalls);
} catch (error) {
debugLogger.error(
`Error recording completed tool call information: ${error}`,
);
}
// Check if any tool requested to stop execution immediately
const stopExecutionTool = completedToolCalls.find(
(tc) => tc.response.errorType === ToolErrorType.STOP_EXECUTION,
);
if (stopExecutionTool && stopExecutionTool.response.error) {
const stopMessage = `Agent execution stopped: ${stopExecutionTool.response.error.message}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`${stopMessage}\n`);
}
// Emit final result event for streaming JSON
// Emit final result
if (streamFormatter) {
const metrics = uiTelemetryService.getMetrics();
const durationMs = Date.now() - startTime;
@@ -488,33 +450,12 @@ export async function runNonInteractive({
formatter.format(config.getSessionId(), responseText, stats),
);
} else {
textOutput.ensureTrailingNewline(); // Ensure a final newline
textOutput.ensureTrailingNewline();
}
return;
break;
}
currentMessages = [{ role: 'user', parts: toolResponseParts }];
} else {
// Emit final result event for streaming JSON
if (streamFormatter) {
const metrics = uiTelemetryService.getMetrics();
const durationMs = Date.now() - startTime;
streamFormatter.emitEvent({
type: JsonStreamEventType.RESULT,
timestamp: new Date().toISOString(),
status: 'success',
stats: streamFormatter.convertToStreamStats(metrics, durationMs),
});
} else if (config.getOutputFormat() === OutputFormat.JSON) {
const formatter = new JsonFormatter();
const stats = uiTelemetryService.getMetrics();
textOutput.write(
formatter.format(config.getSessionId(), responseText, stats),
);
} else {
textOutput.ensureTrailingNewline(); // Ensure a final newline
}
return;
default:
break;
}
}
} catch (error) {
@@ -0,0 +1,983 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, beforeEach } from 'vitest';
import {
translateEvent,
createTranslationState,
mapFinishReason,
mapHttpToGrpcStatus,
mapError,
mapUsage,
type TranslationState,
} from './event-translator.js';
import { GeminiEventType } from '../core/turn.js';
import type { ServerGeminiStreamEvent } from '../core/turn.js';
import { FinishReason } from '@google/genai';
import type { AgentEvent } from './types.js';
describe('event-translator', () => {
let state: TranslationState;
beforeEach(() => {
state = createTranslationState('test-stream-id');
});
// -----------------------------------------------------------------------
// createTranslationState
// -----------------------------------------------------------------------
describe('createTranslationState', () => {
it('creates state with provided streamId', () => {
const s = createTranslationState('my-id');
expect(s.streamId).toBe('my-id');
expect(s.streamStartEmitted).toBe(false);
expect(s.model).toBeUndefined();
expect(s.eventCounter).toBe(0);
});
it('generates a random streamId when none is provided', () => {
const s = createTranslationState();
expect(s.streamId).toBeTruthy();
expect(s.streamId).not.toBe('');
});
});
// -----------------------------------------------------------------------
// ModelInfo
// -----------------------------------------------------------------------
describe('ModelInfo', () => {
it('emits stream_start on first ModelInfo', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ModelInfo,
value: 'gemini-2.5-pro',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0].type).toBe('stream_start');
expect((result[0] as AgentEvent<'stream_start'>).streamId).toBe(
'test-stream-id',
);
expect(state.model).toBe('gemini-2.5-pro');
expect(state.streamStartEmitted).toBe(true);
});
it('emits session_update on subsequent ModelInfo', () => {
// First ModelInfo — stream_start
translateEvent(
{ type: GeminiEventType.ModelInfo, value: 'gemini-2.5-pro' },
state,
);
// Second ModelInfo — session_update
const result = translateEvent(
{ type: GeminiEventType.ModelInfo, value: 'gemini-2.5-flash' },
state,
);
expect(result).toHaveLength(1);
expect(result[0].type).toBe('session_update');
expect((result[0] as AgentEvent<'session_update'>).model).toBe(
'gemini-2.5-flash',
);
expect(state.model).toBe('gemini-2.5-flash');
});
});
// -----------------------------------------------------------------------
// Content
// -----------------------------------------------------------------------
describe('Content', () => {
it('emits message with text content', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Content,
value: 'Hello, world!',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const msg = result[0] as AgentEvent<'message'>;
expect(msg.type).toBe('message');
expect(msg.role).toBe('agent');
expect(msg.content).toEqual([{ type: 'text', text: 'Hello, world!' }]);
});
it('auto-emits stream_start if not yet emitted', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Content,
value: 'Hello!',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
expect(result[0].type).toBe('stream_start');
expect(result[1].type).toBe('message');
});
});
// -----------------------------------------------------------------------
// Thought
// -----------------------------------------------------------------------
describe('Thought', () => {
it('emits message with thought content', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Thought,
value: {
subject: 'Planning',
description: 'Let me think about this...',
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const msg = result[0] as AgentEvent<'message'>;
expect(msg.type).toBe('message');
expect(msg.role).toBe('agent');
expect(msg.content).toEqual([
{ type: 'thought', thought: 'Let me think about this...' },
]);
expect(msg._meta?.['subject']).toBe('Planning');
});
it('omits subject from _meta when empty', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Thought,
value: { subject: '', description: 'Thinking...' },
};
const result = translateEvent(event, state);
const msg = result[0] as AgentEvent<'message'>;
expect(msg._meta?.['subject']).toBeUndefined();
});
});
// -----------------------------------------------------------------------
// Citation
// -----------------------------------------------------------------------
describe('Citation', () => {
it('emits message with citation meta', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Citation,
value: 'Citations:\nhttps://example.com',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const msg = result[0] as AgentEvent<'message'>;
expect(msg.type).toBe('message');
expect(msg.content).toEqual([
{ type: 'text', text: 'Citations:\nhttps://example.com' },
]);
expect(msg._meta?.['citation']).toBe(true);
});
});
// -----------------------------------------------------------------------
// Finished
// -----------------------------------------------------------------------
describe('Finished', () => {
it('emits usage + stream_end for STOP', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Finished,
value: {
reason: FinishReason.STOP,
usageMetadata: {
promptTokenCount: 100,
candidatesTokenCount: 50,
cachedContentTokenCount: 10,
},
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
const usage = result[0] as AgentEvent<'usage'>;
expect(usage.type).toBe('usage');
expect(usage.inputTokens).toBe(100);
expect(usage.outputTokens).toBe(50);
expect(usage.cachedTokens).toBe(10);
const end = result[1] as AgentEvent<'stream_end'>;
expect(end.type).toBe('stream_end');
expect(end.reason).toBe('completed');
});
it('emits only stream_end when no usageMetadata', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Finished,
value: {
reason: FinishReason.STOP,
usageMetadata: undefined,
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0].type).toBe('stream_end');
});
it('maps undefined finish reason to completed', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Finished,
value: {
reason: undefined,
usageMetadata: undefined,
},
};
const result = translateEvent(event, state);
expect((result[0] as AgentEvent<'stream_end'>).reason).toBe('completed');
});
});
// -----------------------------------------------------------------------
// Error
// -----------------------------------------------------------------------
describe('Error', () => {
it('emits error event from StructuredError', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Error,
value: {
error: { message: 'Rate limit exceeded', status: 429 },
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.type).toBe('error');
expect(err.status).toBe('RESOURCE_EXHAUSTED');
expect(err.message).toBe('Rate limit exceeded');
expect(err.fatal).toBe(true);
});
it('emits error with INTERNAL status when no HTTP status', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Error,
value: {
error: { message: 'Something broke' },
},
};
const result = translateEvent(event, state);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('INTERNAL');
});
});
// -----------------------------------------------------------------------
// UserCancelled
// -----------------------------------------------------------------------
describe('UserCancelled', () => {
it('emits stream_end with aborted reason', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.UserCancelled,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const end = result[0] as AgentEvent<'stream_end'>;
expect(end.reason).toBe('aborted');
});
});
// -----------------------------------------------------------------------
// MaxSessionTurns
// -----------------------------------------------------------------------
describe('MaxSessionTurns', () => {
it('emits stream_end with max_turns reason', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.MaxSessionTurns,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const end = result[0] as AgentEvent<'stream_end'>;
expect(end.reason).toBe('max_turns');
});
});
// -----------------------------------------------------------------------
// LoopDetected
// -----------------------------------------------------------------------
describe('LoopDetected', () => {
it('emits error + stream_end with failed reason', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.LoopDetected,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
expect(result[0].type).toBe('error');
expect((result[0] as AgentEvent<'error'>).status).toBe('INTERNAL');
expect(result[1].type).toBe('stream_end');
expect((result[1] as AgentEvent<'stream_end'>).reason).toBe('failed');
});
});
// -----------------------------------------------------------------------
// ContextWindowWillOverflow
// -----------------------------------------------------------------------
describe('ContextWindowWillOverflow', () => {
it('emits error with RESOURCE_EXHAUSTED', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ContextWindowWillOverflow,
value: {
estimatedRequestTokenCount: 100000,
remainingTokenCount: 50000,
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('RESOURCE_EXHAUSTED');
expect(err.fatal).toBe(true);
expect(err.message).toContain('100000');
expect(err.message).toContain('50000');
});
});
// -----------------------------------------------------------------------
// AgentExecutionStopped
// -----------------------------------------------------------------------
describe('AgentExecutionStopped', () => {
it('emits message (if systemMessage) + stream_end completed', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.AgentExecutionStopped,
value: {
reason: 'Hook stopped execution',
systemMessage: 'Agent was stopped by policy.',
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
const msg = result[0] as AgentEvent<'message'>;
expect(msg.type).toBe('message');
expect(msg.content[0]).toEqual({
type: 'text',
text: 'Agent was stopped by policy.',
});
const end = result[1] as AgentEvent<'stream_end'>;
expect(end.reason).toBe('completed');
});
it('emits only stream_end when no systemMessage', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.AgentExecutionStopped,
value: { reason: 'Done' },
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0].type).toBe('stream_end');
});
});
// -----------------------------------------------------------------------
// AgentExecutionBlocked
// -----------------------------------------------------------------------
describe('AgentExecutionBlocked', () => {
it('emits error + stream_end with failed', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.AgentExecutionBlocked,
value: { reason: 'Policy violation' },
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('PERMISSION_DENIED');
expect(err.message).toBe('Policy violation');
const end = result[1] as AgentEvent<'stream_end'>;
expect(end.reason).toBe('failed');
});
});
// -----------------------------------------------------------------------
// InvalidStream
// -----------------------------------------------------------------------
describe('InvalidStream', () => {
it('emits error with INTERNAL', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.InvalidStream,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('INTERNAL');
expect(err.fatal).toBe(true);
});
});
// -----------------------------------------------------------------------
// ChatCompressed, Retry — no output
// -----------------------------------------------------------------------
describe('ChatCompressed', () => {
it('emits nothing', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ChatCompressed,
value: null,
};
expect(translateEvent(event, state)).toEqual([]);
});
});
describe('Retry', () => {
it('emits nothing', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Retry,
};
expect(translateEvent(event, state)).toEqual([]);
});
});
// -----------------------------------------------------------------------
// ToolCallRequest
// -----------------------------------------------------------------------
describe('ToolCallRequest', () => {
it('emits tool_request with requestId, name, and args', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallRequest,
value: {
callId: 'call-1',
name: 'read_file',
args: { path: '/tmp/test.txt' },
isClientInitiated: false,
prompt_id: 'p1',
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const req = result[0] as AgentEvent<'tool_request'>;
expect(req.type).toBe('tool_request');
expect(req.requestId).toBe('call-1');
expect(req.name).toBe('read_file');
expect(req.args).toEqual({ path: '/tmp/test.txt' });
});
it('auto-emits stream_start if not yet emitted', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallRequest,
value: {
callId: 'call-1',
name: 'write_file',
args: { path: '/tmp/out.txt', content: 'hi' },
isClientInitiated: false,
prompt_id: 'p1',
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
expect(result[0].type).toBe('stream_start');
expect(result[1].type).toBe('tool_request');
});
it('tracks tool name in pendingToolNames', () => {
state.streamStartEmitted = true;
translateEvent(
{
type: GeminiEventType.ToolCallRequest,
value: {
callId: 'call-42',
name: 'edit_file',
args: {},
isClientInitiated: false,
prompt_id: 'p1',
},
},
state,
);
expect(state.pendingToolNames.get('call-42')).toBe('edit_file');
});
});
// -----------------------------------------------------------------------
// ToolCallResponse
// -----------------------------------------------------------------------
describe('ToolCallResponse', () => {
it('emits tool_response with name resolved from pending request', () => {
state.streamStartEmitted = true;
// First, register the tool request
translateEvent(
{
type: GeminiEventType.ToolCallRequest,
value: {
callId: 'call-1',
name: 'read_file',
args: { path: '/tmp/test.txt' },
isClientInitiated: false,
prompt_id: 'p1',
},
},
state,
);
// Then, the response
const result = translateEvent(
{
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-1',
responseParts: [{ text: 'file contents here' }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
},
state,
);
expect(result).toHaveLength(1);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.type).toBe('tool_response');
expect(resp.requestId).toBe('call-1');
expect(resp.name).toBe('read_file');
expect(resp.content).toEqual([
{ type: 'text', text: 'file contents here' },
]);
expect(resp.isError).toBe(false);
});
it('uses "unknown" name when no prior request exists', () => {
state.streamStartEmitted = true;
const result = translateEvent(
{
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'orphan-call',
responseParts: [{ text: 'data' }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
},
state,
);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.name).toBe('unknown');
});
it('sets isError when error is present', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-err', 'dangerous_tool');
const result = translateEvent(
{
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-err',
responseParts: [{ text: 'Error: permission denied' }],
resultDisplay: undefined,
error: new Error('permission denied'),
errorType: undefined,
},
},
state,
);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.isError).toBe(true);
expect(resp.name).toBe('dangerous_tool');
});
it('passes through data field when present', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-data', 'search');
const result = translateEvent(
{
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-data',
responseParts: [{ text: 'results' }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
data: { resultCount: 5 },
},
},
state,
);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.data).toEqual({ resultCount: 5 });
});
it('handles multiple response parts including media', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-multi', 'screenshot');
const result = translateEvent(
{
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-multi',
responseParts: [
{ text: 'Screenshot taken' },
{ inlineData: { mimeType: 'image/png', data: 'base64data' } },
],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
},
state,
);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.content).toEqual([
{ type: 'text', text: 'Screenshot taken' },
{ type: 'media', data: 'base64data', mimeType: 'image/png' },
]);
});
it('cleans up pendingToolNames after response', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-cleanup', 'list_files');
translateEvent(
{
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-cleanup',
responseParts: [],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
},
state,
);
expect(state.pendingToolNames.has('call-cleanup')).toBe(false);
});
});
// -----------------------------------------------------------------------
// ToolCallConfirmation — skipped
// -----------------------------------------------------------------------
describe('ToolCallConfirmation', () => {
it('emits nothing', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallConfirmation,
value: {
request: {
callId: 'call-1',
name: 'delete_file',
args: {},
isClientInitiated: false,
prompt_id: 'p1',
},
details:
{} as unknown as import('../tools/tools.js').ToolCallConfirmationDetails,
},
};
expect(translateEvent(event, state)).toEqual([]);
});
});
// -----------------------------------------------------------------------
// Tool events in happy path sequence
// -----------------------------------------------------------------------
describe('tool call sequence', () => {
it('ModelInfo → Content → ToolCallRequest → ToolCallResponse → Finished', () => {
const events: ServerGeminiStreamEvent[] = [
{ type: GeminiEventType.ModelInfo, value: 'gemini-2.5-pro' },
{ type: GeminiEventType.Content, value: 'Let me read that file.' },
{
type: GeminiEventType.ToolCallRequest,
value: {
callId: 'call-1',
name: 'read_file',
args: { path: '/tmp/test.txt' },
isClientInitiated: false,
prompt_id: 'p1',
},
},
{
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-1',
responseParts: [{ text: 'file contents' }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
},
{
type: GeminiEventType.Finished,
value: {
reason: FinishReason.STOP,
usageMetadata: { promptTokenCount: 20, candidatesTokenCount: 15 },
},
},
];
const allAgentEvents: AgentEvent[] = [];
for (const ev of events) {
allAgentEvents.push(...translateEvent(ev, state));
}
expect(allAgentEvents.map((e) => e.type)).toEqual([
'stream_start',
'message',
'tool_request',
'tool_response',
'usage',
'stream_end',
]);
// Verify tool_request details
const toolReq = allAgentEvents[2] as AgentEvent<'tool_request'>;
expect(toolReq.requestId).toBe('call-1');
expect(toolReq.name).toBe('read_file');
// Verify tool_response has resolved name
const toolResp = allAgentEvents[3] as AgentEvent<'tool_response'>;
expect(toolResp.requestId).toBe('call-1');
expect(toolResp.name).toBe('read_file');
expect(toolResp.isError).toBe(false);
});
});
// -----------------------------------------------------------------------
// Happy path sequence test
// -----------------------------------------------------------------------
describe('happy path sequence', () => {
it('ModelInfo → Content → Content → Finished produces correct trajectory', () => {
const events: ServerGeminiStreamEvent[] = [
{ type: GeminiEventType.ModelInfo, value: 'gemini-2.5-pro' },
{ type: GeminiEventType.Content, value: 'Hello ' },
{ type: GeminiEventType.Content, value: 'world!' },
{
type: GeminiEventType.Finished,
value: {
reason: FinishReason.STOP,
usageMetadata: {
promptTokenCount: 10,
candidatesTokenCount: 5,
},
},
},
];
const allAgentEvents: AgentEvent[] = [];
for (const ev of events) {
allAgentEvents.push(...translateEvent(ev, state));
}
expect(allAgentEvents.map((e) => e.type)).toEqual([
'stream_start',
'message',
'message',
'usage',
'stream_end',
]);
// Verify IDs are sequential
for (let i = 0; i < allAgentEvents.length; i++) {
expect(allAgentEvents[i].id).toBe(`test-stream-id-${i}`);
}
// Verify streamId is consistent
for (const ev of allAgentEvents) {
expect(ev.streamId).toBe('test-stream-id');
}
});
});
// -----------------------------------------------------------------------
// mapFinishReason — all values
// -----------------------------------------------------------------------
describe('mapFinishReason', () => {
const cases: Array<[string | undefined, string]> = [
[undefined, 'completed'],
['STOP', 'completed'],
['FINISH_REASON_UNSPECIFIED', 'completed'],
['MAX_TOKENS', 'max_budget'],
['SAFETY', 'refusal'],
['RECITATION', 'refusal'],
['LANGUAGE', 'refusal'],
['BLOCKLIST', 'refusal'],
['PROHIBITED_CONTENT', 'refusal'],
['SPII', 'refusal'],
['MALFORMED_FUNCTION_CALL', 'failed'],
['OTHER', 'failed'],
];
for (const [input, expected] of cases) {
it(`maps ${String(input)}${expected}`, () => {
expect(mapFinishReason(input as FinishReason | undefined)).toBe(
expected,
);
});
}
});
// -----------------------------------------------------------------------
// mapHttpToGrpcStatus
// -----------------------------------------------------------------------
describe('mapHttpToGrpcStatus', () => {
const cases: Array<[number | undefined, string]> = [
[undefined, 'INTERNAL'],
[400, 'INVALID_ARGUMENT'],
[401, 'UNAUTHENTICATED'],
[403, 'PERMISSION_DENIED'],
[404, 'NOT_FOUND'],
[409, 'ALREADY_EXISTS'],
[429, 'RESOURCE_EXHAUSTED'],
[500, 'INTERNAL'],
[501, 'UNIMPLEMENTED'],
[503, 'UNAVAILABLE'],
[504, 'DEADLINE_EXCEEDED'],
[418, 'INTERNAL'], // unmapped → INTERNAL
];
for (const [input, expected] of cases) {
it(`maps ${String(input)}${expected}`, () => {
expect(mapHttpToGrpcStatus(input)).toBe(expected);
});
}
});
// -----------------------------------------------------------------------
// mapError
// -----------------------------------------------------------------------
describe('mapError', () => {
it('maps StructuredError with status', () => {
const result = mapError({ message: 'Unauthorized', status: 401 });
expect(result.status).toBe('UNAUTHENTICATED');
expect(result.message).toBe('Unauthorized');
});
it('maps StructuredError without status', () => {
const result = mapError({ message: 'Unknown error' });
expect(result.status).toBe('INTERNAL');
});
it('maps Error instance', () => {
const result = mapError(new Error('boom'));
expect(result.status).toBe('INTERNAL');
expect(result.message).toBe('boom');
});
it('maps primitive value', () => {
const result = mapError('something went wrong');
expect(result.status).toBe('INTERNAL');
expect(result.message).toBe('something went wrong');
});
});
// -----------------------------------------------------------------------
// mapUsage
// -----------------------------------------------------------------------
describe('mapUsage', () => {
it('maps all fields', () => {
const result = mapUsage(
{
promptTokenCount: 100,
candidatesTokenCount: 50,
cachedContentTokenCount: 10,
},
'gemini-2.5-pro',
);
expect(result).toEqual({
model: 'gemini-2.5-pro',
inputTokens: 100,
outputTokens: 50,
cachedTokens: 10,
});
});
it('uses "unknown" when model is not provided', () => {
const result = mapUsage({});
expect(result.model).toBe('unknown');
});
});
// -----------------------------------------------------------------------
// Event ID uniqueness and timestamps
// -----------------------------------------------------------------------
describe('event metadata', () => {
it('generates unique sequential IDs', () => {
state.streamStartEmitted = true;
const r1 = translateEvent(
{ type: GeminiEventType.Content, value: 'a' },
state,
);
const r2 = translateEvent(
{ type: GeminiEventType.Content, value: 'b' },
state,
);
expect(r1[0].id).not.toBe(r2[0].id);
});
it('includes ISO 8601 timestamps', () => {
state.streamStartEmitted = true;
const result = translateEvent(
{ type: GeminiEventType.Content, value: 'test' },
state,
);
// Verify it's a valid date string
expect(new Date(result[0].timestamp).toISOString()).toBe(
result[0].timestamp,
);
});
});
});
+464
View File
@@ -0,0 +1,464 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @fileoverview Pure, stateless-per-call translation functions that convert
* ServerGeminiStreamEvent objects into AgentEvent objects.
*
* No side effects, no generators. Each call to `translateEvent` takes an event
* and mutable TranslationState, returning zero or more AgentEvents.
*/
import type { FinishReason, Part } from '@google/genai';
import { GeminiEventType } from '../core/turn.js';
import type {
ServerGeminiStreamEvent,
StructuredError,
GeminiFinishedEventValue,
} from '../core/turn.js';
import type {
AgentEvent,
ContentPart,
StreamEndReason,
ErrorData,
Usage,
} from './types.js';
// ---------------------------------------------------------------------------
// Translation State
// ---------------------------------------------------------------------------
export interface TranslationState {
streamId: string;
streamStartEmitted: boolean;
model: string | undefined;
eventCounter: number;
/** Tracks callId → tool name from requests so responses can reference the name. */
pendingToolNames: Map<string, string>;
}
export function createTranslationState(streamId?: string): TranslationState {
return {
streamId: streamId ?? crypto.randomUUID(),
streamStartEmitted: false,
model: undefined,
eventCounter: 0,
pendingToolNames: new Map(),
};
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeEvent(
type: AgentEvent['type'],
state: TranslationState,
payload: Partial<AgentEvent>,
): AgentEvent {
const id = `${state.streamId}-${state.eventCounter++}`;
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion -- constructing AgentEvent from common fields + payload
return {
...payload,
id,
timestamp: new Date().toISOString(),
streamId: state.streamId,
type,
} as AgentEvent;
}
function ensureStreamStart(state: TranslationState, out: AgentEvent[]): void {
if (!state.streamStartEmitted) {
out.push(makeEvent('stream_start', state, { streamId: state.streamId }));
state.streamStartEmitted = true;
}
}
/**
* Converts @google/genai Part[] to ContentPart[].
* Text parts become text ContentParts; inline data becomes media ContentParts.
*/
function mapResponseParts(parts: Part[]): ContentPart[] {
const result: ContentPart[] = [];
for (const part of parts) {
if (part.text !== undefined) {
result.push({ type: 'text', text: part.text });
} else if (part.inlineData) {
result.push({
type: 'media',
data: part.inlineData.data,
mimeType: part.inlineData.mimeType,
});
}
}
return result;
}
// ---------------------------------------------------------------------------
// Core Translator
// ---------------------------------------------------------------------------
/**
* Translates a single ServerGeminiStreamEvent into zero or more AgentEvents.
* Mutates `state` (counter, flags) as a side effect.
*/
export function translateEvent(
event: ServerGeminiStreamEvent,
state: TranslationState,
): AgentEvent[] {
const out: AgentEvent[] = [];
switch (event.type) {
case GeminiEventType.ModelInfo:
state.model = event.value;
if (!state.streamStartEmitted) {
out.push(
makeEvent('stream_start', state, { streamId: state.streamId }),
);
state.streamStartEmitted = true;
} else {
out.push(makeEvent('session_update', state, { model: event.value }));
}
break;
case GeminiEventType.Content:
ensureStreamStart(state, out);
out.push(
makeEvent('message', state, {
role: 'agent',
content: [{ type: 'text', text: event.value }],
}),
);
break;
case GeminiEventType.Thought:
ensureStreamStart(state, out);
out.push(
makeEvent('message', state, {
role: 'agent',
content: [{ type: 'thought', thought: event.value.description }],
_meta: event.value.subject
? { source: 'agent', subject: event.value.subject }
: { source: 'agent' },
}),
);
break;
case GeminiEventType.Citation:
ensureStreamStart(state, out);
out.push(
makeEvent('message', state, {
role: 'agent',
content: [{ type: 'text', text: event.value }],
_meta: { source: 'agent', citation: true },
}),
);
break;
case GeminiEventType.Finished:
handleFinished(event.value, state, out);
break;
case GeminiEventType.Error:
handleError(event.value.error, state, out);
break;
case GeminiEventType.UserCancelled:
ensureStreamStart(state, out);
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: 'aborted',
}),
);
break;
case GeminiEventType.MaxSessionTurns:
ensureStreamStart(state, out);
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: 'max_turns',
}),
);
break;
case GeminiEventType.LoopDetected:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'INTERNAL',
message: 'Loop detected',
fatal: true,
}),
);
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: 'failed',
}),
);
break;
case GeminiEventType.ContextWindowWillOverflow:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'RESOURCE_EXHAUSTED',
message: `Context window will overflow (estimated: ${event.value.estimatedRequestTokenCount}, remaining: ${event.value.remainingTokenCount})`,
fatal: true,
}),
);
break;
case GeminiEventType.AgentExecutionStopped:
ensureStreamStart(state, out);
if (event.value.systemMessage) {
out.push(
makeEvent('message', state, {
role: 'agent',
content: [{ type: 'text', text: event.value.systemMessage }],
}),
);
}
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: 'completed',
}),
);
break;
case GeminiEventType.AgentExecutionBlocked:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'PERMISSION_DENIED',
message: event.value.reason,
fatal: false,
}),
);
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: 'failed',
}),
);
break;
case GeminiEventType.InvalidStream:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'INTERNAL',
message: 'Invalid stream received from model',
fatal: true,
}),
);
break;
// Internal concerns — no AgentEvent emitted
case GeminiEventType.ChatCompressed:
case GeminiEventType.Retry:
break;
case GeminiEventType.ToolCallRequest:
ensureStreamStart(state, out);
state.pendingToolNames.set(event.value.callId, event.value.name);
out.push(
makeEvent('tool_request', state, {
requestId: event.value.callId,
name: event.value.name,
args: event.value.args,
}),
);
break;
case GeminiEventType.ToolCallResponse:
ensureStreamStart(state, out);
out.push(
makeEvent('tool_response', state, {
requestId: event.value.callId,
name: state.pendingToolNames.get(event.value.callId) ?? 'unknown',
content: mapResponseParts(event.value.responseParts),
isError: event.value.error !== undefined,
...(event.value.data ? { data: event.value.data } : {}),
}),
);
state.pendingToolNames.delete(event.value.callId);
break;
case GeminiEventType.ToolCallConfirmation:
// Skip — elicitations not needed for non-interactive mode
break;
default:
break;
}
return out;
}
// ---------------------------------------------------------------------------
// Finished Event Handling
// ---------------------------------------------------------------------------
function handleFinished(
value: GeminiFinishedEventValue,
state: TranslationState,
out: AgentEvent[],
): void {
ensureStreamStart(state, out);
if (value.usageMetadata) {
const usage = mapUsage(value.usageMetadata, state.model);
out.push(makeEvent('usage', state, usage));
}
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: mapFinishReason(value.reason),
}),
);
}
// ---------------------------------------------------------------------------
// Error Handling
// ---------------------------------------------------------------------------
function handleError(
error: unknown,
state: TranslationState,
out: AgentEvent[],
): void {
ensureStreamStart(state, out);
const mapped = mapError(error);
out.push(makeEvent('error', state, mapped));
}
// ---------------------------------------------------------------------------
// Public Mapping Functions
// ---------------------------------------------------------------------------
/**
* Maps a Gemini FinishReason to a StreamEndReason.
*/
export function mapFinishReason(
reason: FinishReason | undefined,
): StreamEndReason {
if (!reason) return 'completed';
switch (reason) {
case 'STOP':
case 'FINISH_REASON_UNSPECIFIED':
return 'completed';
case 'MAX_TOKENS':
return 'max_budget';
case 'SAFETY':
case 'RECITATION':
case 'LANGUAGE':
case 'BLOCKLIST':
case 'PROHIBITED_CONTENT':
case 'SPII':
return 'refusal';
case 'MALFORMED_FUNCTION_CALL':
case 'OTHER':
return 'failed';
default:
return 'failed';
}
}
/**
* Maps an HTTP status code to a gRPC-style status string.
*/
export function mapHttpToGrpcStatus(
httpStatus: number | undefined,
): ErrorData['status'] {
if (httpStatus === undefined) return 'INTERNAL';
switch (httpStatus) {
case 400:
return 'INVALID_ARGUMENT';
case 401:
return 'UNAUTHENTICATED';
case 403:
return 'PERMISSION_DENIED';
case 404:
return 'NOT_FOUND';
case 409:
return 'ALREADY_EXISTS';
case 429:
return 'RESOURCE_EXHAUSTED';
case 500:
return 'INTERNAL';
case 501:
return 'UNIMPLEMENTED';
case 503:
return 'UNAVAILABLE';
case 504:
return 'DEADLINE_EXCEEDED';
default:
return 'INTERNAL';
}
}
/**
* Maps a StructuredError (or unknown error value) to an ErrorData payload.
*/
export function mapError(error: unknown): ErrorData {
if (isStructuredError(error)) {
return {
status: mapHttpToGrpcStatus(error.status),
message: error.message,
fatal: true,
};
}
if (error instanceof Error) {
return {
status: 'INTERNAL',
message: error.message,
fatal: true,
};
}
return {
status: 'INTERNAL',
message: String(error),
fatal: true,
};
}
function isStructuredError(error: unknown): error is StructuredError {
return (
typeof error === 'object' &&
error !== null &&
'message' in error &&
typeof error.message === 'string'
);
}
/**
* Maps Gemini usageMetadata to Usage.
*/
export function mapUsage(
metadata: {
promptTokenCount?: number;
candidatesTokenCount?: number;
cachedContentTokenCount?: number;
},
model?: string,
): Usage {
return {
model: model ?? 'unknown',
inputTokens: metadata.promptTokenCount,
outputTokens: metadata.candidatesTokenCount,
cachedTokens: metadata.cachedContentTokenCount,
};
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,445 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @fileoverview LegacyAgentSession owns the agentic loop (send + tool
* scheduling + multi-turn), translating all events to AgentEvents.
*/
import type { Part } from '@google/genai';
import { GeminiEventType } from '../core/turn.js';
import type { GeminiClient } from '../core/client.js';
import type { Scheduler } from '../scheduler/scheduler.js';
import type { Config } from '../config/config.js';
import type { ToolCallRequestInfo } from '../scheduler/types.js';
import { ToolErrorType } from '../tools/tool-error.js';
import {
translateEvent,
createTranslationState,
type TranslationState,
} from './event-translator.js';
import type {
AgentEvent,
AgentSession,
AgentSend,
ContentPart,
} from './types.js';
export interface LegacySessionDeps {
client: GeminiClient;
scheduler: Scheduler;
config: Config;
promptId: string;
streamId?: string;
}
// ---------------------------------------------------------------------------
// LegacyAgentSession
// ---------------------------------------------------------------------------
export class LegacyAgentSession implements AgentSession {
private _events: AgentEvent[] = [];
private _translationState: TranslationState;
private _subscribers: Set<() => void> = new Set();
private _streamDone: boolean = false;
private _abortController: AbortController = new AbortController();
private readonly _client: GeminiClient;
private readonly _scheduler: Scheduler;
private readonly _config: Config;
private readonly _promptId: string;
constructor(deps: LegacySessionDeps) {
this._translationState = createTranslationState(deps.streamId);
this._client = deps.client;
this._scheduler = deps.scheduler;
this._config = deps.config;
this._promptId = deps.promptId;
}
// ---------------------------------------------------------------------------
// AgentSession interface — send() owns the agentic loop
// ---------------------------------------------------------------------------
async send(payload: AgentSend): Promise<{ streamId: string }> {
// AgentSend is a union — narrow to MessageSend to access .message
const message = 'message' in payload ? payload.message : undefined;
if (!message) {
throw new Error('LegacyAgentSession.send() only supports message sends.');
}
const parts = contentPartsToGeminiParts(message);
// Start the loop in the background — don't await
this._runLoop(parts).catch((err) => {
this.emitErrorAndStreamEnd(err);
});
return { streamId: this._translationState.streamId };
}
/**
* Returns an async iterator that replays existing events, then live-follows
* new events as they arrive.
*/
async *stream(options?: {
streamId?: string;
eventId?: string;
}): AsyncIterableIterator<AgentEvent> {
let startIndex = 0;
if (options?.eventId) {
const idx = this._events.findIndex((e) => e.id === options.eventId);
if (idx !== -1) {
startIndex = idx + 1;
}
}
// Replay existing events
for (let i = startIndex; i < this._events.length; i++) {
const event = this._events[i];
if (event) yield event;
}
if (this._streamDone) return;
// Live-follow new events
let replayedUpTo = this._events.length;
while (!this._streamDone) {
await new Promise<void>((resolve) => {
if (this._events.length > replayedUpTo || this._streamDone) {
resolve();
return;
}
const handler = (): void => {
this._subscribers.delete(handler);
resolve();
};
this._subscribers.add(handler);
});
while (replayedUpTo < this._events.length) {
const event = this._events[replayedUpTo];
if (event) yield event;
replayedUpTo++;
}
}
}
async abort(): Promise<void> {
this._abortController.abort();
}
get events(): AgentEvent[] {
return this._events;
}
// ---------------------------------------------------------------------------
// Core: agentic loop
// ---------------------------------------------------------------------------
private async _runLoop(initialParts: Part[]): Promise<void> {
let currentParts: Part[] = initialParts;
let turnCount = 0;
const maxTurns = this._config.getMaxSessionTurns();
try {
while (true) {
turnCount++;
if (maxTurns >= 0 && turnCount > maxTurns) {
this.ensureStreamStart();
this.appendAndNotify([
this.makeInternalEvent('stream_end', {
streamId: this._translationState.streamId,
reason: 'max_turns',
}),
]);
this._streamDone = true;
return;
}
const toolCallRequests: ToolCallRequestInfo[] = [];
const responseStream = this._client.sendMessageStream(
currentParts,
this._abortController.signal,
this._promptId,
);
// Process the stream — translate events and collect tool requests
for await (const event of responseStream) {
if (this._abortController.signal.aborted) {
this.ensureStreamStart();
this.appendAndNotify([
this.makeInternalEvent('stream_end', {
streamId: this._translationState.streamId,
reason: 'aborted',
}),
]);
this._streamDone = true;
return;
}
// Collect tool call requests BEFORE translating so we can
// decide whether to suppress the Finished event's stream_end.
if (event.type === GeminiEventType.ToolCallRequest) {
toolCallRequests.push(event.value);
}
// Translate to AgentEvents
const agentEvents = translateEvent(event, this._translationState);
// Finished events don't mean the session is done — if there are
// pending tool calls, more turns are coming. Suppress stream_end
// from the Finished event in that case (keep usage events).
if (
event.type === GeminiEventType.Finished &&
toolCallRequests.length > 0
) {
const filtered = agentEvents.filter((e) => e.type !== 'stream_end');
this.appendAndNotify(filtered);
} else {
this.appendAndNotify(agentEvents);
}
// Error events → abort the loop (translator already emitted error AgentEvent)
if (event.type === GeminiEventType.Error) {
this.ensureStreamEnd();
this._streamDone = true;
return;
}
// Terminal events — translator already emitted stream_end
if (
event.type === GeminiEventType.AgentExecutionStopped ||
event.type === GeminiEventType.LoopDetected ||
event.type === GeminiEventType.AgentExecutionBlocked
) {
this._streamDone = true;
return;
}
}
if (toolCallRequests.length === 0) {
// No tool calls — done. Ensure stream_end.
this.ensureStreamEnd();
this._streamDone = true;
return;
}
// Schedule tool calls
const completedToolCalls = await this._scheduler.schedule(
toolCallRequests,
this._abortController.signal,
);
// Emit tool_response AgentEvents for each completed tool call
const toolResponseParts: Part[] = [];
for (const tc of completedToolCalls) {
const response = tc.response;
const request = tc.request;
this.appendAndNotify([
this.makeInternalEvent('tool_response', {
requestId: request.callId,
name: request.name,
content: mapCompletedToolResponseParts(response.responseParts),
isError: response.error !== undefined,
...(response.resultDisplay !== undefined
? {
displayContent: [
{
type: 'text',
text:
typeof response.resultDisplay === 'string'
? response.resultDisplay
: JSON.stringify(response.resultDisplay),
},
],
}
: {}),
...(response.data ? { data: response.data } : {}),
}),
]);
if (response.responseParts) {
toolResponseParts.push(...response.responseParts);
}
}
// Record tool calls in chat history
try {
const currentModel =
this._client.getCurrentSequenceModel() ?? this._config.getModel();
this._client
.getChat()
.recordCompletedToolCalls(currentModel, completedToolCalls);
} catch {
// Recording failures shouldn't break the loop
}
// Check if a tool requested stop execution
const stopTool = completedToolCalls.find(
(tc) => tc.response.errorType === ToolErrorType.STOP_EXECUTION,
);
if (stopTool) {
this.ensureStreamEnd();
this._streamDone = true;
return;
}
// Check for fatal tool errors (e.g. NO_SPACE_LEFT)
const fatalTool = completedToolCalls.find(
(tc) => tc.response.errorType === ToolErrorType.NO_SPACE_LEFT,
);
if (fatalTool) {
const msg = fatalTool.response.error?.message ?? 'Fatal tool error';
this.appendAndNotify([
this.makeInternalEvent('error', {
status: 'INTERNAL',
message: `Fatal tool error (${fatalTool.request.name}): ${msg}`,
fatal: true,
}),
]);
this.ensureStreamEnd();
this._streamDone = true;
return;
}
// Feed tool results back for next turn
currentParts = toolResponseParts;
}
} catch (err) {
this.emitErrorAndStreamEnd(err);
this._streamDone = true;
}
}
// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------
private appendAndNotify(events: AgentEvent[]): void {
for (const event of events) {
this._events.push(event);
}
if (events.length > 0) {
this.notifySubscribers();
}
}
private notifySubscribers(): void {
for (const handler of this._subscribers) {
handler();
}
}
private ensureStreamStart(): void {
if (!this._translationState.streamStartEmitted) {
const startEvent = this.makeInternalEvent('stream_start', {
streamId: this._translationState.streamId,
});
this._events.push(startEvent);
this._translationState.streamStartEmitted = true;
this.notifySubscribers();
}
}
private ensureStreamEnd(): void {
const hasStreamEnd = this._events.some((e) => e.type === 'stream_end');
if (!hasStreamEnd && this._translationState.streamStartEmitted) {
const endEvent = this.makeInternalEvent('stream_end', {
streamId: this._translationState.streamId,
reason: 'completed',
});
this._events.push(endEvent);
this.notifySubscribers();
}
}
private emitErrorAndStreamEnd(err: unknown): void {
const message = err instanceof Error ? err.message : String(err);
this.ensureStreamStart();
const errorEvent = this.makeInternalEvent('error', {
status: 'INTERNAL' as const,
message,
fatal: true,
});
this._events.push(errorEvent);
const hasStreamEnd = this._events.some((e) => e.type === 'stream_end');
if (!hasStreamEnd) {
const endEvent = this.makeInternalEvent('stream_end', {
streamId: this._translationState.streamId,
reason: 'failed',
});
this._events.push(endEvent);
}
this.notifySubscribers();
}
private makeInternalEvent(
type: AgentEvent['type'],
payload: Partial<AgentEvent>,
): AgentEvent {
const id = `${this._translationState.streamId}-${this._translationState.eventCounter++}`;
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion -- constructing AgentEvent from common fields + payload
return {
...payload,
id,
timestamp: new Date().toISOString(),
streamId: this._translationState.streamId,
type,
} as AgentEvent;
}
}
// ---------------------------------------------------------------------------
// Conversion helpers
// ---------------------------------------------------------------------------
/** Convert AgentEvent ContentPart[] → @google/genai Part[] */
function contentPartsToGeminiParts(parts: ContentPart[]): Part[] {
return parts.map((cp) => {
switch (cp.type) {
case 'text':
return { text: cp.text };
case 'thought':
return { text: cp.thought };
case 'media':
return {
inlineData: {
data: cp.data ?? '',
mimeType: cp.mimeType ?? 'application/octet-stream',
},
};
case 'reference':
return { text: cp.text };
default:
return { text: JSON.stringify(cp) };
}
});
}
/** Convert @google/genai Part[] → AgentEvent ContentPart[] */
function mapCompletedToolResponseParts(parts: Part[]): ContentPart[] {
const result: ContentPart[] = [];
for (const part of parts) {
if (part.text !== undefined) {
result.push({ type: 'text', text: part.text });
} else if (part.inlineData) {
result.push({
type: 'media',
data: part.inlineData.data,
mimeType: part.inlineData.mimeType,
});
}
}
return result;
}
+9 -2
View File
@@ -79,9 +79,16 @@ export type AgentEventData<
EventType extends keyof AgentEvents = keyof AgentEvents,
> = AgentEvents[EventType] & { type: EventType };
/**
* Mapped type that produces a proper discriminated union when `EventType` is
* the default (all keys), enabling `switch (event.type)` narrowing.
* When a specific EventType is provided, resolves to a single variant.
*/
export type AgentEvent<
EventType extends keyof AgentEvents = keyof AgentEvents,
> = AgentEventCommon & AgentEventData<EventType>;
> = {
[K in EventType]: AgentEventCommon & AgentEvents[K] & { type: K };
}[EventType];
export interface AgentEvents {
/** MUST be the first event emitted in a session. */
@@ -261,7 +268,7 @@ export interface StreamStart {
streamId: string;
}
type StreamEndReason =
export type StreamEndReason =
| 'completed'
| 'failed'
| 'aborted'
+32
View File
@@ -166,6 +166,38 @@ export * from './agents/agentLoader.js';
export * from './agents/local-executor.js';
export * from './agents/agent-scheduler.js';
// Export agent session interface
export {
LegacyAgentSession,
type LegacySessionDeps,
} from './agent/legacy-agent-session.js';
export {
translateEvent,
createTranslationState,
mapFinishReason,
mapHttpToGrpcStatus,
mapError,
mapUsage,
} from './agent/event-translator.js';
export type { TranslationState } from './agent/event-translator.js';
// Agent event types — namespaced to avoid collisions with existing exports
export type {
AgentEvent,
AgentEventCommon,
AgentEventData,
AgentEvents as AgentEventMap,
AgentSend,
AgentSession,
ContentPart,
ErrorData,
StreamEnd,
StreamEndReason,
StreamStart,
Trajectory,
Usage as AgentUsage,
WithMeta,
} from './agent/types.js';
// Export specific tool logic
export * from './tools/read-file.js';
export * from './tools/ls.js';