feat(core): add LegacyAgentSession and migrate non-interactive CLI

Squashed commit of agent-session/non-interactive branch, including newest update.
This commit is contained in:
Adam Weidman
2026-03-18 14:36:37 -04:00
parent a5a461c234
commit 9df044c836
10 changed files with 3194 additions and 236 deletions
+51 -35
View File
@@ -58,6 +58,12 @@ const mockSchedulerSchedule = vi.hoisted(() => vi.fn());
vi.mock('@google/gemini-cli-core', async (importOriginal) => {
const original =
await importOriginal<typeof import('@google/gemini-cli-core')>();
const { LegacyAgentSession } = await import(
'../../core/src/agent/legacy-agent-session.js'
);
const { geminiPartsToContentParts } = await import(
'../../core/src/agent/content-utils.js'
);
class MockChatRecordingService {
initialize = vi.fn();
@@ -77,6 +83,8 @@ vi.mock('@google/gemini-cli-core', async (importOriginal) => {
uiTelemetryService: {
getMetrics: vi.fn(),
},
LegacyAgentSession,
geminiPartsToContentParts,
coreEvents: mockCoreEvents,
createWorkingStdio: vi.fn(() => ({
stdout: process.stdout,
@@ -108,6 +116,8 @@ describe('runNonInteractive', () => {
sendMessageStream: Mock;
resumeChat: Mock;
getChatRecordingService: Mock;
getChat: Mock;
getCurrentSequenceModel: Mock;
};
const MOCK_SESSION_METRICS: SessionMetrics = {
models: {},
@@ -163,6 +173,8 @@ describe('runNonInteractive', () => {
recordMessageTokens: vi.fn(),
recordToolCalls: vi.fn(),
})),
getChat: vi.fn(() => ({ recordCompletedToolCalls: vi.fn() })),
getCurrentSequenceModel: vi.fn().mockReturnValue(null),
};
mockConfig = {
@@ -259,9 +271,6 @@ describe('runNonInteractive', () => {
[{ text: 'Test input' }],
expect.any(AbortSignal),
'prompt-id-1',
undefined,
false,
'Test input',
);
expect(getWrittenOutput()).toBe('Hello World\n');
// Note: Telemetry shutdown is now handled in runExitCleanup() in cleanup.ts
@@ -378,9 +387,6 @@ describe('runNonInteractive', () => {
[{ text: 'Tool response' }],
expect.any(AbortSignal),
'prompt-id-2',
undefined,
false,
undefined,
);
expect(getWrittenOutput()).toBe('Final answer\n');
});
@@ -538,9 +544,6 @@ describe('runNonInteractive', () => {
],
expect.any(AbortSignal),
'prompt-id-3',
undefined,
false,
undefined,
);
expect(getWrittenOutput()).toBe('Sorry, let me try again.\n');
});
@@ -558,7 +561,7 @@ describe('runNonInteractive', () => {
input: 'Initial fail',
prompt_id: 'prompt-id-4',
}),
).rejects.toThrow(apiError);
).rejects.toThrow('API connection failed');
});
it('should not exit if a tool is not found, and should send error back to model', async () => {
@@ -680,9 +683,6 @@ describe('runNonInteractive', () => {
processedParts,
expect.any(AbortSignal),
'prompt-id-7',
undefined,
false,
rawInput,
);
// 6. Assert the final output is correct
@@ -716,9 +716,6 @@ describe('runNonInteractive', () => {
[{ text: 'Test input' }],
expect.any(AbortSignal),
'prompt-id-1',
undefined,
false,
'Test input',
);
expect(processStdoutSpy).toHaveBeenCalledWith(
JSON.stringify(
@@ -849,9 +846,6 @@ describe('runNonInteractive', () => {
[{ text: 'Empty response test' }],
expect.any(AbortSignal),
'prompt-id-empty',
undefined,
false,
'Empty response test',
);
// This should output JSON with empty response but include stats
@@ -941,7 +935,7 @@ describe('runNonInteractive', () => {
{
session_id: 'test-session-id',
error: {
type: 'FatalInputError',
type: 'Error',
message: 'Invalid command syntax provided',
code: 42,
},
@@ -986,9 +980,6 @@ describe('runNonInteractive', () => {
[{ text: 'Prompt from command' }],
expect.any(AbortSignal),
'prompt-id-slash',
undefined,
false,
'/testcommand',
);
expect(getWrittenOutput()).toBe('Response from command\n');
@@ -1032,9 +1023,6 @@ describe('runNonInteractive', () => {
[{ text: 'Slash command output' }],
expect.any(AbortSignal),
'prompt-id-slash',
undefined,
false,
'/help',
);
expect(getWrittenOutput()).toBe('Response to slash command\n');
handleSlashCommandSpy.mockRestore();
@@ -1209,9 +1197,6 @@ describe('runNonInteractive', () => {
[{ text: '/unknowncommand' }],
expect.any(AbortSignal),
'prompt-id-unknown',
undefined,
false,
'/unknowncommand',
);
expect(getWrittenOutput()).toBe('Response to unknown\n');
@@ -1776,15 +1761,13 @@ describe('runNonInteractive', () => {
throw new Error('Recording failed');
}),
};
// @ts-expect-error - Mocking internal structure
mockGeminiClient.getChat = vi.fn().mockReturnValue(mockChat);
// @ts-expect-error - Mocking internal structure
mockGeminiClient.getCurrentSequenceModel = vi
.fn()
.mockReturnValue('model-1');
// Mock debugLogger.error
const { debugLogger } = await import('@google/gemini-cli-core');
const { debugLogger } = await import('../../core/src/utils/debugLogger.js');
const debugLoggerErrorSpy = vi
.spyOn(debugLogger, 'error')
.mockImplementation(() => {});
@@ -1999,7 +1982,6 @@ describe('runNonInteractive', () => {
expect(processStderrSpy).toHaveBeenCalledWith(
'Agent execution stopped: Stopped by hook\n',
);
// Should exit without calling sendMessageStream again
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
});
@@ -2030,9 +2012,9 @@ describe('runNonInteractive', () => {
expect(processStderrSpy).toHaveBeenCalledWith(
'[WARNING] Agent execution blocked: Blocked by hook\n',
);
// sendMessageStream is called once, recursion is internal to it and transparent to the caller
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
// Stream continues after blocked event — content should be output
expect(getWrittenOutput()).toBe('Final answer\n');
expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1);
});
});
@@ -2173,6 +2155,40 @@ describe('runNonInteractive', () => {
);
});
it('should emit warning event for loop_detected custom event in streaming JSON mode', async () => {
vi.mocked(mockConfig.getOutputFormat).mockReturnValue(
OutputFormat.STREAM_JSON,
);
vi.mocked(uiTelemetryService.getMetrics).mockReturnValue(
MOCK_SESSION_METRICS,
);
const streamEvents: ServerGeminiStreamEvent[] = [
{ type: GeminiEventType.LoopDetected } as ServerGeminiStreamEvent,
{ type: GeminiEventType.Content, value: 'Continuing after loop' },
{
type: GeminiEventType.Finished,
value: { reason: undefined, usageMetadata: { totalTokenCount: 5 } },
},
];
mockGeminiClient.sendMessageStream.mockReturnValue(
createStreamFromEvents(streamEvents),
);
await runNonInteractive({
config: mockConfig,
settings: mockSettings,
input: 'Loop test explicit',
prompt_id: 'prompt-id-loop-explicit',
});
const output = getWrittenOutput();
// The STREAM_JSON output should contain an error event with warning severity
expect(output).toContain('"type":"error"');
expect(output).toContain('"severity":"warning"');
expect(output).toContain('Loop detected');
});
it('should report cancelled tool calls as success in stream-json mode (legacy parity)', async () => {
const toolCallEvent: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallRequest,
+219 -222
View File
@@ -6,15 +6,15 @@
import type {
Config,
ToolCallRequestInfo,
ResumedSessionData,
UserFeedbackPayload,
AgentEvent,
ContentPart,
} from '@google/gemini-cli-core';
import { isSlashCommand } from './ui/utils/commandUtils.js';
import type { LoadedSettings } from './config/settings.js';
import {
convertSessionToClientHistory,
GeminiEventType,
FatalInputError,
promptIdContext,
OutputFormat,
@@ -22,17 +22,17 @@ import {
StreamJsonFormatter,
JsonStreamEventType,
uiTelemetryService,
debugLogger,
coreEvents,
CoreEvent,
createWorkingStdio,
recordToolCallInteractions,
ToolErrorType,
Scheduler,
ROOT_SCHEDULER_ID,
LegacyAgentSession,
ToolErrorType,
geminiPartsToContentParts,
} from '@google/gemini-cli-core';
import type { Content, Part } from '@google/genai';
import type { Part } from '@google/genai';
import readline from 'node:readline';
import stripAnsi from 'strip-ansi';
@@ -150,8 +150,6 @@ export async function runNonInteractive({
}, 200);
abortController.abort();
// Note: Don't exit here - let the abort flow through the system
// and trigger handleCancellationError() which will exit with proper code
}
};
@@ -246,9 +244,6 @@ export async function runNonInteractive({
config,
settings,
);
// If a slash command is found and returns a prompt, use it.
// Otherwise, slashCommandResult falls through to the default prompt
// handling.
if (slashCommandResult) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
query = slashCommandResult as Part[];
@@ -266,8 +261,6 @@ export async function runNonInteractive({
escapePastedAtSymbols: false,
});
if (error || !processedQuery) {
// An error occurred during @include processing (e.g., file not found).
// The error message is already logged by handleAtCommand.
throw new FatalInputError(
error || 'Exiting due to an error processing the @ command.',
);
@@ -286,216 +279,30 @@ export async function runNonInteractive({
});
}
let currentMessages: Content[] = [{ role: 'user', parts: query }];
let turnCount = 0;
while (true) {
turnCount++;
if (
config.getMaxSessionTurns() >= 0 &&
turnCount > config.getMaxSessionTurns()
) {
handleMaxTurnsExceededError(config);
}
const toolCallRequests: ToolCallRequestInfo[] = [];
const responseStream = geminiClient.sendMessageStream(
currentMessages[0]?.parts || [],
abortController.signal,
prompt_id,
undefined,
false,
turnCount === 1 ? input : undefined,
);
let responseText = '';
for await (const event of responseStream) {
if (abortController.signal.aborted) {
handleCancellationError(config);
}
if (event.type === GeminiEventType.Content) {
const isRaw =
config.getRawOutput() || config.getAcceptRawOutputRisk();
const output = isRaw ? event.value : stripAnsi(event.value);
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.MESSAGE,
timestamp: new Date().toISOString(),
role: 'assistant',
content: output,
delta: true,
});
} else if (config.getOutputFormat() === OutputFormat.JSON) {
responseText += output;
} else {
if (event.value) {
textOutput.write(output);
}
}
} else if (event.type === GeminiEventType.ToolCallRequest) {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_USE,
timestamp: new Date().toISOString(),
tool_name: event.value.name,
tool_id: event.value.callId,
parameters: event.value.args,
});
}
toolCallRequests.push(event.value);
} else if (event.type === GeminiEventType.LoopDetected) {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'warning',
message: 'Loop detected, stopping execution',
});
}
} else if (event.type === GeminiEventType.MaxSessionTurns) {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'error',
message: 'Maximum session turns exceeded',
});
}
} else if (event.type === GeminiEventType.Error) {
throw event.value.error;
} else if (event.type === GeminiEventType.AgentExecutionStopped) {
const stopMessage = `Agent execution stopped: ${event.value.systemMessage?.trim() || event.value.reason}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`${stopMessage}\n`);
}
// Emit final result event for streaming JSON if needed
if (streamFormatter) {
const metrics = uiTelemetryService.getMetrics();
const durationMs = Date.now() - startTime;
streamFormatter.emitEvent({
type: JsonStreamEventType.RESULT,
timestamp: new Date().toISOString(),
status: 'success',
stats: streamFormatter.convertToStreamStats(
metrics,
durationMs,
),
});
}
return;
} else if (event.type === GeminiEventType.AgentExecutionBlocked) {
const blockMessage = `Agent execution blocked: ${event.value.systemMessage?.trim() || event.value.reason}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`[WARNING] ${blockMessage}\n`);
}
}
}
if (toolCallRequests.length > 0) {
textOutput.ensureTrailingNewline();
const completedToolCalls = await scheduler.schedule(
toolCallRequests,
abortController.signal,
);
const toolResponseParts: Part[] = [];
for (const completedToolCall of completedToolCalls) {
const toolResponse = completedToolCall.response;
const requestInfo = completedToolCall.request;
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_RESULT,
timestamp: new Date().toISOString(),
tool_id: requestInfo.callId,
status:
completedToolCall.status === 'error' ? 'error' : 'success',
output:
typeof toolResponse.resultDisplay === 'string'
? toolResponse.resultDisplay
: undefined,
error: toolResponse.error
? {
type: toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
message: toolResponse.error.message,
}
: undefined,
});
}
if (toolResponse.error) {
handleToolError(
requestInfo.name,
toolResponse.error,
// Create LegacyAgentSession — owns the agentic loop
const session = new LegacyAgentSession({
client: geminiClient,
scheduler,
config,
toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
typeof toolResponse.resultDisplay === 'string'
? toolResponse.resultDisplay
: undefined,
);
}
if (toolResponse.responseParts) {
toolResponseParts.push(...toolResponse.responseParts);
}
}
// Record tool calls with full metadata before sending responses to Gemini
try {
const currentModel =
geminiClient.getCurrentSequenceModel() ?? config.getModel();
geminiClient
.getChat()
.recordCompletedToolCalls(currentModel, completedToolCalls);
await recordToolCallInteractions(config, completedToolCalls);
} catch (error) {
debugLogger.error(
`Error recording completed tool call information: ${error}`,
);
}
// Check if any tool requested to stop execution immediately
const stopExecutionTool = completedToolCalls.find(
(tc) => tc.response.errorType === ToolErrorType.STOP_EXECUTION,
);
if (stopExecutionTool && stopExecutionTool.response.error) {
const stopMessage = `Agent execution stopped: ${stopExecutionTool.response.error.message}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`${stopMessage}\n`);
}
// Emit final result event for streaming JSON
if (streamFormatter) {
const metrics = uiTelemetryService.getMetrics();
const durationMs = Date.now() - startTime;
streamFormatter.emitEvent({
type: JsonStreamEventType.RESULT,
timestamp: new Date().toISOString(),
status: 'success',
stats: streamFormatter.convertToStreamStats(
metrics,
durationMs,
),
promptId: prompt_id,
});
} else if (config.getOutputFormat() === OutputFormat.JSON) {
const formatter = new JsonFormatter();
const stats = uiTelemetryService.getMetrics();
textOutput.write(
formatter.format(config.getSessionId(), responseText, stats),
);
} else {
textOutput.ensureTrailingNewline(); // Ensure a final newline
}
return;
}
currentMessages = [{ role: 'user', parts: toolResponseParts }];
} else {
// Emit final result event for streaming JSON
// Wire Ctrl+C to session abort
abortController.signal.addEventListener('abort', () => {
void session.abort();
});
// Start the agentic loop (runs in background)
await session.send({
message: geminiPartsToContentParts(query),
});
const getFirstText = (parts?: ContentPart[]): string | undefined => {
const part = parts?.[0];
return part?.type === 'text' ? part.text : undefined;
};
const emitFinalSuccessResult = (): void => {
if (streamFormatter) {
const metrics = uiTelemetryService.getMetrics();
const durationMs = Date.now() - startTime;
@@ -512,9 +319,199 @@ export async function runNonInteractive({
formatter.format(config.getSessionId(), responseText, stats),
);
} else {
textOutput.ensureTrailingNewline(); // Ensure a final newline
textOutput.ensureTrailingNewline();
}
return;
};
const reconstructFatalError = (event: AgentEvent<'error'>): Error => {
const errToThrow = new Error(event.message);
const errorMeta = event._meta;
if (errorMeta?.['exitCode'] !== undefined) {
Object.defineProperty(errToThrow, 'exitCode', {
value: errorMeta['exitCode'],
enumerable: true,
});
}
if (errorMeta?.['errorName'] !== undefined) {
Object.defineProperty(errToThrow, 'name', {
value: errorMeta['errorName'],
enumerable: true,
});
}
if (errorMeta?.['code'] !== undefined) {
Object.defineProperty(errToThrow, 'code', {
value: errorMeta['code'],
enumerable: true,
});
}
return errToThrow;
};
// Consume AgentEvents for output formatting
let responseText = '';
let streamEnded = false;
for await (const event of session.stream()) {
if (streamEnded) break;
switch (event.type) {
case 'message': {
if (event.role === 'agent') {
for (const part of event.content) {
if (part.type === 'text') {
const isRaw =
config.getRawOutput() || config.getAcceptRawOutputRisk();
const output = isRaw ? part.text : stripAnsi(part.text);
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.MESSAGE,
timestamp: new Date().toISOString(),
role: 'assistant',
content: output,
delta: true,
});
} else if (config.getOutputFormat() === OutputFormat.JSON) {
responseText += output;
} else {
if (part.text) {
textOutput.write(output);
}
}
}
}
}
break;
}
case 'tool_request': {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_USE,
timestamp: new Date().toISOString(),
tool_name: event.name,
tool_id: event.requestId,
parameters: event.args,
});
}
break;
}
case 'tool_response': {
textOutput.ensureTrailingNewline();
if (streamFormatter) {
const displayText = getFirstText(event.displayContent);
const errorMsg = getFirstText(event.content) ?? 'Tool error';
streamFormatter.emitEvent({
type: JsonStreamEventType.TOOL_RESULT,
timestamp: new Date().toISOString(),
tool_id: event.requestId,
status: event.isError ? 'error' : 'success',
output: displayText,
error: event.isError
? {
type:
typeof event.data?.['errorType'] === 'string'
? event.data['errorType']
: 'TOOL_EXECUTION_ERROR',
message: errorMsg,
}
: undefined,
});
}
if (event.isError) {
const displayText = getFirstText(event.displayContent);
const errorMsg = getFirstText(event.content) ?? 'Tool error';
if (event.data?.['errorType'] === ToolErrorType.STOP_EXECUTION) {
const stopMessage = `Agent execution stopped: ${errorMsg}`;
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`${stopMessage}\n`);
}
}
handleToolError(
event.name,
new Error(errorMsg),
config,
typeof event.data?.['errorType'] === 'string'
? event.data['errorType']
: undefined,
displayText,
);
}
break;
}
case 'error': {
if (event.fatal) {
throw reconstructFatalError(event);
}
const errorCode = event._meta?.['code'];
if (errorCode === 'MAX_TURNS_EXCEEDED') {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'error',
message: event.message,
});
}
break;
}
if (errorCode === 'AGENT_EXECUTION_BLOCKED') {
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`[WARNING] ${event.message}\n`);
}
break;
}
const severity =
event.status === 'RESOURCE_EXHAUSTED' ? 'error' : 'warning';
if (config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`[WARNING] ${event.message}\n`);
}
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity,
message: event.message,
});
}
break;
}
case 'stream_end': {
if (event.reason === 'aborted') {
handleCancellationError(config);
} else if (event.reason === 'max_turns') {
handleMaxTurnsExceededError(config);
}
const stopMessage =
typeof event.data?.['message'] === 'string'
? event.data['message']
: '';
if (stopMessage && config.getOutputFormat() === OutputFormat.TEXT) {
process.stderr.write(`Agent execution stopped: ${stopMessage}\n`);
}
emitFinalSuccessResult();
streamEnded = true;
break;
}
case 'custom': {
if (event.kind === 'loop_detected') {
if (streamFormatter) {
streamFormatter.emitEvent({
type: JsonStreamEventType.ERROR,
timestamp: new Date().toISOString(),
severity: 'warning',
message: 'Loop detected, stopping execution',
});
}
}
break;
}
default:
break;
}
}
} catch (error) {
@@ -0,0 +1,266 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, expect, it } from 'vitest';
import {
geminiPartsToContentParts,
contentPartsToGeminiParts,
toolResultDisplayToContentParts,
buildToolResponseData,
} from './content-utils.js';
import type { Part } from '@google/genai';
import type { ContentPart } from './types.js';
describe('geminiPartsToContentParts', () => {
it('converts text parts', () => {
const parts: Part[] = [{ text: 'hello' }];
expect(geminiPartsToContentParts(parts)).toEqual([
{ type: 'text', text: 'hello' },
]);
});
it('converts thought parts', () => {
const parts: Part[] = [
{ text: 'thinking...', thought: true, thoughtSignature: 'sig123' },
];
expect(geminiPartsToContentParts(parts)).toEqual([
{
type: 'thought',
thought: 'thinking...',
thoughtSignature: 'sig123',
},
]);
});
it('converts thought parts without signature', () => {
const parts: Part[] = [{ text: 'thinking...', thought: true }];
expect(geminiPartsToContentParts(parts)).toEqual([
{ type: 'thought', thought: 'thinking...' },
]);
});
it('converts inlineData parts to media', () => {
const parts: Part[] = [
{ inlineData: { data: 'base64data', mimeType: 'image/png' } },
];
expect(geminiPartsToContentParts(parts)).toEqual([
{ type: 'media', data: 'base64data', mimeType: 'image/png' },
]);
});
it('converts fileData parts to media', () => {
const parts: Part[] = [
{
fileData: {
fileUri: 'gs://bucket/file.pdf',
mimeType: 'application/pdf',
},
},
];
expect(geminiPartsToContentParts(parts)).toEqual([
{
type: 'media',
uri: 'gs://bucket/file.pdf',
mimeType: 'application/pdf',
},
]);
});
it('converts functionCall parts to text with metadata', () => {
const parts: Part[] = [
{ functionCall: { name: 'myFunc', args: { key: 'value' } } },
];
const result = geminiPartsToContentParts(parts);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('text');
expect(result[0]?._meta).toEqual({ partType: 'functionCall' });
const parsed = JSON.parse(
(result[0] as { type: 'text'; text: string }).text,
);
expect(parsed.functionCall.name).toBe('myFunc');
});
it('converts functionResponse parts to text with metadata', () => {
const parts: Part[] = [
{
functionResponse: {
name: 'myFunc',
response: { output: 'result' },
},
},
];
const result = geminiPartsToContentParts(parts);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('text');
expect(result[0]?._meta).toEqual({ partType: 'functionResponse' });
});
it('serializes unknown part types to text with _meta', () => {
const parts: Part[] = [{ unknownField: 'data' } as Part];
const result = geminiPartsToContentParts(parts);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('text');
expect(result[0]?._meta).toEqual({ partType: 'unknown' });
});
it('handles empty array', () => {
expect(geminiPartsToContentParts([])).toEqual([]);
});
it('handles mixed parts', () => {
const parts: Part[] = [
{ text: 'hello' },
{ inlineData: { data: 'img', mimeType: 'image/jpeg' } },
{ text: 'thought', thought: true },
];
const result = geminiPartsToContentParts(parts);
expect(result).toHaveLength(3);
expect(result[0]?.type).toBe('text');
expect(result[1]?.type).toBe('media');
expect(result[2]?.type).toBe('thought');
});
});
describe('contentPartsToGeminiParts', () => {
it('converts text ContentParts', () => {
const content: ContentPart[] = [{ type: 'text', text: 'hello' }];
expect(contentPartsToGeminiParts(content)).toEqual([{ text: 'hello' }]);
});
it('converts thought ContentParts', () => {
const content: ContentPart[] = [
{ type: 'thought', thought: 'thinking...', thoughtSignature: 'sig' },
];
expect(contentPartsToGeminiParts(content)).toEqual([
{ text: 'thinking...', thought: true, thoughtSignature: 'sig' },
]);
});
it('converts thought ContentParts without signature', () => {
const content: ContentPart[] = [
{ type: 'thought', thought: 'thinking...' },
];
expect(contentPartsToGeminiParts(content)).toEqual([
{ text: 'thinking...', thought: true },
]);
});
it('converts media ContentParts with data to inlineData', () => {
const content: ContentPart[] = [
{ type: 'media', data: 'base64', mimeType: 'image/png' },
];
expect(contentPartsToGeminiParts(content)).toEqual([
{ inlineData: { data: 'base64', mimeType: 'image/png' } },
]);
});
it('converts media ContentParts with uri to fileData', () => {
const content: ContentPart[] = [
{ type: 'media', uri: 'gs://bucket/file', mimeType: 'application/pdf' },
];
expect(contentPartsToGeminiParts(content)).toEqual([
{
fileData: { fileUri: 'gs://bucket/file', mimeType: 'application/pdf' },
},
]);
});
it('converts reference ContentParts to text', () => {
const content: ContentPart[] = [{ type: 'reference', text: '@file.ts' }];
expect(contentPartsToGeminiParts(content)).toEqual([{ text: '@file.ts' }]);
});
it('handles empty array', () => {
expect(contentPartsToGeminiParts([])).toEqual([]);
});
it('skips media parts with no data or uri', () => {
const content: ContentPart[] = [{ type: 'media', mimeType: 'image/png' }];
expect(contentPartsToGeminiParts(content)).toEqual([]);
});
it('defaults mimeType for media with data but no mimeType', () => {
const content: ContentPart[] = [{ type: 'media', data: 'base64data' }];
const result = contentPartsToGeminiParts(content);
expect(result).toEqual([
{
inlineData: {
data: 'base64data',
mimeType: 'application/octet-stream',
},
},
]);
});
it('serializes unknown ContentPart variants', () => {
// Force an unknown variant past the type system
const content = [
{ type: 'custom_widget', payload: 123 },
] as unknown as ContentPart[];
const result = contentPartsToGeminiParts(content);
expect(result).toHaveLength(1);
expect(result[0]).toEqual({
text: JSON.stringify({ type: 'custom_widget', payload: 123 }),
});
});
});
describe('toolResultDisplayToContentParts', () => {
it('returns undefined for undefined', () => {
expect(toolResultDisplayToContentParts(undefined)).toBeUndefined();
});
it('returns undefined for null', () => {
expect(toolResultDisplayToContentParts(null)).toBeUndefined();
});
it('handles string resultDisplay as-is', () => {
const result = toolResultDisplayToContentParts('File written');
expect(result).toEqual([{ type: 'text', text: 'File written' }]);
});
it('stringifies object resultDisplay', () => {
const display = { type: 'FileDiff', oldPath: 'a.ts', newPath: 'b.ts' };
const result = toolResultDisplayToContentParts(display);
expect(result).toEqual([{ type: 'text', text: JSON.stringify(display) }]);
});
});
describe('buildToolResponseData', () => {
it('preserves outputFile and contentLength', () => {
const result = buildToolResponseData({
outputFile: '/tmp/result.txt',
contentLength: 256,
});
expect(result).toEqual({
outputFile: '/tmp/result.txt',
contentLength: 256,
});
});
it('returns undefined for empty response', () => {
const result = buildToolResponseData({});
expect(result).toBeUndefined();
});
it('includes errorType when present', () => {
const result = buildToolResponseData({
errorType: 'permission_denied',
});
expect(result).toEqual({ errorType: 'permission_denied' });
});
it('merges data with other fields', () => {
const result = buildToolResponseData({
data: { custom: 'value' },
outputFile: '/tmp/file.txt',
});
expect(result).toEqual({
custom: 'value',
outputFile: '/tmp/file.txt',
});
});
});
+158
View File
@@ -0,0 +1,158 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { Part } from '@google/genai';
import type { ContentPart } from './types.js';
/**
* Converts Gemini API Part objects to framework-agnostic ContentPart objects.
* Handles text, thought, inlineData, fileData parts and serializes unknown
* part types to text to avoid silent data loss.
*/
export function geminiPartsToContentParts(parts: Part[]): ContentPart[] {
const result: ContentPart[] = [];
for (const part of parts) {
if ('text' in part && part.text !== undefined) {
if ('thought' in part && part.thought) {
result.push({
type: 'thought',
thought: part.text,
...(part.thoughtSignature
? { thoughtSignature: part.thoughtSignature }
: {}),
});
} else {
result.push({ type: 'text', text: part.text });
}
} else if ('inlineData' in part && part.inlineData) {
result.push({
type: 'media',
data: part.inlineData.data,
mimeType: part.inlineData.mimeType,
});
} else if ('fileData' in part && part.fileData) {
result.push({
type: 'media',
uri: part.fileData.fileUri,
mimeType: part.fileData.mimeType,
});
} else if ('functionCall' in part && part.functionCall) {
// Function calls are serialized to text so consumers can inspect them
result.push({
type: 'text',
text: JSON.stringify({
functionCall: {
name: part.functionCall.name,
args: part.functionCall.args,
},
}),
_meta: { partType: 'functionCall' },
});
} else if ('functionResponse' in part && part.functionResponse) {
result.push({
type: 'text',
text: JSON.stringify({
functionResponse: {
name: part.functionResponse.name,
response: part.functionResponse.response,
},
}),
_meta: { partType: 'functionResponse' },
});
} else {
// Fallback: serialize any unrecognized part type to text
result.push({
type: 'text',
text: JSON.stringify(part),
_meta: { partType: 'unknown' },
});
}
}
return result;
}
/**
* Converts framework-agnostic ContentPart objects to Gemini API Part objects.
*/
export function contentPartsToGeminiParts(content: ContentPart[]): Part[] {
const result: Part[] = [];
for (const part of content) {
switch (part.type) {
case 'text':
result.push({ text: part.text });
break;
case 'thought':
result.push({
text: part.thought,
thought: true,
...(part.thoughtSignature
? { thoughtSignature: part.thoughtSignature }
: {}),
});
break;
case 'media':
if (part.data) {
result.push({
inlineData: {
data: part.data,
mimeType: part.mimeType ?? 'application/octet-stream',
},
});
} else if (part.uri) {
result.push({
fileData: { fileUri: part.uri, mimeType: part.mimeType },
});
}
break;
case 'reference':
// References are converted to text for the model
result.push({ text: part.text });
break;
default:
// Serialize unknown ContentPart variants instead of dropping them
result.push({ text: JSON.stringify(part) });
break;
}
}
return result;
}
/**
* Converts a ToolCallResponseInfo.resultDisplay value into ContentPart[].
* Handles string, object-valued (FileDiff, SubagentProgress, etc.),
* and undefined resultDisplay consistently.
*/
export function toolResultDisplayToContentParts(
resultDisplay: unknown,
): ContentPart[] | undefined {
if (resultDisplay === undefined || resultDisplay === null) {
return undefined;
}
const text =
typeof resultDisplay === 'string'
? resultDisplay
: JSON.stringify(resultDisplay);
return [{ type: 'text', text }];
}
/**
* Builds the data record for a tool_response AgentEvent, preserving
* all available metadata from the ToolCallResponseInfo.
*/
export function buildToolResponseData(response: {
data?: Record<string, unknown>;
errorType?: string;
outputFile?: string;
contentLength?: number;
}): Record<string, unknown> | undefined {
const parts: Record<string, unknown> = {};
if (response.data) Object.assign(parts, response.data);
if (response.errorType) parts['errorType'] = response.errorType;
if (response.outputFile) parts['outputFile'] = response.outputFile;
if (response.contentLength !== undefined)
parts['contentLength'] = response.contentLength;
return Object.keys(parts).length > 0 ? parts : undefined;
}
@@ -0,0 +1,705 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, expect, it, beforeEach } from 'vitest';
import { FinishReason } from '@google/genai';
import { ToolErrorType } from '../tools/tool-error.js';
import {
translateEvent,
createTranslationState,
mapFinishReason,
mapHttpToGrpcStatus,
mapError,
mapUsage,
type TranslationState,
} from './event-translator.js';
import { GeminiEventType } from '../core/turn.js';
import type { ServerGeminiStreamEvent } from '../core/turn.js';
import type { AgentEvent } from './types.js';
describe('createTranslationState', () => {
it('creates state with default streamId', () => {
const state = createTranslationState();
expect(state.streamId).toBeDefined();
expect(state.streamStartEmitted).toBe(false);
expect(state.model).toBeUndefined();
expect(state.eventCounter).toBe(0);
expect(state.pendingToolNames.size).toBe(0);
});
it('creates state with custom streamId', () => {
const state = createTranslationState('custom-stream');
expect(state.streamId).toBe('custom-stream');
});
});
describe('translateEvent', () => {
let state: TranslationState;
beforeEach(() => {
state = createTranslationState('test-stream');
});
describe('Content events', () => {
it('emits stream_start + message for first content event', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Content,
value: 'Hello world',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
expect(result[0]?.type).toBe('stream_start');
expect(result[1]?.type).toBe('message');
const msg = result[1] as AgentEvent<'message'>;
expect(msg.role).toBe('agent');
expect(msg.content).toEqual([{ type: 'text', text: 'Hello world' }]);
});
it('skips stream_start for subsequent content events', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Content,
value: 'more text',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('message');
});
});
describe('Thought events', () => {
it('emits thought content with metadata', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Thought,
value: { subject: 'Planning', description: 'I am thinking...' },
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const msg = result[0] as AgentEvent<'message'>;
expect(msg.content).toEqual([
{ type: 'thought', thought: 'I am thinking...' },
]);
expect(msg._meta?.['subject']).toBe('Planning');
});
});
describe('ToolCallRequest events', () => {
it('emits tool_request and tracks pending tool name', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallRequest,
value: {
callId: 'call-1',
name: 'read_file',
args: { path: '/tmp/test' },
isClientInitiated: false,
prompt_id: 'p1',
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const req = result[0] as AgentEvent<'tool_request'>;
expect(req.requestId).toBe('call-1');
expect(req.name).toBe('read_file');
expect(req.args).toEqual({ path: '/tmp/test' });
expect(state.pendingToolNames.get('call-1')).toBe('read_file');
});
});
describe('ToolCallResponse events', () => {
it('emits tool_response with content from responseParts', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-1', 'read_file');
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-1',
responseParts: [{ text: 'file contents' }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.requestId).toBe('call-1');
expect(resp.name).toBe('read_file');
expect(resp.content).toEqual([{ type: 'text', text: 'file contents' }]);
expect(resp.isError).toBe(false);
expect(state.pendingToolNames.has('call-1')).toBe(false);
});
it('uses error.message for content when tool errored', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-2', 'write_file');
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-2',
responseParts: [{ text: 'stale parts' }],
resultDisplay: 'Permission denied',
error: new Error('Permission denied to write'),
errorType: ToolErrorType.PERMISSION_DENIED,
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.isError).toBe(true);
// Should use error.message, not responseParts
expect(resp.content).toEqual([
{ type: 'text', text: 'Permission denied to write' },
]);
expect(resp.displayContent).toEqual([
{ type: 'text', text: 'Permission denied' },
]);
expect(resp.data).toEqual({ errorType: 'permission_denied' });
});
it('uses "unknown" name for untracked tool calls', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'untracked',
responseParts: [{ text: 'data' }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
};
const result = translateEvent(event, state);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.name).toBe('unknown');
});
it('stringifies object resultDisplay correctly', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-3', 'diff_tool');
const objectDisplay = { type: 'FileDiff', before: 'a', after: 'b' };
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-3',
responseParts: [{ text: 'diff result' }],
resultDisplay: objectDisplay,
error: undefined,
errorType: undefined,
},
};
const result = translateEvent(event, state);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.displayContent).toEqual([
{ type: 'text', text: JSON.stringify(objectDisplay) },
]);
});
it('passes through string resultDisplay as-is', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-4', 'shell');
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-4',
responseParts: [{ text: 'output' }],
resultDisplay: 'Command output text',
error: undefined,
errorType: undefined,
},
};
const result = translateEvent(event, state);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.displayContent).toEqual([
{ type: 'text', text: 'Command output text' },
]);
});
it('preserves outputFile and contentLength in data', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-5', 'write_file');
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-5',
responseParts: [{ text: 'written' }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
outputFile: '/tmp/out.txt',
contentLength: 42,
},
};
const result = translateEvent(event, state);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.data?.['outputFile']).toBe('/tmp/out.txt');
expect(resp.data?.['contentLength']).toBe(42);
});
it('handles multi-part responses (text + inlineData)', () => {
state.streamStartEmitted = true;
state.pendingToolNames.set('call-6', 'screenshot');
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ToolCallResponse,
value: {
callId: 'call-6',
responseParts: [
{ text: 'Here is the screenshot' },
{ inlineData: { data: 'base64img', mimeType: 'image/png' } },
],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
};
const result = translateEvent(event, state);
const resp = result[0] as AgentEvent<'tool_response'>;
expect(resp.content).toEqual([
{ type: 'text', text: 'Here is the screenshot' },
{ type: 'media', data: 'base64img', mimeType: 'image/png' },
]);
expect(resp.isError).toBe(false);
});
});
describe('Error events', () => {
it('emits error event for structured errors', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Error,
value: { error: { message: 'Rate limited', status: 429 } },
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('RESOURCE_EXHAUSTED');
expect(err.message).toBe('Rate limited');
expect(err.fatal).toBe(true);
});
it('emits error event for Error instances', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Error,
value: { error: new Error('Something broke') },
};
const result = translateEvent(event, state);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('INTERNAL');
expect(err.message).toBe('Something broke');
});
});
describe('ModelInfo events', () => {
it('emits stream_start when no stream started yet', () => {
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ModelInfo,
value: 'gemini-2.5-pro',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('stream_start');
expect(state.model).toBe('gemini-2.5-pro');
expect(state.streamStartEmitted).toBe(true);
});
it('emits session_update when stream already started', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ModelInfo,
value: 'gemini-2.5-flash',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('session_update');
});
});
describe('AgentExecutionStopped events', () => {
it('emits stream_end with the final stop message in data.message', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.AgentExecutionStopped,
value: {
reason: 'before_model',
systemMessage: 'Stopped by hook',
contextCleared: true,
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const streamEnd = result[0] as AgentEvent<'stream_end'>;
expect(streamEnd.type).toBe('stream_end');
expect(streamEnd.reason).toBe('completed');
expect(streamEnd.data).toEqual({ message: 'Stopped by hook' });
});
it('uses reason when systemMessage is not set', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.AgentExecutionStopped,
value: { reason: 'hook' },
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const streamEnd = result[0] as AgentEvent<'stream_end'>;
expect(streamEnd.data).toEqual({ message: 'hook' });
});
});
describe('AgentExecutionBlocked events', () => {
it('emits non-fatal error event (non-terminal, stream continues)', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.AgentExecutionBlocked,
value: { reason: 'Policy violation' },
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.type).toBe('error');
expect(err.fatal).toBe(false);
expect(err._meta?.['code']).toBe('AGENT_EXECUTION_BLOCKED');
expect(err.message).toBe('Agent execution blocked: Policy violation');
});
it('uses systemMessage in the final error message when available', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.AgentExecutionBlocked,
value: {
reason: 'hook_blocked',
systemMessage: 'Blocked by policy hook',
contextCleared: true,
},
};
const result = translateEvent(event, state);
const err = result[0] as AgentEvent<'error'>;
expect(err.message).toBe(
'Agent execution blocked: Blocked by policy hook',
);
});
});
describe('LoopDetected events', () => {
it('emits a custom loop_detected event', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.LoopDetected,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('custom');
expect((result[0] as AgentEvent<'custom'>).kind).toBe('loop_detected');
});
});
describe('MaxSessionTurns events', () => {
it('emits a non-fatal max-turns error event', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.MaxSessionTurns,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.type).toBe('error');
expect(err.fatal).toBe(false);
expect(err.status).toBe('RESOURCE_EXHAUSTED');
expect(err._meta?.['code']).toBe('MAX_TURNS_EXCEEDED');
expect(err.message).toBe('Maximum session turns exceeded');
});
});
describe('Finished events', () => {
it('emits usage + stream_end for STOP', () => {
state.streamStartEmitted = true;
state.model = 'gemini-2.5-pro';
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Finished,
value: {
reason: FinishReason.STOP,
usageMetadata: {
promptTokenCount: 100,
candidatesTokenCount: 50,
cachedContentTokenCount: 10,
},
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(2);
const usage = result[0] as AgentEvent<'usage'>;
expect(usage.model).toBe('gemini-2.5-pro');
expect(usage.inputTokens).toBe(100);
expect(usage.outputTokens).toBe(50);
expect(usage.cachedTokens).toBe(10);
const end = result[1] as AgentEvent<'stream_end'>;
expect(end.reason).toBe('completed');
});
it('emits stream_end without usage when no metadata', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Finished,
value: { reason: undefined, usageMetadata: undefined },
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
expect(result[0]?.type).toBe('stream_end');
});
});
describe('Citation events', () => {
it('emits message with citation meta', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.Citation,
value: 'Source: example.com',
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const msg = result[0] as AgentEvent<'message'>;
expect(msg.content).toEqual([
{ type: 'text', text: 'Source: example.com' },
]);
expect(msg._meta?.['citation']).toBe(true);
});
});
describe('UserCancelled events', () => {
it('emits stream_end with reason aborted', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.UserCancelled,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const end = result[0] as AgentEvent<'stream_end'>;
expect(end.type).toBe('stream_end');
expect(end.reason).toBe('aborted');
});
});
describe('ContextWindowWillOverflow events', () => {
it('emits fatal error', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.ContextWindowWillOverflow,
value: {
estimatedRequestTokenCount: 150000,
remainingTokenCount: 10000,
},
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('RESOURCE_EXHAUSTED');
expect(err.fatal).toBe(true);
expect(err.message).toContain('150000');
expect(err.message).toContain('10000');
});
});
describe('InvalidStream events', () => {
it('emits fatal error', () => {
state.streamStartEmitted = true;
const event: ServerGeminiStreamEvent = {
type: GeminiEventType.InvalidStream,
};
const result = translateEvent(event, state);
expect(result).toHaveLength(1);
const err = result[0] as AgentEvent<'error'>;
expect(err.status).toBe('INTERNAL');
expect(err.message).toBe('Invalid stream received from model');
expect(err.fatal).toBe(true);
});
});
describe('Events with no output', () => {
it('returns empty for Retry', () => {
const result = translateEvent({ type: GeminiEventType.Retry }, state);
expect(result).toEqual([]);
});
it('returns empty for ChatCompressed with null', () => {
const result = translateEvent(
{ type: GeminiEventType.ChatCompressed, value: null },
state,
);
expect(result).toEqual([]);
});
it('returns empty for ToolCallConfirmation', () => {
// ToolCallConfirmation is skipped in non-interactive mode (elicitations
// are deferred to the interactive runtime adaptation).
const event = {
type: GeminiEventType.ToolCallConfirmation,
value: {
request: {
callId: 'c1',
name: 'tool',
args: {},
isClientInitiated: false,
prompt_id: 'p1',
},
details: { type: 'info', title: 'Confirm', prompt: 'Confirm?' },
},
} as ServerGeminiStreamEvent;
const result = translateEvent(event, state);
expect(result).toEqual([]);
});
});
describe('Event IDs', () => {
it('generates sequential IDs', () => {
state.streamStartEmitted = true;
const e1 = translateEvent(
{ type: GeminiEventType.Content, value: 'a' },
state,
);
const e2 = translateEvent(
{ type: GeminiEventType.Content, value: 'b' },
state,
);
expect(e1[0]?.id).toBe('test-stream-0');
expect(e2[0]?.id).toBe('test-stream-1');
});
it('includes streamId in events', () => {
const events = translateEvent(
{ type: GeminiEventType.Content, value: 'hi' },
state,
);
for (const e of events) {
expect(e.streamId).toBe('test-stream');
}
});
});
});
describe('mapFinishReason', () => {
it('maps STOP to completed', () => {
expect(mapFinishReason(FinishReason.STOP)).toBe('completed');
});
it('maps undefined to completed', () => {
expect(mapFinishReason(undefined)).toBe('completed');
});
it('maps MAX_TOKENS to max_budget', () => {
expect(mapFinishReason(FinishReason.MAX_TOKENS)).toBe('max_budget');
});
it('maps SAFETY to refusal', () => {
expect(mapFinishReason(FinishReason.SAFETY)).toBe('refusal');
});
it('maps MALFORMED_FUNCTION_CALL to failed', () => {
expect(mapFinishReason(FinishReason.MALFORMED_FUNCTION_CALL)).toBe(
'failed',
);
});
it('maps RECITATION to refusal', () => {
expect(mapFinishReason(FinishReason.RECITATION)).toBe('refusal');
});
it('maps LANGUAGE to refusal', () => {
expect(mapFinishReason(FinishReason.LANGUAGE)).toBe('refusal');
});
it('maps BLOCKLIST to refusal', () => {
expect(mapFinishReason(FinishReason.BLOCKLIST)).toBe('refusal');
});
it('maps OTHER to failed', () => {
expect(mapFinishReason(FinishReason.OTHER)).toBe('failed');
});
it('maps PROHIBITED_CONTENT to refusal', () => {
expect(mapFinishReason(FinishReason.PROHIBITED_CONTENT)).toBe('refusal');
});
});
describe('mapHttpToGrpcStatus', () => {
it('maps 400 to INVALID_ARGUMENT', () => {
expect(mapHttpToGrpcStatus(400)).toBe('INVALID_ARGUMENT');
});
it('maps 401 to UNAUTHENTICATED', () => {
expect(mapHttpToGrpcStatus(401)).toBe('UNAUTHENTICATED');
});
it('maps 429 to RESOURCE_EXHAUSTED', () => {
expect(mapHttpToGrpcStatus(429)).toBe('RESOURCE_EXHAUSTED');
});
it('maps undefined to INTERNAL', () => {
expect(mapHttpToGrpcStatus(undefined)).toBe('INTERNAL');
});
it('maps unknown codes to INTERNAL', () => {
expect(mapHttpToGrpcStatus(418)).toBe('INTERNAL');
});
});
describe('mapError', () => {
it('maps structured errors with status', () => {
const result = mapError({ message: 'Rate limit', status: 429 });
expect(result.status).toBe('RESOURCE_EXHAUSTED');
expect(result.message).toBe('Rate limit');
expect(result.fatal).toBe(true);
});
it('maps Error instances', () => {
const result = mapError(new Error('Something failed'));
expect(result.status).toBe('INTERNAL');
expect(result.message).toBe('Something failed');
});
it('preserves error name in _meta', () => {
class CustomError extends Error {
constructor(msg: string) {
super(msg);
}
}
const result = mapError(new CustomError('test'));
expect(result._meta?.['errorName']).toBe('CustomError');
});
it('maps non-Error values to string', () => {
const result = mapError('raw string error');
expect(result.message).toBe('raw string error');
expect(result.status).toBe('INTERNAL');
});
});
describe('mapUsage', () => {
it('maps all fields', () => {
const result = mapUsage(
{
promptTokenCount: 100,
candidatesTokenCount: 50,
cachedContentTokenCount: 25,
},
'gemini-2.5-pro',
);
expect(result).toEqual({
model: 'gemini-2.5-pro',
inputTokens: 100,
outputTokens: 50,
cachedTokens: 25,
});
});
it('uses "unknown" for missing model', () => {
const result = mapUsage({});
expect(result.model).toBe('unknown');
});
});
+456
View File
@@ -0,0 +1,456 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @fileoverview Pure, stateless-per-call translation functions that convert
* ServerGeminiStreamEvent objects into AgentEvent objects.
*
* No side effects, no generators. Each call to `translateEvent` takes an event
* and mutable TranslationState, returning zero or more AgentEvents.
*/
import type { FinishReason } from '@google/genai';
import { GeminiEventType } from '../core/turn.js';
import type {
ServerGeminiStreamEvent,
StructuredError,
GeminiFinishedEventValue,
} from '../core/turn.js';
import type { AgentEvent, StreamEndReason, ErrorData, Usage } from './types.js';
import {
geminiPartsToContentParts,
toolResultDisplayToContentParts,
buildToolResponseData,
} from './content-utils.js';
// ---------------------------------------------------------------------------
// Translation State
// ---------------------------------------------------------------------------
export interface TranslationState {
streamId: string;
streamStartEmitted: boolean;
model: string | undefined;
eventCounter: number;
/** Tracks callId → tool name from requests so responses can reference the name. */
pendingToolNames: Map<string, string>;
}
export function createTranslationState(streamId?: string): TranslationState {
return {
streamId: streamId ?? crypto.randomUUID(),
streamStartEmitted: false,
model: undefined,
eventCounter: 0,
pendingToolNames: new Map(),
};
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeEvent(
type: AgentEvent['type'],
state: TranslationState,
payload: Partial<AgentEvent>,
): AgentEvent {
const id = `${state.streamId}-${state.eventCounter++}`;
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion -- constructing AgentEvent from common fields + payload
return {
...payload,
id,
timestamp: new Date().toISOString(),
streamId: state.streamId,
type,
} as AgentEvent;
}
function ensureStreamStart(state: TranslationState, out: AgentEvent[]): void {
if (!state.streamStartEmitted) {
out.push(makeEvent('stream_start', state, { streamId: state.streamId }));
state.streamStartEmitted = true;
}
}
// ---------------------------------------------------------------------------
// Core Translator
// ---------------------------------------------------------------------------
/**
* Translates a single ServerGeminiStreamEvent into zero or more AgentEvents.
* Mutates `state` (counter, flags) as a side effect.
*/
export function translateEvent(
event: ServerGeminiStreamEvent,
state: TranslationState,
): AgentEvent[] {
const out: AgentEvent[] = [];
switch (event.type) {
case GeminiEventType.ModelInfo:
state.model = event.value;
if (!state.streamStartEmitted) {
out.push(
makeEvent('stream_start', state, { streamId: state.streamId }),
);
state.streamStartEmitted = true;
} else {
out.push(makeEvent('session_update', state, { model: event.value }));
}
break;
case GeminiEventType.Content:
ensureStreamStart(state, out);
out.push(
makeEvent('message', state, {
role: 'agent',
content: [{ type: 'text', text: event.value }],
}),
);
break;
case GeminiEventType.Thought:
ensureStreamStart(state, out);
out.push(
makeEvent('message', state, {
role: 'agent',
content: [{ type: 'thought', thought: event.value.description }],
_meta: event.value.subject
? { source: 'agent', subject: event.value.subject }
: { source: 'agent' },
}),
);
break;
case GeminiEventType.Citation:
ensureStreamStart(state, out);
out.push(
makeEvent('message', state, {
role: 'agent',
content: [{ type: 'text', text: event.value }],
_meta: { source: 'agent', citation: true },
}),
);
break;
case GeminiEventType.Finished:
handleFinished(event.value, state, out);
break;
case GeminiEventType.Error:
handleError(event.value.error, state, out);
break;
case GeminiEventType.UserCancelled:
ensureStreamStart(state, out);
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: 'aborted',
}),
);
break;
case GeminiEventType.MaxSessionTurns:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'RESOURCE_EXHAUSTED',
message: 'Maximum session turns exceeded',
fatal: false,
_meta: { code: 'MAX_TURNS_EXCEEDED' },
}),
);
break;
case GeminiEventType.LoopDetected:
ensureStreamStart(state, out);
out.push(
makeEvent('custom', state, {
kind: 'loop_detected',
}),
);
// No stream_end — the stream continues. Consumer decides how to handle:
// non-interactive emits a warning, interactive shows a confirmation dialog.
break;
case GeminiEventType.ContextWindowWillOverflow:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'RESOURCE_EXHAUSTED',
message: `Context window will overflow (estimated: ${event.value.estimatedRequestTokenCount}, remaining: ${event.value.remainingTokenCount})`,
fatal: true,
}),
);
break;
case GeminiEventType.AgentExecutionStopped:
ensureStreamStart(state, out);
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: 'completed',
data: {
message: event.value.systemMessage?.trim() || event.value.reason,
},
}),
);
break;
case GeminiEventType.AgentExecutionBlocked:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'PERMISSION_DENIED',
message: `Agent execution blocked: ${event.value.systemMessage?.trim() || event.value.reason}`,
fatal: false,
_meta: { code: 'AGENT_EXECUTION_BLOCKED' },
}),
);
break;
case GeminiEventType.InvalidStream:
ensureStreamStart(state, out);
out.push(
makeEvent('error', state, {
status: 'INTERNAL',
message: 'Invalid stream received from model',
fatal: true,
}),
);
break;
case GeminiEventType.ToolCallRequest:
ensureStreamStart(state, out);
state.pendingToolNames.set(event.value.callId, event.value.name);
out.push(
makeEvent('tool_request', state, {
requestId: event.value.callId,
name: event.value.name,
args: event.value.args,
}),
);
break;
case GeminiEventType.ToolCallResponse: {
ensureStreamStart(state, out);
const displayContent = toolResultDisplayToContentParts(
event.value.resultDisplay,
);
const data = buildToolResponseData(event.value);
out.push(
makeEvent('tool_response', state, {
requestId: event.value.callId,
name: state.pendingToolNames.get(event.value.callId) ?? 'unknown',
content: event.value.error
? [{ type: 'text', text: event.value.error.message }]
: geminiPartsToContentParts(event.value.responseParts),
isError: event.value.error !== undefined,
...(displayContent ? { displayContent } : {}),
...(data ? { data } : {}),
}),
);
state.pendingToolNames.delete(event.value.callId);
break;
}
case GeminiEventType.ToolCallConfirmation:
// Elicitations are handled separately by the session layer
break;
// Internal concerns — no AgentEvent emitted
case GeminiEventType.ChatCompressed:
case GeminiEventType.Retry:
break;
default:
break;
}
return out;
}
// ---------------------------------------------------------------------------
// Finished Event Handling
// ---------------------------------------------------------------------------
function handleFinished(
value: GeminiFinishedEventValue,
state: TranslationState,
out: AgentEvent[],
): void {
ensureStreamStart(state, out);
if (value.usageMetadata) {
const usage = mapUsage(value.usageMetadata, state.model);
out.push(makeEvent('usage', state, usage));
}
out.push(
makeEvent('stream_end', state, {
streamId: state.streamId,
reason: mapFinishReason(value.reason),
}),
);
}
// ---------------------------------------------------------------------------
// Error Handling
// ---------------------------------------------------------------------------
function handleError(
error: unknown,
state: TranslationState,
out: AgentEvent[],
): void {
ensureStreamStart(state, out);
const mapped = mapError(error);
out.push(makeEvent('error', state, mapped));
}
// ---------------------------------------------------------------------------
// Public Mapping Functions
// ---------------------------------------------------------------------------
/**
* Maps a Gemini FinishReason to a StreamEndReason.
*/
export function mapFinishReason(
reason: FinishReason | undefined,
): StreamEndReason {
if (!reason) return 'completed';
switch (reason) {
case 'STOP':
case 'FINISH_REASON_UNSPECIFIED':
return 'completed';
case 'MAX_TOKENS':
return 'max_budget';
case 'SAFETY':
case 'RECITATION':
case 'LANGUAGE':
case 'BLOCKLIST':
case 'PROHIBITED_CONTENT':
case 'SPII':
return 'refusal';
case 'MALFORMED_FUNCTION_CALL':
case 'OTHER':
return 'failed';
default:
return 'failed';
}
}
/**
* Maps an HTTP status code to a gRPC-style status string.
*/
export function mapHttpToGrpcStatus(
httpStatus: number | undefined,
): ErrorData['status'] {
if (httpStatus === undefined) return 'INTERNAL';
switch (httpStatus) {
case 400:
return 'INVALID_ARGUMENT';
case 401:
return 'UNAUTHENTICATED';
case 403:
return 'PERMISSION_DENIED';
case 404:
return 'NOT_FOUND';
case 409:
return 'ALREADY_EXISTS';
case 429:
return 'RESOURCE_EXHAUSTED';
case 500:
return 'INTERNAL';
case 501:
return 'UNIMPLEMENTED';
case 503:
return 'UNAVAILABLE';
case 504:
return 'DEADLINE_EXCEEDED';
default:
return 'INTERNAL';
}
}
/**
* Maps a StructuredError (or unknown error value) to an ErrorData payload.
* Review fix #4: preserves error metadata (name, code, stack) in _meta.
*/
export function mapError(
error: unknown,
): ErrorData & { _meta?: Record<string, unknown> } {
const meta: Record<string, unknown> = {};
if (error instanceof Error) {
meta['errorName'] = error.constructor.name;
if ('exitCode' in error && typeof error.exitCode === 'number') {
meta['exitCode'] = error.exitCode;
}
if ('code' in error) {
meta['code'] = error.code;
}
}
const hasMeta = Object.keys(meta).length > 0;
if (isStructuredError(error)) {
return {
status: mapHttpToGrpcStatus(error.status),
message: error.message,
fatal: true,
...(hasMeta ? { _meta: meta } : {}),
};
}
if (error instanceof Error) {
return {
status: 'INTERNAL',
message: error.message,
fatal: true,
...(hasMeta ? { _meta: meta } : {}),
};
}
return {
status: 'INTERNAL',
message: String(error),
fatal: true,
};
}
function isStructuredError(error: unknown): error is StructuredError {
return (
typeof error === 'object' &&
error !== null &&
'message' in error &&
typeof error.message === 'string'
);
}
/**
* Maps Gemini usageMetadata to Usage.
*/
export function mapUsage(
metadata: {
promptTokenCount?: number;
candidatesTokenCount?: number;
cachedContentTokenCount?: number;
},
model?: string,
): Usage {
return {
model: model ?? 'unknown',
inputTokens: metadata.promptTokenCount,
outputTokens: metadata.candidatesTokenCount,
cachedTokens: metadata.cachedContentTokenCount,
};
}
@@ -0,0 +1,870 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { FinishReason } from '@google/genai';
import { LegacyAgentSession } from './legacy-agent-session.js';
import type { LegacySessionDeps } from './legacy-agent-session.js';
import { GeminiEventType } from '../core/turn.js';
import type { ServerGeminiStreamEvent } from '../core/turn.js';
import type { AgentEvent } from './types.js';
import { ToolErrorType } from '../tools/tool-error.js';
import type {
CompletedToolCall,
ToolCallRequestInfo,
} from '../scheduler/types.js';
import { CoreToolCallStatus } from '../scheduler/types.js';
// ---------------------------------------------------------------------------
// Mock helpers
// ---------------------------------------------------------------------------
function createMockDeps(
overrides?: Partial<LegacySessionDeps>,
): LegacySessionDeps {
const mockClient = {
sendMessageStream: vi.fn(),
getChat: vi.fn().mockReturnValue({
recordCompletedToolCalls: vi.fn(),
}),
getCurrentSequenceModel: vi.fn().mockReturnValue(null),
};
const mockScheduler = {
schedule: vi.fn().mockResolvedValue([]),
};
const mockConfig = {
getMaxSessionTurns: vi.fn().mockReturnValue(-1),
getModel: vi.fn().mockReturnValue('gemini-2.5-pro'),
};
return {
client: mockClient as unknown as LegacySessionDeps['client'],
scheduler: mockScheduler as unknown as LegacySessionDeps['scheduler'],
config: mockConfig as unknown as LegacySessionDeps['config'],
promptId: 'test-prompt',
streamId: 'test-stream',
...overrides,
};
}
async function* makeStream(
events: ServerGeminiStreamEvent[],
): AsyncGenerator<ServerGeminiStreamEvent> {
for (const event of events) {
yield event;
}
}
function makeToolRequest(callId: string, name: string): ToolCallRequestInfo {
return {
callId,
name,
args: {},
isClientInitiated: false,
prompt_id: 'p1',
};
}
function makeCompletedToolCall(
callId: string,
name: string,
responseText: string,
): CompletedToolCall {
return {
status: CoreToolCallStatus.Success,
request: makeToolRequest(callId, name),
response: {
callId,
responseParts: [{ text: responseText }],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
tool: {} as CompletedToolCall extends { tool: infer T } ? T : never,
invocation: {} as CompletedToolCall extends { invocation: infer T }
? T
: never,
} as CompletedToolCall;
}
async function collectEvents(
session: LegacyAgentSession,
): Promise<AgentEvent[]> {
const events: AgentEvent[] = [];
for await (const event of session.stream()) {
events.push(event);
}
return events;
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe('LegacyAgentSession', () => {
let deps: LegacySessionDeps;
beforeEach(() => {
deps = createMockDeps();
vi.useFakeTimers({ shouldAdvanceTime: true });
});
describe('send', () => {
it('returns streamId', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{ type: GeminiEventType.Content, value: 'hello' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const session = new LegacyAgentSession(deps);
const result = await session.send({
message: [{ type: 'text', text: 'hi' }],
});
expect(result.streamId).toBe('test-stream');
});
it('throws for non-message payloads', async () => {
const session = new LegacyAgentSession(deps);
await expect(session.send({ update: { title: 'test' } })).rejects.toThrow(
'only supports message sends',
);
});
});
describe('stream - basic flow', () => {
it('emits stream_start, content messages, and stream_end', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{ type: GeminiEventType.Content, value: 'Hello' },
{ type: GeminiEventType.Content, value: ' World' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const types = events.map((e) => e.type);
expect(types).toContain('stream_start');
expect(types).toContain('message');
expect(types).toContain('stream_end');
const messages = events.filter(
(e): e is AgentEvent<'message'> =>
e.type === 'message' && e.role === 'agent',
);
expect(messages).toHaveLength(2);
expect(messages[0]?.content).toEqual([{ type: 'text', text: 'Hello' }]);
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('completed');
});
});
describe('stream - tool calls', () => {
it('handles a tool call round-trip', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
// First turn: model requests a tool
sendMock.mockReturnValueOnce(
makeStream([
{
type: GeminiEventType.ToolCallRequest,
value: makeToolRequest('call-1', 'read_file'),
},
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
// Second turn: model provides final answer
sendMock.mockReturnValueOnce(
makeStream([
{ type: GeminiEventType.Content, value: 'Done!' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const scheduleMock = deps.scheduler.schedule as ReturnType<typeof vi.fn>;
scheduleMock.mockResolvedValueOnce([
makeCompletedToolCall('call-1', 'read_file', 'file contents'),
]);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'read a file' }] });
const events = await collectEvents(session);
const types = events.map((e) => e.type);
expect(types).toContain('tool_request');
expect(types).toContain('tool_response');
expect(types).toContain('stream_end');
const toolReq = events.find(
(e): e is AgentEvent<'tool_request'> => e.type === 'tool_request',
);
expect(toolReq?.name).toBe('read_file');
const toolResp = events.find(
(e): e is AgentEvent<'tool_response'> => e.type === 'tool_response',
);
expect(toolResp?.name).toBe('read_file');
expect(toolResp?.content).toEqual([
{ type: 'text', text: 'file contents' },
]);
expect(toolResp?.isError).toBe(false);
// Should have called sendMessageStream twice
expect(sendMock).toHaveBeenCalledTimes(2);
});
it('handles tool errors and sends error message in content', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValueOnce(
makeStream([
{
type: GeminiEventType.ToolCallRequest,
value: makeToolRequest('call-1', 'write_file'),
},
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
sendMock.mockReturnValueOnce(
makeStream([
{ type: GeminiEventType.Content, value: 'Failed' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const errorToolCall: CompletedToolCall = {
status: CoreToolCallStatus.Error,
request: makeToolRequest('call-1', 'write_file'),
response: {
callId: 'call-1',
responseParts: [{ text: 'stale' }],
resultDisplay: 'Error display',
error: new Error('Permission denied'),
errorType: 'permission_denied',
},
} as CompletedToolCall;
const scheduleMock = deps.scheduler.schedule as ReturnType<typeof vi.fn>;
scheduleMock.mockResolvedValueOnce([errorToolCall]);
const session = new LegacyAgentSession(deps);
await session.send({
message: [{ type: 'text', text: 'write file' }],
});
const events = await collectEvents(session);
const toolResp = events.find(
(e): e is AgentEvent<'tool_response'> => e.type === 'tool_response',
);
expect(toolResp?.isError).toBe(true);
// Uses error.message, not responseParts
expect(toolResp?.content).toEqual([
{ type: 'text', text: 'Permission denied' },
]);
expect(toolResp?.displayContent).toEqual([
{ type: 'text', text: 'Error display' },
]);
});
it('stops on STOP_EXECUTION tool error', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValueOnce(
makeStream([
{
type: GeminiEventType.ToolCallRequest,
value: makeToolRequest('call-1', 'dangerous_tool'),
},
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const stopToolCall: CompletedToolCall = {
status: CoreToolCallStatus.Error,
request: makeToolRequest('call-1', 'dangerous_tool'),
response: {
callId: 'call-1',
responseParts: [],
resultDisplay: undefined,
error: new Error('Stopped by policy'),
errorType: ToolErrorType.STOP_EXECUTION,
},
} as CompletedToolCall;
const scheduleMock = deps.scheduler.schedule as ReturnType<typeof vi.fn>;
scheduleMock.mockResolvedValueOnce([stopToolCall]);
const session = new LegacyAgentSession(deps);
await session.send({
message: [{ type: 'text', text: 'do something' }],
});
const events = await collectEvents(session);
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('completed');
// Should NOT make a second call
expect(sendMock).toHaveBeenCalledTimes(1);
});
});
describe('stream - terminal events', () => {
it('handles AgentExecutionStopped', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{
type: GeminiEventType.AgentExecutionStopped,
value: { reason: 'hook', systemMessage: 'Halted by hook' },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('completed');
expect(streamEnd?.data).toEqual({ message: 'Halted by hook' });
});
it('handles AgentExecutionBlocked as non-terminal and continues the stream', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{
type: GeminiEventType.AgentExecutionBlocked,
value: { reason: 'Blocked by hook' },
},
{ type: GeminiEventType.Content, value: 'Final answer' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const blocked = events.find(
(e): e is AgentEvent<'error'> =>
e.type === 'error' && e._meta?.['code'] === 'AGENT_EXECUTION_BLOCKED',
);
expect(blocked?.fatal).toBe(false);
expect(blocked?.message).toBe('Agent execution blocked: Blocked by hook');
const messages = events.filter(
(e): e is AgentEvent<'message'> =>
e.type === 'message' && e.role === 'agent',
);
expect(
messages.some(
(message) =>
message.content[0]?.type === 'text' &&
message.content[0].text === 'Final answer',
),
).toBe(true);
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('completed');
});
it('handles Error events', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{
type: GeminiEventType.Error,
value: { error: new Error('API error') },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const err = events.find(
(e): e is AgentEvent<'error'> => e.type === 'error',
);
expect(err?.message).toBe('API error');
expect(events.some((e) => e.type === 'stream_end')).toBe(true);
});
it('handles LoopDetected as non-terminal custom event', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
// LoopDetected followed by more content — stream continues
sendMock.mockReturnValue(
makeStream([
{ type: GeminiEventType.LoopDetected },
{ type: GeminiEventType.Content, value: 'continuing after loop' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
// Should have a custom loop_detected event
const custom = events.find(
(e): e is AgentEvent<'custom'> =>
e.type === 'custom' && e.kind === 'loop_detected',
);
expect(custom).toBeDefined();
// Stream should have continued — content after loop detected
const messages = events.filter(
(e): e is AgentEvent<'message'> =>
e.type === 'message' && e.role === 'agent',
);
expect(
messages.some(
(m) =>
m.content[0]?.type === 'text' &&
m.content[0].text === 'continuing after loop',
),
).toBe(true);
// Should still end with stream_end completed
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('completed');
});
});
describe('stream - max turns', () => {
it('emits stream_end with max_turns when the session turn limit is exceeded', async () => {
const configMock = deps.config.getMaxSessionTurns as ReturnType<
typeof vi.fn
>;
configMock.mockReturnValue(0);
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{ type: GeminiEventType.Content, value: 'should not be reached' },
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('max_turns');
expect(streamEnd?.data).toEqual({
code: 'MAX_TURNS_EXCEEDED',
maxTurns: 0,
turnCount: 0,
});
expect(sendMock).not.toHaveBeenCalled();
});
it('treats GeminiClient MaxSessionTurns as a non-terminal warning event', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{ type: GeminiEventType.MaxSessionTurns },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const warning = events.find(
(e): e is AgentEvent<'error'> =>
e.type === 'error' && e._meta?.['code'] === 'MAX_TURNS_EXCEEDED',
);
expect(warning?.fatal).toBe(false);
const streamEnds = events.filter(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
const streamEnd = streamEnds[streamEnds.length - 1];
expect(streamEnd?.reason).toBe('completed');
});
});
describe('abort', () => {
it('aborts the stream', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
// Stream that yields content then checks abort signal via a deferred
let resolveHang: (() => void) | undefined;
sendMock.mockReturnValue(
(async function* () {
yield {
type: GeminiEventType.Content,
value: 'start',
} as ServerGeminiStreamEvent;
// Wait until externally resolved (by abort)
await new Promise<void>((resolve) => {
resolveHang = resolve;
});
yield {
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
} as ServerGeminiStreamEvent;
})(),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
// Give the loop time to start processing
await new Promise((r) => setTimeout(r, 50));
// Abort and resolve the hang so the generator can finish
await session.abort();
resolveHang?.();
// Collect all events
const events = await collectEvents(session);
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('aborted');
});
});
describe('events property', () => {
it('accumulates all events', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{ type: GeminiEventType.Content, value: 'hi' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
await collectEvents(session);
expect(session.events.length).toBeGreaterThan(0);
expect(session.events[0]?.type).toBe('stream_start');
});
});
describe('stream_end ordering', () => {
it('stream_end is always the final event yielded', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{ type: GeminiEventType.Content, value: 'Hello' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
expect(events.length).toBeGreaterThan(0);
expect(events[events.length - 1]?.type).toBe('stream_end');
});
it('stream_end is final even after error events', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValue(
makeStream([
{
type: GeminiEventType.Error,
value: { error: new Error('API error') },
},
]),
);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
expect(events[events.length - 1]?.type).toBe('stream_end');
});
});
describe('intermediate Finished events', () => {
it('does NOT emit stream_end when tool calls are pending', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
// First turn: tool request + Finished (should NOT produce stream_end)
sendMock.mockReturnValueOnce(
makeStream([
{
type: GeminiEventType.ToolCallRequest,
value: makeToolRequest('call-1', 'read_file'),
},
{
type: GeminiEventType.Finished,
value: {
reason: FinishReason.STOP,
usageMetadata: {
promptTokenCount: 50,
candidatesTokenCount: 20,
},
},
},
]),
);
// Second turn: final answer
sendMock.mockReturnValueOnce(
makeStream([
{ type: GeminiEventType.Content, value: 'Answer' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const scheduleMock = deps.scheduler.schedule as ReturnType<typeof vi.fn>;
scheduleMock.mockResolvedValueOnce([
makeCompletedToolCall('call-1', 'read_file', 'data'),
]);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'do it' }] });
const events = await collectEvents(session);
// Only one stream_end at the very end
const streamEnds = events.filter((e) => e.type === 'stream_end');
expect(streamEnds).toHaveLength(1);
expect(streamEnds[0]).toBe(events[events.length - 1]);
});
it('emits usage for intermediate Finished events', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockReturnValueOnce(
makeStream([
{
type: GeminiEventType.ToolCallRequest,
value: makeToolRequest('call-1', 'read_file'),
},
{
type: GeminiEventType.Finished,
value: {
reason: FinishReason.STOP,
usageMetadata: {
promptTokenCount: 100,
candidatesTokenCount: 30,
},
},
},
]),
);
sendMock.mockReturnValueOnce(
makeStream([
{ type: GeminiEventType.Content, value: 'Done' },
{
type: GeminiEventType.Finished,
value: { reason: FinishReason.STOP, usageMetadata: undefined },
},
]),
);
const scheduleMock = deps.scheduler.schedule as ReturnType<typeof vi.fn>;
scheduleMock.mockResolvedValueOnce([
makeCompletedToolCall('call-1', 'read_file', 'contents'),
]);
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'go' }] });
const events = await collectEvents(session);
// Should have at least one usage event from the intermediate Finished
const usageEvents = events.filter(
(e): e is AgentEvent<'usage'> => e.type === 'usage',
);
expect(usageEvents.length).toBeGreaterThanOrEqual(1);
expect(usageEvents[0]?.inputTokens).toBe(100);
expect(usageEvents[0]?.outputTokens).toBe(30);
});
});
describe('error handling in runLoop', () => {
it('catches thrown errors and emits error + stream_end', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
sendMock.mockImplementation(() => {
throw new Error('Connection refused');
});
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const err = events.find(
(e): e is AgentEvent<'error'> => e.type === 'error',
);
expect(err?.message).toBe('Connection refused');
expect(err?.fatal).toBe(true);
const streamEnd = events.find(
(e): e is AgentEvent<'stream_end'> => e.type === 'stream_end',
);
expect(streamEnd?.reason).toBe('failed');
});
});
describe('_emitErrorAndStreamEnd metadata', () => {
it('preserves exitCode and code in _meta for FatalError', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
// Simulate a FatalError being thrown
const { FatalError } = await import('../utils/errors.js');
sendMock.mockImplementation(() => {
throw new FatalError('Disk full', 44);
});
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const err = events.find(
(e): e is AgentEvent<'error'> => e.type === 'error',
);
expect(err?.message).toBe('Disk full');
expect(err?.fatal).toBe(true);
expect(err?._meta?.['exitCode']).toBe(44);
expect(err?._meta?.['errorName']).toBe('FatalError');
});
it('preserves exitCode for non-FatalError errors that carry one', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
const exitCodeError = new Error('custom exit');
(exitCodeError as Error & { exitCode: number }).exitCode = 17;
sendMock.mockImplementation(() => {
throw exitCodeError;
});
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const err = events.find(
(e): e is AgentEvent<'error'> => e.type === 'error',
);
expect(err?._meta?.['exitCode']).toBe(17);
});
it('preserves code in _meta for errors with code property', async () => {
const sendMock = deps.client.sendMessageStream as ReturnType<
typeof vi.fn
>;
const codedError = new Error('ENOENT');
(codedError as Error & { code: string }).code = 'ENOENT';
sendMock.mockImplementation(() => {
throw codedError;
});
const session = new LegacyAgentSession(deps);
await session.send({ message: [{ type: 'text', text: 'hi' }] });
const events = await collectEvents(session);
const err = events.find(
(e): e is AgentEvent<'error'> => e.type === 'error',
);
expect(err?._meta?.['code']).toBe('ENOENT');
});
});
});
@@ -0,0 +1,461 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @fileoverview LegacyAgentSession owns the agentic loop (send + tool
* scheduling + multi-turn), translating all events to AgentEvents.
*/
import { GeminiEventType } from '../core/turn.js';
import type { GeminiClient } from '../core/client.js';
import type { Scheduler } from '../scheduler/scheduler.js';
import type { Config } from '../config/config.js';
import type { ToolCallRequestInfo } from '../scheduler/types.js';
import { ToolErrorType, isFatalToolError } from '../tools/tool-error.js';
import { recordToolCallInteractions } from '../code_assist/telemetry.js';
import { debugLogger } from '../utils/debugLogger.js';
import {
translateEvent,
createTranslationState,
type TranslationState,
} from './event-translator.js';
import {
geminiPartsToContentParts,
contentPartsToGeminiParts,
toolResultDisplayToContentParts,
buildToolResponseData,
} from './content-utils.js';
import type {
AgentEvent,
AgentSession,
AgentSend,
ContentPart,
StreamEndReason,
} from './types.js';
export interface LegacySessionDeps {
client: GeminiClient;
scheduler: Scheduler;
config: Config;
promptId: string;
streamId?: string;
}
// ---------------------------------------------------------------------------
// LegacyAgentSession
// ---------------------------------------------------------------------------
export class LegacyAgentSession implements AgentSession {
private _events: AgentEvent[] = [];
private _translationState: TranslationState;
private _subscribers: Set<() => void> = new Set();
private _streamDone: boolean = false;
private _streamEndEmitted: boolean = false;
private _abortController: AbortController = new AbortController();
private readonly _client: GeminiClient;
private readonly _scheduler: Scheduler;
private readonly _config: Config;
private readonly _promptId: string;
constructor(deps: LegacySessionDeps) {
this._translationState = createTranslationState(deps.streamId);
this._client = deps.client;
this._scheduler = deps.scheduler;
this._config = deps.config;
this._promptId = deps.promptId;
}
// ---------------------------------------------------------------------------
// AgentSession interface
// ---------------------------------------------------------------------------
async send(payload: AgentSend): Promise<{ streamId: string }> {
const message = 'message' in payload ? payload.message : undefined;
if (!message) {
throw new Error('LegacyAgentSession.send() only supports message sends.');
}
const parts = contentPartsToGeminiParts(message);
// Start the loop in the background — don't await
this._runLoop(parts).catch((err: unknown) => {
this._emitErrorAndStreamEnd(err);
});
return { streamId: this._translationState.streamId };
}
/**
* Returns an async iterator that replays existing events, then live-follows
* new events as they arrive. Terminates after yielding a stream_end event,
* consistent with MockAgentSession behavior.
*/
async *stream(options?: {
streamId?: string;
eventId?: string;
}): AsyncIterableIterator<AgentEvent> {
let startIndex = 0;
if (options?.eventId) {
const idx = this._events.findIndex((e) => e.id === options.eventId);
if (idx !== -1) {
startIndex = idx + 1;
}
}
// Replay existing events
for (let i = startIndex; i < this._events.length; i++) {
const event = this._events[i];
if (event) {
yield event;
if (event.type === 'stream_end') return;
}
}
if (this._streamDone) return;
// Live-follow new events. Drain any buffered events after each wake-up,
// even if _streamDone was set between the notification and resumption.
let replayedUpTo = this._events.length;
while (true) {
// Wait for new events or stream completion
if (replayedUpTo >= this._events.length && !this._streamDone) {
await new Promise<void>((resolve) => {
if (this._events.length > replayedUpTo || this._streamDone) {
resolve();
return;
}
const handler = (): void => {
this._subscribers.delete(handler);
resolve();
};
this._subscribers.add(handler);
});
}
// Always drain buffered events before checking _streamDone
while (replayedUpTo < this._events.length) {
const event = this._events[replayedUpTo];
replayedUpTo++;
if (event) {
yield event;
if (event.type === 'stream_end') return;
}
}
// Exit only after draining
if (this._streamDone) return;
}
}
async abort(): Promise<void> {
this._abortController.abort();
}
get events(): AgentEvent[] {
return this._events;
}
// ---------------------------------------------------------------------------
// Core: agentic loop
// ---------------------------------------------------------------------------
private async _runLoop(initialParts: Part[]): Promise<void> {
let currentParts: Part[] = initialParts;
let turnCount = 0;
const maxTurns = this._config.getMaxSessionTurns();
try {
while (true) {
turnCount++;
if (maxTurns >= 0 && turnCount > maxTurns) {
this._ensureStreamStart();
this._appendAndNotify([
this._makeInternalEvent('stream_end', {
streamId: this._translationState.streamId,
reason: 'max_turns',
data: {
code: 'MAX_TURNS_EXCEEDED',
maxTurns,
turnCount: turnCount - 1,
},
}),
]);
this._markStreamDone();
return;
}
const toolCallRequests: ToolCallRequestInfo[] = [];
const responseStream = this._client.sendMessageStream(
currentParts,
this._abortController.signal,
this._promptId,
);
// Process the stream — translate events and collect tool requests
for await (const event of responseStream) {
if (this._abortController.signal.aborted) {
this._ensureStreamStart();
this._appendAndNotify([
this._makeInternalEvent('stream_end', {
streamId: this._translationState.streamId,
reason: 'aborted',
}),
]);
this._markStreamDone();
return;
}
// Collect tool call requests BEFORE translating so we can
// decide whether to suppress the Finished event's stream_end.
if (event.type === GeminiEventType.ToolCallRequest) {
toolCallRequests.push(event.value);
}
// Translate to AgentEvents
const agentEvents = translateEvent(event, this._translationState);
// Finished events don't mean the session is done — if there are
// pending tool calls, more turns are coming. Suppress stream_end
// from the Finished event in that case (keep usage events).
if (
event.type === GeminiEventType.Finished &&
toolCallRequests.length > 0
) {
const filtered = agentEvents.filter((e) => e.type !== 'stream_end');
this._appendAndNotify(filtered);
} else {
this._appendAndNotify(agentEvents);
}
// Error events → abort the loop
if (event.type === GeminiEventType.Error) {
this._ensureStreamEnd('failed');
this._markStreamDone();
return;
}
// Fatal error events that translator doesn't emit stream_end for
if (
event.type === GeminiEventType.InvalidStream ||
event.type === GeminiEventType.ContextWindowWillOverflow
) {
this._ensureStreamEnd('failed');
this._markStreamDone();
return;
}
// Terminal events — translator already emitted stream_end
if (
event.type === GeminiEventType.AgentExecutionStopped ||
event.type === GeminiEventType.UserCancelled
) {
this._markStreamDone();
return;
}
// LoopDetected is NOT terminal — the stream continues.
// Consumer handles it (warning in non-interactive, dialog in interactive).
}
if (toolCallRequests.length === 0) {
this._ensureStreamEnd('completed');
this._markStreamDone();
return;
}
// Schedule tool calls
const completedToolCalls = await this._scheduler.schedule(
toolCallRequests,
this._abortController.signal,
);
// Emit tool_response AgentEvents for each completed tool call
const toolResponseParts: Part[] = [];
for (const tc of completedToolCalls) {
const response = tc.response;
const request = tc.request;
const content: ContentPart[] = response.error
? [{ type: 'text', text: response.error.message }]
: geminiPartsToContentParts(response.responseParts);
const displayContent = toolResultDisplayToContentParts(
response.resultDisplay,
);
const data = buildToolResponseData(response);
this._appendAndNotify([
this._makeInternalEvent('tool_response', {
requestId: request.callId,
name: request.name,
content,
isError: response.error !== undefined,
...(displayContent ? { displayContent } : {}),
...(data ? { data } : {}),
}),
]);
if (response.responseParts) {
toolResponseParts.push(...response.responseParts);
}
}
// Record tool calls in chat history
try {
const currentModel =
this._client.getCurrentSequenceModel() ?? this._config.getModel();
this._client
.getChat()
.recordCompletedToolCalls(currentModel, completedToolCalls);
await recordToolCallInteractions(this._config, completedToolCalls);
} catch (error) {
debugLogger.error(
`Error recording completed tool call information: ${error}`,
);
}
// Check if a tool requested stop execution
const stopTool = completedToolCalls.find(
(tc) =>
tc.response.errorType === ToolErrorType.STOP_EXECUTION &&
tc.response.error !== undefined,
);
if (stopTool) {
this._ensureStreamEnd('completed');
this._markStreamDone();
return;
}
// Check for fatal tool errors
const fatalTool = completedToolCalls.find((tc) =>
isFatalToolError(tc.response.errorType),
);
if (fatalTool) {
const msg = fatalTool.response.error?.message ?? 'Fatal tool error';
this._appendAndNotify([
this._makeInternalEvent('error', {
status: 'INTERNAL',
message: `Fatal tool error (${fatalTool.request.name}): ${msg}`,
fatal: true,
}),
]);
this._ensureStreamEnd('failed');
this._markStreamDone();
return;
}
// Feed tool results back for next turn
currentParts = toolResponseParts;
}
} catch (err: unknown) {
this._emitErrorAndStreamEnd(err);
this._markStreamDone();
}
}
// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------
/** Sets _streamDone and notifies subscribers so the stream iterator can exit. */
private _markStreamDone(): void {
this._streamDone = true;
this._notifySubscribers();
}
private _appendAndNotify(events: AgentEvent[]): void {
for (const event of events) {
this._events.push(event);
if (event.type === 'stream_end') {
this._streamEndEmitted = true;
}
}
if (events.length > 0) {
this._notifySubscribers();
}
}
private _notifySubscribers(): void {
for (const handler of this._subscribers) {
handler();
}
}
private _ensureStreamStart(): void {
if (!this._translationState.streamStartEmitted) {
const startEvent = this._makeInternalEvent('stream_start', {
streamId: this._translationState.streamId,
});
this._events.push(startEvent);
this._translationState.streamStartEmitted = true;
this._notifySubscribers();
}
}
private _ensureStreamEnd(reason: StreamEndReason = 'completed'): void {
if (!this._streamEndEmitted && this._translationState.streamStartEmitted) {
this._streamEndEmitted = true;
const endEvent = this._makeInternalEvent('stream_end', {
streamId: this._translationState.streamId,
reason,
});
this._events.push(endEvent);
this._notifySubscribers();
}
}
/**
* Review fix #4: Preserves error metadata (name, exitCode, stack) in _meta
* so downstream consumers can reconstruct proper error types.
*/
private _emitErrorAndStreamEnd(err: unknown): void {
const message = err instanceof Error ? err.message : String(err);
this._ensureStreamStart();
const meta: Record<string, unknown> = {};
if (err instanceof Error) {
meta['errorName'] = err.constructor.name;
if ('exitCode' in err && typeof err.exitCode === 'number') {
meta['exitCode'] = err.exitCode;
}
if ('code' in err) {
meta['code'] = err.code;
}
}
const errorEvent = this._makeInternalEvent('error', {
status: 'INTERNAL' as const,
message,
fatal: true,
...(Object.keys(meta).length > 0 ? { _meta: meta } : {}),
});
this._events.push(errorEvent);
this._ensureStreamEnd('failed');
this._notifySubscribers();
}
private _makeInternalEvent(
type: AgentEvent['type'],
payload: Partial<AgentEvent>,
): AgentEvent {
const id = `${this._translationState.streamId}-${this._translationState.eventCounter++}`;
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion -- constructing AgentEvent from common fields + payload
return {
...payload,
id,
timestamp: new Date().toISOString(),
streamId: this._translationState.streamId,
type,
} as AgentEvent;
}
}
// Re-export Part type alias for internal use (avoids importing @google/genai directly)
type Part = import('@google/genai').Part;
+9 -2
View File
@@ -79,9 +79,16 @@ export type AgentEventData<
EventType extends keyof AgentEvents = keyof AgentEvents,
> = AgentEvents[EventType] & { type: EventType };
/**
* Mapped type that produces a proper discriminated union when `EventType` is
* the default (all keys), enabling `switch (event.type)` narrowing.
* When a specific EventType is provided, resolves to a single variant.
*/
export type AgentEvent<
EventType extends keyof AgentEvents = keyof AgentEvents,
> = AgentEventCommon & AgentEventData<EventType>;
> = {
[K in EventType]: AgentEventCommon & AgentEvents[K] & { type: K };
}[EventType];
export interface AgentEvents {
/** MUST be the first event emitted in a session. */
@@ -261,7 +268,7 @@ export interface StreamStart {
streamId: string;
}
type StreamEndReason =
export type StreamEndReason =
| 'completed'
| 'failed'
| 'aborted'
+22
View File
@@ -179,6 +179,28 @@ export * from './agents/agentLoader.js';
export * from './agents/local-executor.js';
export * from './agents/agent-scheduler.js';
// Export agent session interface
export * from './agent/legacy-agent-session.js';
export * from './agent/event-translator.js';
export * from './agent/content-utils.js';
// Agent event types — namespaced to avoid collisions with existing exports
export type {
AgentEvent,
AgentEventCommon,
AgentEventData,
AgentEvents as AgentEventMap,
AgentSend,
AgentSession,
ContentPart,
ErrorData,
StreamEnd,
StreamEndReason,
StreamStart,
Trajectory,
Usage as AgentUsage,
WithMeta,
} from './agent/types.js';
// Export specific tool logic
export * from './tools/read-file.js';
export * from './tools/ls.js';