diff --git a/packages/cli/src/nonInteractiveCli.test.ts b/packages/cli/src/nonInteractiveCli.test.ts index 206d011e63..329aca7138 100644 --- a/packages/cli/src/nonInteractiveCli.test.ts +++ b/packages/cli/src/nonInteractiveCli.test.ts @@ -58,6 +58,12 @@ const mockSchedulerSchedule = vi.hoisted(() => vi.fn()); vi.mock('@google/gemini-cli-core', async (importOriginal) => { const original = await importOriginal(); + const { LegacyAgentSession } = await import( + '../../core/src/agent/legacy-agent-session.js' + ); + const { geminiPartsToContentParts } = await import( + '../../core/src/agent/content-utils.js' + ); class MockChatRecordingService { initialize = vi.fn(); @@ -77,6 +83,8 @@ vi.mock('@google/gemini-cli-core', async (importOriginal) => { uiTelemetryService: { getMetrics: vi.fn(), }, + LegacyAgentSession, + geminiPartsToContentParts, coreEvents: mockCoreEvents, createWorkingStdio: vi.fn(() => ({ stdout: process.stdout, @@ -108,6 +116,8 @@ describe('runNonInteractive', () => { sendMessageStream: Mock; resumeChat: Mock; getChatRecordingService: Mock; + getChat: Mock; + getCurrentSequenceModel: Mock; }; const MOCK_SESSION_METRICS: SessionMetrics = { models: {}, @@ -163,6 +173,8 @@ describe('runNonInteractive', () => { recordMessageTokens: vi.fn(), recordToolCalls: vi.fn(), })), + getChat: vi.fn(() => ({ recordCompletedToolCalls: vi.fn() })), + getCurrentSequenceModel: vi.fn().mockReturnValue(null), }; mockConfig = { @@ -259,9 +271,6 @@ describe('runNonInteractive', () => { [{ text: 'Test input' }], expect.any(AbortSignal), 'prompt-id-1', - undefined, - false, - 'Test input', ); expect(getWrittenOutput()).toBe('Hello World\n'); // Note: Telemetry shutdown is now handled in runExitCleanup() in cleanup.ts @@ -378,9 +387,6 @@ describe('runNonInteractive', () => { [{ text: 'Tool response' }], expect.any(AbortSignal), 'prompt-id-2', - undefined, - false, - undefined, ); expect(getWrittenOutput()).toBe('Final answer\n'); }); @@ -538,9 +544,6 @@ describe('runNonInteractive', () => { ], expect.any(AbortSignal), 'prompt-id-3', - undefined, - false, - undefined, ); expect(getWrittenOutput()).toBe('Sorry, let me try again.\n'); }); @@ -558,7 +561,7 @@ describe('runNonInteractive', () => { input: 'Initial fail', prompt_id: 'prompt-id-4', }), - ).rejects.toThrow(apiError); + ).rejects.toThrow('API connection failed'); }); it('should not exit if a tool is not found, and should send error back to model', async () => { @@ -680,9 +683,6 @@ describe('runNonInteractive', () => { processedParts, expect.any(AbortSignal), 'prompt-id-7', - undefined, - false, - rawInput, ); // 6. Assert the final output is correct @@ -716,9 +716,6 @@ describe('runNonInteractive', () => { [{ text: 'Test input' }], expect.any(AbortSignal), 'prompt-id-1', - undefined, - false, - 'Test input', ); expect(processStdoutSpy).toHaveBeenCalledWith( JSON.stringify( @@ -849,9 +846,6 @@ describe('runNonInteractive', () => { [{ text: 'Empty response test' }], expect.any(AbortSignal), 'prompt-id-empty', - undefined, - false, - 'Empty response test', ); // This should output JSON with empty response but include stats @@ -941,7 +935,7 @@ describe('runNonInteractive', () => { { session_id: 'test-session-id', error: { - type: 'FatalInputError', + type: 'Error', message: 'Invalid command syntax provided', code: 42, }, @@ -986,9 +980,6 @@ describe('runNonInteractive', () => { [{ text: 'Prompt from command' }], expect.any(AbortSignal), 'prompt-id-slash', - undefined, - false, - '/testcommand', ); expect(getWrittenOutput()).toBe('Response from command\n'); @@ -1032,9 +1023,6 @@ describe('runNonInteractive', () => { [{ text: 'Slash command output' }], expect.any(AbortSignal), 'prompt-id-slash', - undefined, - false, - '/help', ); expect(getWrittenOutput()).toBe('Response to slash command\n'); handleSlashCommandSpy.mockRestore(); @@ -1209,9 +1197,6 @@ describe('runNonInteractive', () => { [{ text: '/unknowncommand' }], expect.any(AbortSignal), 'prompt-id-unknown', - undefined, - false, - '/unknowncommand', ); expect(getWrittenOutput()).toBe('Response to unknown\n'); @@ -1776,15 +1761,13 @@ describe('runNonInteractive', () => { throw new Error('Recording failed'); }), }; - // @ts-expect-error - Mocking internal structure mockGeminiClient.getChat = vi.fn().mockReturnValue(mockChat); - // @ts-expect-error - Mocking internal structure mockGeminiClient.getCurrentSequenceModel = vi .fn() .mockReturnValue('model-1'); // Mock debugLogger.error - const { debugLogger } = await import('@google/gemini-cli-core'); + const { debugLogger } = await import('../../core/src/utils/debugLogger.js'); const debugLoggerErrorSpy = vi .spyOn(debugLogger, 'error') .mockImplementation(() => {}); @@ -1999,7 +1982,6 @@ describe('runNonInteractive', () => { expect(processStderrSpy).toHaveBeenCalledWith( 'Agent execution stopped: Stopped by hook\n', ); - // Should exit without calling sendMessageStream again expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1); }); @@ -2030,9 +2012,9 @@ describe('runNonInteractive', () => { expect(processStderrSpy).toHaveBeenCalledWith( '[WARNING] Agent execution blocked: Blocked by hook\n', ); - // sendMessageStream is called once, recursion is internal to it and transparent to the caller - expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1); + // Stream continues after blocked event — content should be output expect(getWrittenOutput()).toBe('Final answer\n'); + expect(mockGeminiClient.sendMessageStream).toHaveBeenCalledTimes(1); }); }); @@ -2173,6 +2155,40 @@ describe('runNonInteractive', () => { ); }); + it('should emit warning event for loop_detected custom event in streaming JSON mode', async () => { + vi.mocked(mockConfig.getOutputFormat).mockReturnValue( + OutputFormat.STREAM_JSON, + ); + vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + MOCK_SESSION_METRICS, + ); + + const streamEvents: ServerGeminiStreamEvent[] = [ + { type: GeminiEventType.LoopDetected } as ServerGeminiStreamEvent, + { type: GeminiEventType.Content, value: 'Continuing after loop' }, + { + type: GeminiEventType.Finished, + value: { reason: undefined, usageMetadata: { totalTokenCount: 5 } }, + }, + ]; + mockGeminiClient.sendMessageStream.mockReturnValue( + createStreamFromEvents(streamEvents), + ); + + await runNonInteractive({ + config: mockConfig, + settings: mockSettings, + input: 'Loop test explicit', + prompt_id: 'prompt-id-loop-explicit', + }); + + const output = getWrittenOutput(); + // The STREAM_JSON output should contain an error event with warning severity + expect(output).toContain('"type":"error"'); + expect(output).toContain('"severity":"warning"'); + expect(output).toContain('Loop detected'); + }); + it('should report cancelled tool calls as success in stream-json mode (legacy parity)', async () => { const toolCallEvent: ServerGeminiStreamEvent = { type: GeminiEventType.ToolCallRequest, diff --git a/packages/cli/src/nonInteractiveCli.ts b/packages/cli/src/nonInteractiveCli.ts index 891e3d0ee9..72071d1c2d 100644 --- a/packages/cli/src/nonInteractiveCli.ts +++ b/packages/cli/src/nonInteractiveCli.ts @@ -6,15 +6,15 @@ import type { Config, - ToolCallRequestInfo, ResumedSessionData, UserFeedbackPayload, + AgentEvent, + ContentPart, } from '@google/gemini-cli-core'; import { isSlashCommand } from './ui/utils/commandUtils.js'; import type { LoadedSettings } from './config/settings.js'; import { convertSessionToClientHistory, - GeminiEventType, FatalInputError, promptIdContext, OutputFormat, @@ -22,17 +22,17 @@ import { StreamJsonFormatter, JsonStreamEventType, uiTelemetryService, - debugLogger, coreEvents, CoreEvent, createWorkingStdio, - recordToolCallInteractions, - ToolErrorType, Scheduler, ROOT_SCHEDULER_ID, + LegacyAgentSession, + ToolErrorType, + geminiPartsToContentParts, } from '@google/gemini-cli-core'; -import type { Content, Part } from '@google/genai'; +import type { Part } from '@google/genai'; import readline from 'node:readline'; import stripAnsi from 'strip-ansi'; @@ -150,8 +150,6 @@ export async function runNonInteractive({ }, 200); abortController.abort(); - // Note: Don't exit here - let the abort flow through the system - // and trigger handleCancellationError() which will exit with proper code } }; @@ -246,9 +244,6 @@ export async function runNonInteractive({ config, settings, ); - // If a slash command is found and returns a prompt, use it. - // Otherwise, slashCommandResult falls through to the default prompt - // handling. if (slashCommandResult) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion query = slashCommandResult as Part[]; @@ -266,8 +261,6 @@ export async function runNonInteractive({ escapePastedAtSymbols: false, }); if (error || !processedQuery) { - // An error occurred during @include processing (e.g., file not found). - // The error message is already logged by handleAtCommand. throw new FatalInputError( error || 'Exiting due to an error processing the @ command.', ); @@ -286,235 +279,239 @@ export async function runNonInteractive({ }); } - let currentMessages: Content[] = [{ role: 'user', parts: query }]; + // Create LegacyAgentSession — owns the agentic loop + const session = new LegacyAgentSession({ + client: geminiClient, + scheduler, + config, + promptId: prompt_id, + }); - let turnCount = 0; - while (true) { - turnCount++; - if ( - config.getMaxSessionTurns() >= 0 && - turnCount > config.getMaxSessionTurns() - ) { - handleMaxTurnsExceededError(config); + // Wire Ctrl+C to session abort + abortController.signal.addEventListener('abort', () => { + void session.abort(); + }); + + // Start the agentic loop (runs in background) + await session.send({ + message: geminiPartsToContentParts(query), + }); + + const getFirstText = (parts?: ContentPart[]): string | undefined => { + const part = parts?.[0]; + return part?.type === 'text' ? part.text : undefined; + }; + + const emitFinalSuccessResult = (): void => { + if (streamFormatter) { + const metrics = uiTelemetryService.getMetrics(); + const durationMs = Date.now() - startTime; + streamFormatter.emitEvent({ + type: JsonStreamEventType.RESULT, + timestamp: new Date().toISOString(), + status: 'success', + stats: streamFormatter.convertToStreamStats(metrics, durationMs), + }); + } else if (config.getOutputFormat() === OutputFormat.JSON) { + const formatter = new JsonFormatter(); + const stats = uiTelemetryService.getMetrics(); + textOutput.write( + formatter.format(config.getSessionId(), responseText, stats), + ); + } else { + textOutput.ensureTrailingNewline(); } - const toolCallRequests: ToolCallRequestInfo[] = []; + }; - const responseStream = geminiClient.sendMessageStream( - currentMessages[0]?.parts || [], - abortController.signal, - prompt_id, - undefined, - false, - turnCount === 1 ? input : undefined, - ); + const reconstructFatalError = (event: AgentEvent<'error'>): Error => { + const errToThrow = new Error(event.message); + const errorMeta = event._meta; + if (errorMeta?.['exitCode'] !== undefined) { + Object.defineProperty(errToThrow, 'exitCode', { + value: errorMeta['exitCode'], + enumerable: true, + }); + } + if (errorMeta?.['errorName'] !== undefined) { + Object.defineProperty(errToThrow, 'name', { + value: errorMeta['errorName'], + enumerable: true, + }); + } + if (errorMeta?.['code'] !== undefined) { + Object.defineProperty(errToThrow, 'code', { + value: errorMeta['code'], + enumerable: true, + }); + } + return errToThrow; + }; - let responseText = ''; - for await (const event of responseStream) { - if (abortController.signal.aborted) { - handleCancellationError(config); - } - - if (event.type === GeminiEventType.Content) { - const isRaw = - config.getRawOutput() || config.getAcceptRawOutputRisk(); - const output = isRaw ? event.value : stripAnsi(event.value); - if (streamFormatter) { - streamFormatter.emitEvent({ - type: JsonStreamEventType.MESSAGE, - timestamp: new Date().toISOString(), - role: 'assistant', - content: output, - delta: true, - }); - } else if (config.getOutputFormat() === OutputFormat.JSON) { - responseText += output; - } else { - if (event.value) { - textOutput.write(output); + // Consume AgentEvents for output formatting + let responseText = ''; + let streamEnded = false; + for await (const event of session.stream()) { + if (streamEnded) break; + switch (event.type) { + case 'message': { + if (event.role === 'agent') { + for (const part of event.content) { + if (part.type === 'text') { + const isRaw = + config.getRawOutput() || config.getAcceptRawOutputRisk(); + const output = isRaw ? part.text : stripAnsi(part.text); + if (streamFormatter) { + streamFormatter.emitEvent({ + type: JsonStreamEventType.MESSAGE, + timestamp: new Date().toISOString(), + role: 'assistant', + content: output, + delta: true, + }); + } else if (config.getOutputFormat() === OutputFormat.JSON) { + responseText += output; + } else { + if (part.text) { + textOutput.write(output); + } + } + } } } - } else if (event.type === GeminiEventType.ToolCallRequest) { + break; + } + case 'tool_request': { if (streamFormatter) { streamFormatter.emitEvent({ type: JsonStreamEventType.TOOL_USE, timestamp: new Date().toISOString(), - tool_name: event.value.name, - tool_id: event.value.callId, - parameters: event.value.args, + tool_name: event.name, + tool_id: event.requestId, + parameters: event.args, }); } - toolCallRequests.push(event.value); - } else if (event.type === GeminiEventType.LoopDetected) { - if (streamFormatter) { - streamFormatter.emitEvent({ - type: JsonStreamEventType.ERROR, - timestamp: new Date().toISOString(), - severity: 'warning', - message: 'Loop detected, stopping execution', - }); - } - } else if (event.type === GeminiEventType.MaxSessionTurns) { - if (streamFormatter) { - streamFormatter.emitEvent({ - type: JsonStreamEventType.ERROR, - timestamp: new Date().toISOString(), - severity: 'error', - message: 'Maximum session turns exceeded', - }); - } - } else if (event.type === GeminiEventType.Error) { - throw event.value.error; - } else if (event.type === GeminiEventType.AgentExecutionStopped) { - const stopMessage = `Agent execution stopped: ${event.value.systemMessage?.trim() || event.value.reason}`; - if (config.getOutputFormat() === OutputFormat.TEXT) { - process.stderr.write(`${stopMessage}\n`); - } - // Emit final result event for streaming JSON if needed - if (streamFormatter) { - const metrics = uiTelemetryService.getMetrics(); - const durationMs = Date.now() - startTime; - streamFormatter.emitEvent({ - type: JsonStreamEventType.RESULT, - timestamp: new Date().toISOString(), - status: 'success', - stats: streamFormatter.convertToStreamStats( - metrics, - durationMs, - ), - }); - } - return; - } else if (event.type === GeminiEventType.AgentExecutionBlocked) { - const blockMessage = `Agent execution blocked: ${event.value.systemMessage?.trim() || event.value.reason}`; - if (config.getOutputFormat() === OutputFormat.TEXT) { - process.stderr.write(`[WARNING] ${blockMessage}\n`); - } + break; } - } - - if (toolCallRequests.length > 0) { - textOutput.ensureTrailingNewline(); - const completedToolCalls = await scheduler.schedule( - toolCallRequests, - abortController.signal, - ); - const toolResponseParts: Part[] = []; - - for (const completedToolCall of completedToolCalls) { - const toolResponse = completedToolCall.response; - const requestInfo = completedToolCall.request; - + case 'tool_response': { + textOutput.ensureTrailingNewline(); if (streamFormatter) { + const displayText = getFirstText(event.displayContent); + const errorMsg = getFirstText(event.content) ?? 'Tool error'; streamFormatter.emitEvent({ type: JsonStreamEventType.TOOL_RESULT, timestamp: new Date().toISOString(), - tool_id: requestInfo.callId, - status: - completedToolCall.status === 'error' ? 'error' : 'success', - output: - typeof toolResponse.resultDisplay === 'string' - ? toolResponse.resultDisplay - : undefined, - error: toolResponse.error + tool_id: event.requestId, + status: event.isError ? 'error' : 'success', + output: displayText, + error: event.isError ? { - type: toolResponse.errorType || 'TOOL_EXECUTION_ERROR', - message: toolResponse.error.message, + type: + typeof event.data?.['errorType'] === 'string' + ? event.data['errorType'] + : 'TOOL_EXECUTION_ERROR', + message: errorMsg, } : undefined, }); } + if (event.isError) { + const displayText = getFirstText(event.displayContent); + const errorMsg = getFirstText(event.content) ?? 'Tool error'; + + if (event.data?.['errorType'] === ToolErrorType.STOP_EXECUTION) { + const stopMessage = `Agent execution stopped: ${errorMsg}`; + if (config.getOutputFormat() === OutputFormat.TEXT) { + process.stderr.write(`${stopMessage}\n`); + } + } - if (toolResponse.error) { handleToolError( - requestInfo.name, - toolResponse.error, + event.name, + new Error(errorMsg), config, - toolResponse.errorType || 'TOOL_EXECUTION_ERROR', - typeof toolResponse.resultDisplay === 'string' - ? toolResponse.resultDisplay + typeof event.data?.['errorType'] === 'string' + ? event.data['errorType'] : undefined, + displayText, ); } - - if (toolResponse.responseParts) { - toolResponseParts.push(...toolResponse.responseParts); + break; + } + case 'error': { + if (event.fatal) { + throw reconstructFatalError(event); } - } - // Record tool calls with full metadata before sending responses to Gemini - try { - const currentModel = - geminiClient.getCurrentSequenceModel() ?? config.getModel(); - geminiClient - .getChat() - .recordCompletedToolCalls(currentModel, completedToolCalls); + const errorCode = event._meta?.['code']; - await recordToolCallInteractions(config, completedToolCalls); - } catch (error) { - debugLogger.error( - `Error recording completed tool call information: ${error}`, - ); - } + if (errorCode === 'MAX_TURNS_EXCEEDED') { + if (streamFormatter) { + streamFormatter.emitEvent({ + type: JsonStreamEventType.ERROR, + timestamp: new Date().toISOString(), + severity: 'error', + message: event.message, + }); + } + break; + } - // Check if any tool requested to stop execution immediately - const stopExecutionTool = completedToolCalls.find( - (tc) => tc.response.errorType === ToolErrorType.STOP_EXECUTION, - ); - - if (stopExecutionTool && stopExecutionTool.response.error) { - const stopMessage = `Agent execution stopped: ${stopExecutionTool.response.error.message}`; + if (errorCode === 'AGENT_EXECUTION_BLOCKED') { + if (config.getOutputFormat() === OutputFormat.TEXT) { + process.stderr.write(`[WARNING] ${event.message}\n`); + } + break; + } + const severity = + event.status === 'RESOURCE_EXHAUSTED' ? 'error' : 'warning'; if (config.getOutputFormat() === OutputFormat.TEXT) { - process.stderr.write(`${stopMessage}\n`); + process.stderr.write(`[WARNING] ${event.message}\n`); } - - // Emit final result event for streaming JSON if (streamFormatter) { - const metrics = uiTelemetryService.getMetrics(); - const durationMs = Date.now() - startTime; streamFormatter.emitEvent({ - type: JsonStreamEventType.RESULT, + type: JsonStreamEventType.ERROR, timestamp: new Date().toISOString(), - status: 'success', - stats: streamFormatter.convertToStreamStats( - metrics, - durationMs, - ), + severity, + message: event.message, }); - } else if (config.getOutputFormat() === OutputFormat.JSON) { - const formatter = new JsonFormatter(); - const stats = uiTelemetryService.getMetrics(); - textOutput.write( - formatter.format(config.getSessionId(), responseText, stats), - ); - } else { - textOutput.ensureTrailingNewline(); // Ensure a final newline } - return; + break; } + case 'stream_end': { + if (event.reason === 'aborted') { + handleCancellationError(config); + } else if (event.reason === 'max_turns') { + handleMaxTurnsExceededError(config); + } - currentMessages = [{ role: 'user', parts: toolResponseParts }]; - } else { - // Emit final result event for streaming JSON - if (streamFormatter) { - const metrics = uiTelemetryService.getMetrics(); - const durationMs = Date.now() - startTime; - streamFormatter.emitEvent({ - type: JsonStreamEventType.RESULT, - timestamp: new Date().toISOString(), - status: 'success', - stats: streamFormatter.convertToStreamStats(metrics, durationMs), - }); - } else if (config.getOutputFormat() === OutputFormat.JSON) { - const formatter = new JsonFormatter(); - const stats = uiTelemetryService.getMetrics(); - textOutput.write( - formatter.format(config.getSessionId(), responseText, stats), - ); - } else { - textOutput.ensureTrailingNewline(); // Ensure a final newline + const stopMessage = + typeof event.data?.['message'] === 'string' + ? event.data['message'] + : ''; + if (stopMessage && config.getOutputFormat() === OutputFormat.TEXT) { + process.stderr.write(`Agent execution stopped: ${stopMessage}\n`); + } + + emitFinalSuccessResult(); + streamEnded = true; + break; } - return; + case 'custom': { + if (event.kind === 'loop_detected') { + if (streamFormatter) { + streamFormatter.emitEvent({ + type: JsonStreamEventType.ERROR, + timestamp: new Date().toISOString(), + severity: 'warning', + message: 'Loop detected, stopping execution', + }); + } + } + break; + } + default: + break; } } } catch (error) { diff --git a/packages/core/src/agent/content-utils.test.ts b/packages/core/src/agent/content-utils.test.ts new file mode 100644 index 0000000000..4ca4d2eff4 --- /dev/null +++ b/packages/core/src/agent/content-utils.test.ts @@ -0,0 +1,266 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it } from 'vitest'; +import { + geminiPartsToContentParts, + contentPartsToGeminiParts, + toolResultDisplayToContentParts, + buildToolResponseData, +} from './content-utils.js'; +import type { Part } from '@google/genai'; +import type { ContentPart } from './types.js'; + +describe('geminiPartsToContentParts', () => { + it('converts text parts', () => { + const parts: Part[] = [{ text: 'hello' }]; + expect(geminiPartsToContentParts(parts)).toEqual([ + { type: 'text', text: 'hello' }, + ]); + }); + + it('converts thought parts', () => { + const parts: Part[] = [ + { text: 'thinking...', thought: true, thoughtSignature: 'sig123' }, + ]; + expect(geminiPartsToContentParts(parts)).toEqual([ + { + type: 'thought', + thought: 'thinking...', + thoughtSignature: 'sig123', + }, + ]); + }); + + it('converts thought parts without signature', () => { + const parts: Part[] = [{ text: 'thinking...', thought: true }]; + expect(geminiPartsToContentParts(parts)).toEqual([ + { type: 'thought', thought: 'thinking...' }, + ]); + }); + + it('converts inlineData parts to media', () => { + const parts: Part[] = [ + { inlineData: { data: 'base64data', mimeType: 'image/png' } }, + ]; + expect(geminiPartsToContentParts(parts)).toEqual([ + { type: 'media', data: 'base64data', mimeType: 'image/png' }, + ]); + }); + + it('converts fileData parts to media', () => { + const parts: Part[] = [ + { + fileData: { + fileUri: 'gs://bucket/file.pdf', + mimeType: 'application/pdf', + }, + }, + ]; + expect(geminiPartsToContentParts(parts)).toEqual([ + { + type: 'media', + uri: 'gs://bucket/file.pdf', + mimeType: 'application/pdf', + }, + ]); + }); + + it('converts functionCall parts to text with metadata', () => { + const parts: Part[] = [ + { functionCall: { name: 'myFunc', args: { key: 'value' } } }, + ]; + const result = geminiPartsToContentParts(parts); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('text'); + expect(result[0]?._meta).toEqual({ partType: 'functionCall' }); + const parsed = JSON.parse( + (result[0] as { type: 'text'; text: string }).text, + ); + expect(parsed.functionCall.name).toBe('myFunc'); + }); + + it('converts functionResponse parts to text with metadata', () => { + const parts: Part[] = [ + { + functionResponse: { + name: 'myFunc', + response: { output: 'result' }, + }, + }, + ]; + const result = geminiPartsToContentParts(parts); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('text'); + expect(result[0]?._meta).toEqual({ partType: 'functionResponse' }); + }); + + it('serializes unknown part types to text with _meta', () => { + const parts: Part[] = [{ unknownField: 'data' } as Part]; + const result = geminiPartsToContentParts(parts); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('text'); + expect(result[0]?._meta).toEqual({ partType: 'unknown' }); + }); + + it('handles empty array', () => { + expect(geminiPartsToContentParts([])).toEqual([]); + }); + + it('handles mixed parts', () => { + const parts: Part[] = [ + { text: 'hello' }, + { inlineData: { data: 'img', mimeType: 'image/jpeg' } }, + { text: 'thought', thought: true }, + ]; + const result = geminiPartsToContentParts(parts); + expect(result).toHaveLength(3); + expect(result[0]?.type).toBe('text'); + expect(result[1]?.type).toBe('media'); + expect(result[2]?.type).toBe('thought'); + }); +}); + +describe('contentPartsToGeminiParts', () => { + it('converts text ContentParts', () => { + const content: ContentPart[] = [{ type: 'text', text: 'hello' }]; + expect(contentPartsToGeminiParts(content)).toEqual([{ text: 'hello' }]); + }); + + it('converts thought ContentParts', () => { + const content: ContentPart[] = [ + { type: 'thought', thought: 'thinking...', thoughtSignature: 'sig' }, + ]; + expect(contentPartsToGeminiParts(content)).toEqual([ + { text: 'thinking...', thought: true, thoughtSignature: 'sig' }, + ]); + }); + + it('converts thought ContentParts without signature', () => { + const content: ContentPart[] = [ + { type: 'thought', thought: 'thinking...' }, + ]; + expect(contentPartsToGeminiParts(content)).toEqual([ + { text: 'thinking...', thought: true }, + ]); + }); + + it('converts media ContentParts with data to inlineData', () => { + const content: ContentPart[] = [ + { type: 'media', data: 'base64', mimeType: 'image/png' }, + ]; + expect(contentPartsToGeminiParts(content)).toEqual([ + { inlineData: { data: 'base64', mimeType: 'image/png' } }, + ]); + }); + + it('converts media ContentParts with uri to fileData', () => { + const content: ContentPart[] = [ + { type: 'media', uri: 'gs://bucket/file', mimeType: 'application/pdf' }, + ]; + expect(contentPartsToGeminiParts(content)).toEqual([ + { + fileData: { fileUri: 'gs://bucket/file', mimeType: 'application/pdf' }, + }, + ]); + }); + + it('converts reference ContentParts to text', () => { + const content: ContentPart[] = [{ type: 'reference', text: '@file.ts' }]; + expect(contentPartsToGeminiParts(content)).toEqual([{ text: '@file.ts' }]); + }); + + it('handles empty array', () => { + expect(contentPartsToGeminiParts([])).toEqual([]); + }); + + it('skips media parts with no data or uri', () => { + const content: ContentPart[] = [{ type: 'media', mimeType: 'image/png' }]; + expect(contentPartsToGeminiParts(content)).toEqual([]); + }); + + it('defaults mimeType for media with data but no mimeType', () => { + const content: ContentPart[] = [{ type: 'media', data: 'base64data' }]; + const result = contentPartsToGeminiParts(content); + expect(result).toEqual([ + { + inlineData: { + data: 'base64data', + mimeType: 'application/octet-stream', + }, + }, + ]); + }); + + it('serializes unknown ContentPart variants', () => { + // Force an unknown variant past the type system + const content = [ + { type: 'custom_widget', payload: 123 }, + ] as unknown as ContentPart[]; + const result = contentPartsToGeminiParts(content); + expect(result).toHaveLength(1); + expect(result[0]).toEqual({ + text: JSON.stringify({ type: 'custom_widget', payload: 123 }), + }); + }); +}); + +describe('toolResultDisplayToContentParts', () => { + it('returns undefined for undefined', () => { + expect(toolResultDisplayToContentParts(undefined)).toBeUndefined(); + }); + + it('returns undefined for null', () => { + expect(toolResultDisplayToContentParts(null)).toBeUndefined(); + }); + + it('handles string resultDisplay as-is', () => { + const result = toolResultDisplayToContentParts('File written'); + expect(result).toEqual([{ type: 'text', text: 'File written' }]); + }); + + it('stringifies object resultDisplay', () => { + const display = { type: 'FileDiff', oldPath: 'a.ts', newPath: 'b.ts' }; + const result = toolResultDisplayToContentParts(display); + expect(result).toEqual([{ type: 'text', text: JSON.stringify(display) }]); + }); +}); + +describe('buildToolResponseData', () => { + it('preserves outputFile and contentLength', () => { + const result = buildToolResponseData({ + outputFile: '/tmp/result.txt', + contentLength: 256, + }); + expect(result).toEqual({ + outputFile: '/tmp/result.txt', + contentLength: 256, + }); + }); + + it('returns undefined for empty response', () => { + const result = buildToolResponseData({}); + expect(result).toBeUndefined(); + }); + + it('includes errorType when present', () => { + const result = buildToolResponseData({ + errorType: 'permission_denied', + }); + expect(result).toEqual({ errorType: 'permission_denied' }); + }); + + it('merges data with other fields', () => { + const result = buildToolResponseData({ + data: { custom: 'value' }, + outputFile: '/tmp/file.txt', + }); + expect(result).toEqual({ + custom: 'value', + outputFile: '/tmp/file.txt', + }); + }); +}); diff --git a/packages/core/src/agent/content-utils.ts b/packages/core/src/agent/content-utils.ts new file mode 100644 index 0000000000..8364b84a71 --- /dev/null +++ b/packages/core/src/agent/content-utils.ts @@ -0,0 +1,158 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { Part } from '@google/genai'; +import type { ContentPart } from './types.js'; + +/** + * Converts Gemini API Part objects to framework-agnostic ContentPart objects. + * Handles text, thought, inlineData, fileData parts and serializes unknown + * part types to text to avoid silent data loss. + */ +export function geminiPartsToContentParts(parts: Part[]): ContentPart[] { + const result: ContentPart[] = []; + for (const part of parts) { + if ('text' in part && part.text !== undefined) { + if ('thought' in part && part.thought) { + result.push({ + type: 'thought', + thought: part.text, + ...(part.thoughtSignature + ? { thoughtSignature: part.thoughtSignature } + : {}), + }); + } else { + result.push({ type: 'text', text: part.text }); + } + } else if ('inlineData' in part && part.inlineData) { + result.push({ + type: 'media', + data: part.inlineData.data, + mimeType: part.inlineData.mimeType, + }); + } else if ('fileData' in part && part.fileData) { + result.push({ + type: 'media', + uri: part.fileData.fileUri, + mimeType: part.fileData.mimeType, + }); + } else if ('functionCall' in part && part.functionCall) { + // Function calls are serialized to text so consumers can inspect them + result.push({ + type: 'text', + text: JSON.stringify({ + functionCall: { + name: part.functionCall.name, + args: part.functionCall.args, + }, + }), + _meta: { partType: 'functionCall' }, + }); + } else if ('functionResponse' in part && part.functionResponse) { + result.push({ + type: 'text', + text: JSON.stringify({ + functionResponse: { + name: part.functionResponse.name, + response: part.functionResponse.response, + }, + }), + _meta: { partType: 'functionResponse' }, + }); + } else { + // Fallback: serialize any unrecognized part type to text + result.push({ + type: 'text', + text: JSON.stringify(part), + _meta: { partType: 'unknown' }, + }); + } + } + return result; +} + +/** + * Converts framework-agnostic ContentPart objects to Gemini API Part objects. + */ +export function contentPartsToGeminiParts(content: ContentPart[]): Part[] { + const result: Part[] = []; + for (const part of content) { + switch (part.type) { + case 'text': + result.push({ text: part.text }); + break; + case 'thought': + result.push({ + text: part.thought, + thought: true, + ...(part.thoughtSignature + ? { thoughtSignature: part.thoughtSignature } + : {}), + }); + break; + case 'media': + if (part.data) { + result.push({ + inlineData: { + data: part.data, + mimeType: part.mimeType ?? 'application/octet-stream', + }, + }); + } else if (part.uri) { + result.push({ + fileData: { fileUri: part.uri, mimeType: part.mimeType }, + }); + } + break; + case 'reference': + // References are converted to text for the model + result.push({ text: part.text }); + break; + default: + // Serialize unknown ContentPart variants instead of dropping them + result.push({ text: JSON.stringify(part) }); + break; + } + } + return result; +} + +/** + * Converts a ToolCallResponseInfo.resultDisplay value into ContentPart[]. + * Handles string, object-valued (FileDiff, SubagentProgress, etc.), + * and undefined resultDisplay consistently. + */ +export function toolResultDisplayToContentParts( + resultDisplay: unknown, +): ContentPart[] | undefined { + if (resultDisplay === undefined || resultDisplay === null) { + return undefined; + } + const text = + typeof resultDisplay === 'string' + ? resultDisplay + : JSON.stringify(resultDisplay); + return [{ type: 'text', text }]; +} + +/** + * Builds the data record for a tool_response AgentEvent, preserving + * all available metadata from the ToolCallResponseInfo. + */ +export function buildToolResponseData(response: { + data?: Record; + errorType?: string; + outputFile?: string; + contentLength?: number; +}): Record | undefined { + const parts: Record = {}; + if (response.data) Object.assign(parts, response.data); + if (response.errorType) parts['errorType'] = response.errorType; + if (response.outputFile) parts['outputFile'] = response.outputFile; + if (response.contentLength !== undefined) + parts['contentLength'] = response.contentLength; + return Object.keys(parts).length > 0 ? parts : undefined; +} diff --git a/packages/core/src/agent/event-translator.test.ts b/packages/core/src/agent/event-translator.test.ts new file mode 100644 index 0000000000..30c87feea5 --- /dev/null +++ b/packages/core/src/agent/event-translator.test.ts @@ -0,0 +1,705 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it, beforeEach } from 'vitest'; +import { FinishReason } from '@google/genai'; +import { ToolErrorType } from '../tools/tool-error.js'; +import { + translateEvent, + createTranslationState, + mapFinishReason, + mapHttpToGrpcStatus, + mapError, + mapUsage, + type TranslationState, +} from './event-translator.js'; +import { GeminiEventType } from '../core/turn.js'; +import type { ServerGeminiStreamEvent } from '../core/turn.js'; +import type { AgentEvent } from './types.js'; + +describe('createTranslationState', () => { + it('creates state with default streamId', () => { + const state = createTranslationState(); + expect(state.streamId).toBeDefined(); + expect(state.streamStartEmitted).toBe(false); + expect(state.model).toBeUndefined(); + expect(state.eventCounter).toBe(0); + expect(state.pendingToolNames.size).toBe(0); + }); + + it('creates state with custom streamId', () => { + const state = createTranslationState('custom-stream'); + expect(state.streamId).toBe('custom-stream'); + }); +}); + +describe('translateEvent', () => { + let state: TranslationState; + + beforeEach(() => { + state = createTranslationState('test-stream'); + }); + + describe('Content events', () => { + it('emits stream_start + message for first content event', () => { + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Content, + value: 'Hello world', + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(2); + expect(result[0]?.type).toBe('stream_start'); + expect(result[1]?.type).toBe('message'); + const msg = result[1] as AgentEvent<'message'>; + expect(msg.role).toBe('agent'); + expect(msg.content).toEqual([{ type: 'text', text: 'Hello world' }]); + }); + + it('skips stream_start for subsequent content events', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Content, + value: 'more text', + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('message'); + }); + }); + + describe('Thought events', () => { + it('emits thought content with metadata', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Thought, + value: { subject: 'Planning', description: 'I am thinking...' }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const msg = result[0] as AgentEvent<'message'>; + expect(msg.content).toEqual([ + { type: 'thought', thought: 'I am thinking...' }, + ]); + expect(msg._meta?.['subject']).toBe('Planning'); + }); + }); + + describe('ToolCallRequest events', () => { + it('emits tool_request and tracks pending tool name', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallRequest, + value: { + callId: 'call-1', + name: 'read_file', + args: { path: '/tmp/test' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const req = result[0] as AgentEvent<'tool_request'>; + expect(req.requestId).toBe('call-1'); + expect(req.name).toBe('read_file'); + expect(req.args).toEqual({ path: '/tmp/test' }); + expect(state.pendingToolNames.get('call-1')).toBe('read_file'); + }); + }); + + describe('ToolCallResponse events', () => { + it('emits tool_response with content from responseParts', () => { + state.streamStartEmitted = true; + state.pendingToolNames.set('call-1', 'read_file'); + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallResponse, + value: { + callId: 'call-1', + responseParts: [{ text: 'file contents' }], + resultDisplay: undefined, + error: undefined, + errorType: undefined, + }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const resp = result[0] as AgentEvent<'tool_response'>; + expect(resp.requestId).toBe('call-1'); + expect(resp.name).toBe('read_file'); + expect(resp.content).toEqual([{ type: 'text', text: 'file contents' }]); + expect(resp.isError).toBe(false); + expect(state.pendingToolNames.has('call-1')).toBe(false); + }); + + it('uses error.message for content when tool errored', () => { + state.streamStartEmitted = true; + state.pendingToolNames.set('call-2', 'write_file'); + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallResponse, + value: { + callId: 'call-2', + responseParts: [{ text: 'stale parts' }], + resultDisplay: 'Permission denied', + error: new Error('Permission denied to write'), + errorType: ToolErrorType.PERMISSION_DENIED, + }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const resp = result[0] as AgentEvent<'tool_response'>; + expect(resp.isError).toBe(true); + // Should use error.message, not responseParts + expect(resp.content).toEqual([ + { type: 'text', text: 'Permission denied to write' }, + ]); + expect(resp.displayContent).toEqual([ + { type: 'text', text: 'Permission denied' }, + ]); + expect(resp.data).toEqual({ errorType: 'permission_denied' }); + }); + + it('uses "unknown" name for untracked tool calls', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallResponse, + value: { + callId: 'untracked', + responseParts: [{ text: 'data' }], + resultDisplay: undefined, + error: undefined, + errorType: undefined, + }, + }; + const result = translateEvent(event, state); + const resp = result[0] as AgentEvent<'tool_response'>; + expect(resp.name).toBe('unknown'); + }); + + it('stringifies object resultDisplay correctly', () => { + state.streamStartEmitted = true; + state.pendingToolNames.set('call-3', 'diff_tool'); + const objectDisplay = { type: 'FileDiff', before: 'a', after: 'b' }; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallResponse, + value: { + callId: 'call-3', + responseParts: [{ text: 'diff result' }], + resultDisplay: objectDisplay, + error: undefined, + errorType: undefined, + }, + }; + const result = translateEvent(event, state); + const resp = result[0] as AgentEvent<'tool_response'>; + expect(resp.displayContent).toEqual([ + { type: 'text', text: JSON.stringify(objectDisplay) }, + ]); + }); + + it('passes through string resultDisplay as-is', () => { + state.streamStartEmitted = true; + state.pendingToolNames.set('call-4', 'shell'); + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallResponse, + value: { + callId: 'call-4', + responseParts: [{ text: 'output' }], + resultDisplay: 'Command output text', + error: undefined, + errorType: undefined, + }, + }; + const result = translateEvent(event, state); + const resp = result[0] as AgentEvent<'tool_response'>; + expect(resp.displayContent).toEqual([ + { type: 'text', text: 'Command output text' }, + ]); + }); + + it('preserves outputFile and contentLength in data', () => { + state.streamStartEmitted = true; + state.pendingToolNames.set('call-5', 'write_file'); + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallResponse, + value: { + callId: 'call-5', + responseParts: [{ text: 'written' }], + resultDisplay: undefined, + error: undefined, + errorType: undefined, + outputFile: '/tmp/out.txt', + contentLength: 42, + }, + }; + const result = translateEvent(event, state); + const resp = result[0] as AgentEvent<'tool_response'>; + expect(resp.data?.['outputFile']).toBe('/tmp/out.txt'); + expect(resp.data?.['contentLength']).toBe(42); + }); + + it('handles multi-part responses (text + inlineData)', () => { + state.streamStartEmitted = true; + state.pendingToolNames.set('call-6', 'screenshot'); + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ToolCallResponse, + value: { + callId: 'call-6', + responseParts: [ + { text: 'Here is the screenshot' }, + { inlineData: { data: 'base64img', mimeType: 'image/png' } }, + ], + resultDisplay: undefined, + error: undefined, + errorType: undefined, + }, + }; + const result = translateEvent(event, state); + const resp = result[0] as AgentEvent<'tool_response'>; + expect(resp.content).toEqual([ + { type: 'text', text: 'Here is the screenshot' }, + { type: 'media', data: 'base64img', mimeType: 'image/png' }, + ]); + expect(resp.isError).toBe(false); + }); + }); + + describe('Error events', () => { + it('emits error event for structured errors', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Error, + value: { error: { message: 'Rate limited', status: 429 } }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const err = result[0] as AgentEvent<'error'>; + expect(err.status).toBe('RESOURCE_EXHAUSTED'); + expect(err.message).toBe('Rate limited'); + expect(err.fatal).toBe(true); + }); + + it('emits error event for Error instances', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Error, + value: { error: new Error('Something broke') }, + }; + const result = translateEvent(event, state); + const err = result[0] as AgentEvent<'error'>; + expect(err.status).toBe('INTERNAL'); + expect(err.message).toBe('Something broke'); + }); + }); + + describe('ModelInfo events', () => { + it('emits stream_start when no stream started yet', () => { + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ModelInfo, + value: 'gemini-2.5-pro', + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('stream_start'); + expect(state.model).toBe('gemini-2.5-pro'); + expect(state.streamStartEmitted).toBe(true); + }); + + it('emits session_update when stream already started', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ModelInfo, + value: 'gemini-2.5-flash', + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('session_update'); + }); + }); + + describe('AgentExecutionStopped events', () => { + it('emits stream_end with the final stop message in data.message', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.AgentExecutionStopped, + value: { + reason: 'before_model', + systemMessage: 'Stopped by hook', + contextCleared: true, + }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const streamEnd = result[0] as AgentEvent<'stream_end'>; + expect(streamEnd.type).toBe('stream_end'); + expect(streamEnd.reason).toBe('completed'); + expect(streamEnd.data).toEqual({ message: 'Stopped by hook' }); + }); + + it('uses reason when systemMessage is not set', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.AgentExecutionStopped, + value: { reason: 'hook' }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const streamEnd = result[0] as AgentEvent<'stream_end'>; + expect(streamEnd.data).toEqual({ message: 'hook' }); + }); + }); + + describe('AgentExecutionBlocked events', () => { + it('emits non-fatal error event (non-terminal, stream continues)', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.AgentExecutionBlocked, + value: { reason: 'Policy violation' }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const err = result[0] as AgentEvent<'error'>; + expect(err.type).toBe('error'); + expect(err.fatal).toBe(false); + expect(err._meta?.['code']).toBe('AGENT_EXECUTION_BLOCKED'); + expect(err.message).toBe('Agent execution blocked: Policy violation'); + }); + + it('uses systemMessage in the final error message when available', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.AgentExecutionBlocked, + value: { + reason: 'hook_blocked', + systemMessage: 'Blocked by policy hook', + contextCleared: true, + }, + }; + const result = translateEvent(event, state); + const err = result[0] as AgentEvent<'error'>; + expect(err.message).toBe( + 'Agent execution blocked: Blocked by policy hook', + ); + }); + }); + + describe('LoopDetected events', () => { + it('emits a custom loop_detected event', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.LoopDetected, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('custom'); + expect((result[0] as AgentEvent<'custom'>).kind).toBe('loop_detected'); + }); + }); + + describe('MaxSessionTurns events', () => { + it('emits a non-fatal max-turns error event', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.MaxSessionTurns, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const err = result[0] as AgentEvent<'error'>; + expect(err.type).toBe('error'); + expect(err.fatal).toBe(false); + expect(err.status).toBe('RESOURCE_EXHAUSTED'); + expect(err._meta?.['code']).toBe('MAX_TURNS_EXCEEDED'); + expect(err.message).toBe('Maximum session turns exceeded'); + }); + }); + + describe('Finished events', () => { + it('emits usage + stream_end for STOP', () => { + state.streamStartEmitted = true; + state.model = 'gemini-2.5-pro'; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Finished, + value: { + reason: FinishReason.STOP, + usageMetadata: { + promptTokenCount: 100, + candidatesTokenCount: 50, + cachedContentTokenCount: 10, + }, + }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(2); + + const usage = result[0] as AgentEvent<'usage'>; + expect(usage.model).toBe('gemini-2.5-pro'); + expect(usage.inputTokens).toBe(100); + expect(usage.outputTokens).toBe(50); + expect(usage.cachedTokens).toBe(10); + + const end = result[1] as AgentEvent<'stream_end'>; + expect(end.reason).toBe('completed'); + }); + + it('emits stream_end without usage when no metadata', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Finished, + value: { reason: undefined, usageMetadata: undefined }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + expect(result[0]?.type).toBe('stream_end'); + }); + }); + + describe('Citation events', () => { + it('emits message with citation meta', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.Citation, + value: 'Source: example.com', + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const msg = result[0] as AgentEvent<'message'>; + expect(msg.content).toEqual([ + { type: 'text', text: 'Source: example.com' }, + ]); + expect(msg._meta?.['citation']).toBe(true); + }); + }); + + describe('UserCancelled events', () => { + it('emits stream_end with reason aborted', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.UserCancelled, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const end = result[0] as AgentEvent<'stream_end'>; + expect(end.type).toBe('stream_end'); + expect(end.reason).toBe('aborted'); + }); + }); + + describe('ContextWindowWillOverflow events', () => { + it('emits fatal error', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.ContextWindowWillOverflow, + value: { + estimatedRequestTokenCount: 150000, + remainingTokenCount: 10000, + }, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const err = result[0] as AgentEvent<'error'>; + expect(err.status).toBe('RESOURCE_EXHAUSTED'); + expect(err.fatal).toBe(true); + expect(err.message).toContain('150000'); + expect(err.message).toContain('10000'); + }); + }); + + describe('InvalidStream events', () => { + it('emits fatal error', () => { + state.streamStartEmitted = true; + const event: ServerGeminiStreamEvent = { + type: GeminiEventType.InvalidStream, + }; + const result = translateEvent(event, state); + expect(result).toHaveLength(1); + const err = result[0] as AgentEvent<'error'>; + expect(err.status).toBe('INTERNAL'); + expect(err.message).toBe('Invalid stream received from model'); + expect(err.fatal).toBe(true); + }); + }); + + describe('Events with no output', () => { + it('returns empty for Retry', () => { + const result = translateEvent({ type: GeminiEventType.Retry }, state); + expect(result).toEqual([]); + }); + + it('returns empty for ChatCompressed with null', () => { + const result = translateEvent( + { type: GeminiEventType.ChatCompressed, value: null }, + state, + ); + expect(result).toEqual([]); + }); + + it('returns empty for ToolCallConfirmation', () => { + // ToolCallConfirmation is skipped in non-interactive mode (elicitations + // are deferred to the interactive runtime adaptation). + const event = { + type: GeminiEventType.ToolCallConfirmation, + value: { + request: { + callId: 'c1', + name: 'tool', + args: {}, + isClientInitiated: false, + prompt_id: 'p1', + }, + details: { type: 'info', title: 'Confirm', prompt: 'Confirm?' }, + }, + } as ServerGeminiStreamEvent; + const result = translateEvent(event, state); + expect(result).toEqual([]); + }); + }); + + describe('Event IDs', () => { + it('generates sequential IDs', () => { + state.streamStartEmitted = true; + const e1 = translateEvent( + { type: GeminiEventType.Content, value: 'a' }, + state, + ); + const e2 = translateEvent( + { type: GeminiEventType.Content, value: 'b' }, + state, + ); + expect(e1[0]?.id).toBe('test-stream-0'); + expect(e2[0]?.id).toBe('test-stream-1'); + }); + + it('includes streamId in events', () => { + const events = translateEvent( + { type: GeminiEventType.Content, value: 'hi' }, + state, + ); + for (const e of events) { + expect(e.streamId).toBe('test-stream'); + } + }); + }); +}); + +describe('mapFinishReason', () => { + it('maps STOP to completed', () => { + expect(mapFinishReason(FinishReason.STOP)).toBe('completed'); + }); + + it('maps undefined to completed', () => { + expect(mapFinishReason(undefined)).toBe('completed'); + }); + + it('maps MAX_TOKENS to max_budget', () => { + expect(mapFinishReason(FinishReason.MAX_TOKENS)).toBe('max_budget'); + }); + + it('maps SAFETY to refusal', () => { + expect(mapFinishReason(FinishReason.SAFETY)).toBe('refusal'); + }); + + it('maps MALFORMED_FUNCTION_CALL to failed', () => { + expect(mapFinishReason(FinishReason.MALFORMED_FUNCTION_CALL)).toBe( + 'failed', + ); + }); + + it('maps RECITATION to refusal', () => { + expect(mapFinishReason(FinishReason.RECITATION)).toBe('refusal'); + }); + + it('maps LANGUAGE to refusal', () => { + expect(mapFinishReason(FinishReason.LANGUAGE)).toBe('refusal'); + }); + + it('maps BLOCKLIST to refusal', () => { + expect(mapFinishReason(FinishReason.BLOCKLIST)).toBe('refusal'); + }); + + it('maps OTHER to failed', () => { + expect(mapFinishReason(FinishReason.OTHER)).toBe('failed'); + }); + + it('maps PROHIBITED_CONTENT to refusal', () => { + expect(mapFinishReason(FinishReason.PROHIBITED_CONTENT)).toBe('refusal'); + }); +}); + +describe('mapHttpToGrpcStatus', () => { + it('maps 400 to INVALID_ARGUMENT', () => { + expect(mapHttpToGrpcStatus(400)).toBe('INVALID_ARGUMENT'); + }); + + it('maps 401 to UNAUTHENTICATED', () => { + expect(mapHttpToGrpcStatus(401)).toBe('UNAUTHENTICATED'); + }); + + it('maps 429 to RESOURCE_EXHAUSTED', () => { + expect(mapHttpToGrpcStatus(429)).toBe('RESOURCE_EXHAUSTED'); + }); + + it('maps undefined to INTERNAL', () => { + expect(mapHttpToGrpcStatus(undefined)).toBe('INTERNAL'); + }); + + it('maps unknown codes to INTERNAL', () => { + expect(mapHttpToGrpcStatus(418)).toBe('INTERNAL'); + }); +}); + +describe('mapError', () => { + it('maps structured errors with status', () => { + const result = mapError({ message: 'Rate limit', status: 429 }); + expect(result.status).toBe('RESOURCE_EXHAUSTED'); + expect(result.message).toBe('Rate limit'); + expect(result.fatal).toBe(true); + }); + + it('maps Error instances', () => { + const result = mapError(new Error('Something failed')); + expect(result.status).toBe('INTERNAL'); + expect(result.message).toBe('Something failed'); + }); + + it('preserves error name in _meta', () => { + class CustomError extends Error { + constructor(msg: string) { + super(msg); + } + } + const result = mapError(new CustomError('test')); + expect(result._meta?.['errorName']).toBe('CustomError'); + }); + + it('maps non-Error values to string', () => { + const result = mapError('raw string error'); + expect(result.message).toBe('raw string error'); + expect(result.status).toBe('INTERNAL'); + }); +}); + +describe('mapUsage', () => { + it('maps all fields', () => { + const result = mapUsage( + { + promptTokenCount: 100, + candidatesTokenCount: 50, + cachedContentTokenCount: 25, + }, + 'gemini-2.5-pro', + ); + expect(result).toEqual({ + model: 'gemini-2.5-pro', + inputTokens: 100, + outputTokens: 50, + cachedTokens: 25, + }); + }); + + it('uses "unknown" for missing model', () => { + const result = mapUsage({}); + expect(result.model).toBe('unknown'); + }); +}); diff --git a/packages/core/src/agent/event-translator.ts b/packages/core/src/agent/event-translator.ts new file mode 100644 index 0000000000..1c11b745ee --- /dev/null +++ b/packages/core/src/agent/event-translator.ts @@ -0,0 +1,456 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview Pure, stateless-per-call translation functions that convert + * ServerGeminiStreamEvent objects into AgentEvent objects. + * + * No side effects, no generators. Each call to `translateEvent` takes an event + * and mutable TranslationState, returning zero or more AgentEvents. + */ + +import type { FinishReason } from '@google/genai'; +import { GeminiEventType } from '../core/turn.js'; +import type { + ServerGeminiStreamEvent, + StructuredError, + GeminiFinishedEventValue, +} from '../core/turn.js'; +import type { AgentEvent, StreamEndReason, ErrorData, Usage } from './types.js'; +import { + geminiPartsToContentParts, + toolResultDisplayToContentParts, + buildToolResponseData, +} from './content-utils.js'; + +// --------------------------------------------------------------------------- +// Translation State +// --------------------------------------------------------------------------- + +export interface TranslationState { + streamId: string; + streamStartEmitted: boolean; + model: string | undefined; + eventCounter: number; + /** Tracks callId → tool name from requests so responses can reference the name. */ + pendingToolNames: Map; +} + +export function createTranslationState(streamId?: string): TranslationState { + return { + streamId: streamId ?? crypto.randomUUID(), + streamStartEmitted: false, + model: undefined, + eventCounter: 0, + pendingToolNames: new Map(), + }; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeEvent( + type: AgentEvent['type'], + state: TranslationState, + payload: Partial, +): AgentEvent { + const id = `${state.streamId}-${state.eventCounter++}`; + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion -- constructing AgentEvent from common fields + payload + return { + ...payload, + id, + timestamp: new Date().toISOString(), + streamId: state.streamId, + type, + } as AgentEvent; +} + +function ensureStreamStart(state: TranslationState, out: AgentEvent[]): void { + if (!state.streamStartEmitted) { + out.push(makeEvent('stream_start', state, { streamId: state.streamId })); + state.streamStartEmitted = true; + } +} + +// --------------------------------------------------------------------------- +// Core Translator +// --------------------------------------------------------------------------- + +/** + * Translates a single ServerGeminiStreamEvent into zero or more AgentEvents. + * Mutates `state` (counter, flags) as a side effect. + */ +export function translateEvent( + event: ServerGeminiStreamEvent, + state: TranslationState, +): AgentEvent[] { + const out: AgentEvent[] = []; + + switch (event.type) { + case GeminiEventType.ModelInfo: + state.model = event.value; + if (!state.streamStartEmitted) { + out.push( + makeEvent('stream_start', state, { streamId: state.streamId }), + ); + state.streamStartEmitted = true; + } else { + out.push(makeEvent('session_update', state, { model: event.value })); + } + break; + + case GeminiEventType.Content: + ensureStreamStart(state, out); + out.push( + makeEvent('message', state, { + role: 'agent', + content: [{ type: 'text', text: event.value }], + }), + ); + break; + + case GeminiEventType.Thought: + ensureStreamStart(state, out); + out.push( + makeEvent('message', state, { + role: 'agent', + content: [{ type: 'thought', thought: event.value.description }], + _meta: event.value.subject + ? { source: 'agent', subject: event.value.subject } + : { source: 'agent' }, + }), + ); + break; + + case GeminiEventType.Citation: + ensureStreamStart(state, out); + out.push( + makeEvent('message', state, { + role: 'agent', + content: [{ type: 'text', text: event.value }], + _meta: { source: 'agent', citation: true }, + }), + ); + break; + + case GeminiEventType.Finished: + handleFinished(event.value, state, out); + break; + + case GeminiEventType.Error: + handleError(event.value.error, state, out); + break; + + case GeminiEventType.UserCancelled: + ensureStreamStart(state, out); + out.push( + makeEvent('stream_end', state, { + streamId: state.streamId, + reason: 'aborted', + }), + ); + break; + + case GeminiEventType.MaxSessionTurns: + ensureStreamStart(state, out); + out.push( + makeEvent('error', state, { + status: 'RESOURCE_EXHAUSTED', + message: 'Maximum session turns exceeded', + fatal: false, + _meta: { code: 'MAX_TURNS_EXCEEDED' }, + }), + ); + break; + + case GeminiEventType.LoopDetected: + ensureStreamStart(state, out); + out.push( + makeEvent('custom', state, { + kind: 'loop_detected', + }), + ); + // No stream_end — the stream continues. Consumer decides how to handle: + // non-interactive emits a warning, interactive shows a confirmation dialog. + break; + + case GeminiEventType.ContextWindowWillOverflow: + ensureStreamStart(state, out); + out.push( + makeEvent('error', state, { + status: 'RESOURCE_EXHAUSTED', + message: `Context window will overflow (estimated: ${event.value.estimatedRequestTokenCount}, remaining: ${event.value.remainingTokenCount})`, + fatal: true, + }), + ); + break; + + case GeminiEventType.AgentExecutionStopped: + ensureStreamStart(state, out); + out.push( + makeEvent('stream_end', state, { + streamId: state.streamId, + reason: 'completed', + data: { + message: event.value.systemMessage?.trim() || event.value.reason, + }, + }), + ); + break; + + case GeminiEventType.AgentExecutionBlocked: + ensureStreamStart(state, out); + out.push( + makeEvent('error', state, { + status: 'PERMISSION_DENIED', + message: `Agent execution blocked: ${event.value.systemMessage?.trim() || event.value.reason}`, + fatal: false, + _meta: { code: 'AGENT_EXECUTION_BLOCKED' }, + }), + ); + break; + + case GeminiEventType.InvalidStream: + ensureStreamStart(state, out); + out.push( + makeEvent('error', state, { + status: 'INTERNAL', + message: 'Invalid stream received from model', + fatal: true, + }), + ); + break; + + case GeminiEventType.ToolCallRequest: + ensureStreamStart(state, out); + state.pendingToolNames.set(event.value.callId, event.value.name); + out.push( + makeEvent('tool_request', state, { + requestId: event.value.callId, + name: event.value.name, + args: event.value.args, + }), + ); + break; + + case GeminiEventType.ToolCallResponse: { + ensureStreamStart(state, out); + const displayContent = toolResultDisplayToContentParts( + event.value.resultDisplay, + ); + const data = buildToolResponseData(event.value); + out.push( + makeEvent('tool_response', state, { + requestId: event.value.callId, + name: state.pendingToolNames.get(event.value.callId) ?? 'unknown', + content: event.value.error + ? [{ type: 'text', text: event.value.error.message }] + : geminiPartsToContentParts(event.value.responseParts), + isError: event.value.error !== undefined, + ...(displayContent ? { displayContent } : {}), + ...(data ? { data } : {}), + }), + ); + state.pendingToolNames.delete(event.value.callId); + break; + } + + case GeminiEventType.ToolCallConfirmation: + // Elicitations are handled separately by the session layer + break; + + // Internal concerns — no AgentEvent emitted + case GeminiEventType.ChatCompressed: + case GeminiEventType.Retry: + break; + + default: + break; + } + + return out; +} + +// --------------------------------------------------------------------------- +// Finished Event Handling +// --------------------------------------------------------------------------- + +function handleFinished( + value: GeminiFinishedEventValue, + state: TranslationState, + out: AgentEvent[], +): void { + ensureStreamStart(state, out); + + if (value.usageMetadata) { + const usage = mapUsage(value.usageMetadata, state.model); + out.push(makeEvent('usage', state, usage)); + } + + out.push( + makeEvent('stream_end', state, { + streamId: state.streamId, + reason: mapFinishReason(value.reason), + }), + ); +} + +// --------------------------------------------------------------------------- +// Error Handling +// --------------------------------------------------------------------------- + +function handleError( + error: unknown, + state: TranslationState, + out: AgentEvent[], +): void { + ensureStreamStart(state, out); + + const mapped = mapError(error); + out.push(makeEvent('error', state, mapped)); +} + +// --------------------------------------------------------------------------- +// Public Mapping Functions +// --------------------------------------------------------------------------- + +/** + * Maps a Gemini FinishReason to a StreamEndReason. + */ +export function mapFinishReason( + reason: FinishReason | undefined, +): StreamEndReason { + if (!reason) return 'completed'; + + switch (reason) { + case 'STOP': + case 'FINISH_REASON_UNSPECIFIED': + return 'completed'; + case 'MAX_TOKENS': + return 'max_budget'; + case 'SAFETY': + case 'RECITATION': + case 'LANGUAGE': + case 'BLOCKLIST': + case 'PROHIBITED_CONTENT': + case 'SPII': + return 'refusal'; + case 'MALFORMED_FUNCTION_CALL': + case 'OTHER': + return 'failed'; + default: + return 'failed'; + } +} + +/** + * Maps an HTTP status code to a gRPC-style status string. + */ +export function mapHttpToGrpcStatus( + httpStatus: number | undefined, +): ErrorData['status'] { + if (httpStatus === undefined) return 'INTERNAL'; + + switch (httpStatus) { + case 400: + return 'INVALID_ARGUMENT'; + case 401: + return 'UNAUTHENTICATED'; + case 403: + return 'PERMISSION_DENIED'; + case 404: + return 'NOT_FOUND'; + case 409: + return 'ALREADY_EXISTS'; + case 429: + return 'RESOURCE_EXHAUSTED'; + case 500: + return 'INTERNAL'; + case 501: + return 'UNIMPLEMENTED'; + case 503: + return 'UNAVAILABLE'; + case 504: + return 'DEADLINE_EXCEEDED'; + default: + return 'INTERNAL'; + } +} + +/** + * Maps a StructuredError (or unknown error value) to an ErrorData payload. + * Review fix #4: preserves error metadata (name, code, stack) in _meta. + */ +export function mapError( + error: unknown, +): ErrorData & { _meta?: Record } { + const meta: Record = {}; + + if (error instanceof Error) { + meta['errorName'] = error.constructor.name; + if ('exitCode' in error && typeof error.exitCode === 'number') { + meta['exitCode'] = error.exitCode; + } + if ('code' in error) { + meta['code'] = error.code; + } + } + + const hasMeta = Object.keys(meta).length > 0; + + if (isStructuredError(error)) { + return { + status: mapHttpToGrpcStatus(error.status), + message: error.message, + fatal: true, + ...(hasMeta ? { _meta: meta } : {}), + }; + } + + if (error instanceof Error) { + return { + status: 'INTERNAL', + message: error.message, + fatal: true, + ...(hasMeta ? { _meta: meta } : {}), + }; + } + + return { + status: 'INTERNAL', + message: String(error), + fatal: true, + }; +} + +function isStructuredError(error: unknown): error is StructuredError { + return ( + typeof error === 'object' && + error !== null && + 'message' in error && + typeof error.message === 'string' + ); +} + +/** + * Maps Gemini usageMetadata to Usage. + */ +export function mapUsage( + metadata: { + promptTokenCount?: number; + candidatesTokenCount?: number; + cachedContentTokenCount?: number; + }, + model?: string, +): Usage { + return { + model: model ?? 'unknown', + inputTokens: metadata.promptTokenCount, + outputTokens: metadata.candidatesTokenCount, + cachedTokens: metadata.cachedContentTokenCount, + }; +} diff --git a/packages/core/src/agent/legacy-agent-session.test.ts b/packages/core/src/agent/legacy-agent-session.test.ts new file mode 100644 index 0000000000..7cf1dadf24 --- /dev/null +++ b/packages/core/src/agent/legacy-agent-session.test.ts @@ -0,0 +1,870 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it, vi, beforeEach } from 'vitest'; +import { FinishReason } from '@google/genai'; +import { LegacyAgentSession } from './legacy-agent-session.js'; +import type { LegacySessionDeps } from './legacy-agent-session.js'; +import { GeminiEventType } from '../core/turn.js'; +import type { ServerGeminiStreamEvent } from '../core/turn.js'; +import type { AgentEvent } from './types.js'; +import { ToolErrorType } from '../tools/tool-error.js'; +import type { + CompletedToolCall, + ToolCallRequestInfo, +} from '../scheduler/types.js'; +import { CoreToolCallStatus } from '../scheduler/types.js'; + +// --------------------------------------------------------------------------- +// Mock helpers +// --------------------------------------------------------------------------- + +function createMockDeps( + overrides?: Partial, +): LegacySessionDeps { + const mockClient = { + sendMessageStream: vi.fn(), + getChat: vi.fn().mockReturnValue({ + recordCompletedToolCalls: vi.fn(), + }), + getCurrentSequenceModel: vi.fn().mockReturnValue(null), + }; + + const mockScheduler = { + schedule: vi.fn().mockResolvedValue([]), + }; + + const mockConfig = { + getMaxSessionTurns: vi.fn().mockReturnValue(-1), + getModel: vi.fn().mockReturnValue('gemini-2.5-pro'), + }; + + return { + client: mockClient as unknown as LegacySessionDeps['client'], + + scheduler: mockScheduler as unknown as LegacySessionDeps['scheduler'], + + config: mockConfig as unknown as LegacySessionDeps['config'], + promptId: 'test-prompt', + streamId: 'test-stream', + ...overrides, + }; +} + +async function* makeStream( + events: ServerGeminiStreamEvent[], +): AsyncGenerator { + for (const event of events) { + yield event; + } +} + +function makeToolRequest(callId: string, name: string): ToolCallRequestInfo { + return { + callId, + name, + args: {}, + isClientInitiated: false, + prompt_id: 'p1', + }; +} + +function makeCompletedToolCall( + callId: string, + name: string, + responseText: string, +): CompletedToolCall { + return { + status: CoreToolCallStatus.Success, + request: makeToolRequest(callId, name), + response: { + callId, + responseParts: [{ text: responseText }], + resultDisplay: undefined, + error: undefined, + errorType: undefined, + }, + + tool: {} as CompletedToolCall extends { tool: infer T } ? T : never, + + invocation: {} as CompletedToolCall extends { invocation: infer T } + ? T + : never, + } as CompletedToolCall; +} + +async function collectEvents( + session: LegacyAgentSession, +): Promise { + const events: AgentEvent[] = []; + for await (const event of session.stream()) { + events.push(event); + } + return events; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('LegacyAgentSession', () => { + let deps: LegacySessionDeps; + + beforeEach(() => { + deps = createMockDeps(); + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + describe('send', () => { + it('returns streamId', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'hello' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const result = await session.send({ + message: [{ type: 'text', text: 'hi' }], + }); + + expect(result.streamId).toBe('test-stream'); + }); + + it('throws for non-message payloads', async () => { + const session = new LegacyAgentSession(deps); + await expect(session.send({ update: { title: 'test' } })).rejects.toThrow( + 'only supports message sends', + ); + }); + }); + + describe('stream - basic flow', () => { + it('emits stream_start, content messages, and stream_end', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'Hello' }, + { type: GeminiEventType.Content, value: ' World' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const types = events.map((e) => e.type); + expect(types).toContain('stream_start'); + expect(types).toContain('message'); + expect(types).toContain('stream_end'); + + const messages = events.filter( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'agent', + ); + expect(messages).toHaveLength(2); + expect(messages[0]?.content).toEqual([{ type: 'text', text: 'Hello' }]); + + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('completed'); + }); + }); + + describe('stream - tool calls', () => { + it('handles a tool call round-trip', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // First turn: model requests a tool + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'read_file'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + // Second turn: model provides final answer + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Done!' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([ + makeCompletedToolCall('call-1', 'read_file', 'file contents'), + ]); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'read a file' }] }); + const events = await collectEvents(session); + + const types = events.map((e) => e.type); + expect(types).toContain('tool_request'); + expect(types).toContain('tool_response'); + expect(types).toContain('stream_end'); + + const toolReq = events.find( + (e): e is AgentEvent<'tool_request'> => e.type === 'tool_request', + ); + expect(toolReq?.name).toBe('read_file'); + + const toolResp = events.find( + (e): e is AgentEvent<'tool_response'> => e.type === 'tool_response', + ); + expect(toolResp?.name).toBe('read_file'); + expect(toolResp?.content).toEqual([ + { type: 'text', text: 'file contents' }, + ]); + expect(toolResp?.isError).toBe(false); + + // Should have called sendMessageStream twice + expect(sendMock).toHaveBeenCalledTimes(2); + }); + + it('handles tool errors and sends error message in content', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'write_file'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Failed' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const errorToolCall: CompletedToolCall = { + status: CoreToolCallStatus.Error, + request: makeToolRequest('call-1', 'write_file'), + response: { + callId: 'call-1', + responseParts: [{ text: 'stale' }], + resultDisplay: 'Error display', + error: new Error('Permission denied'), + errorType: 'permission_denied', + }, + } as CompletedToolCall; + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([errorToolCall]); + + const session = new LegacyAgentSession(deps); + await session.send({ + message: [{ type: 'text', text: 'write file' }], + }); + const events = await collectEvents(session); + + const toolResp = events.find( + (e): e is AgentEvent<'tool_response'> => e.type === 'tool_response', + ); + expect(toolResp?.isError).toBe(true); + // Uses error.message, not responseParts + expect(toolResp?.content).toEqual([ + { type: 'text', text: 'Permission denied' }, + ]); + expect(toolResp?.displayContent).toEqual([ + { type: 'text', text: 'Error display' }, + ]); + }); + + it('stops on STOP_EXECUTION tool error', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'dangerous_tool'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const stopToolCall: CompletedToolCall = { + status: CoreToolCallStatus.Error, + request: makeToolRequest('call-1', 'dangerous_tool'), + response: { + callId: 'call-1', + responseParts: [], + resultDisplay: undefined, + error: new Error('Stopped by policy'), + errorType: ToolErrorType.STOP_EXECUTION, + }, + } as CompletedToolCall; + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([stopToolCall]); + + const session = new LegacyAgentSession(deps); + await session.send({ + message: [{ type: 'text', text: 'do something' }], + }); + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('completed'); + // Should NOT make a second call + expect(sendMock).toHaveBeenCalledTimes(1); + }); + }); + + describe('stream - terminal events', () => { + it('handles AgentExecutionStopped', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.AgentExecutionStopped, + value: { reason: 'hook', systemMessage: 'Halted by hook' }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('completed'); + expect(streamEnd?.data).toEqual({ message: 'Halted by hook' }); + }); + + it('handles AgentExecutionBlocked as non-terminal and continues the stream', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.AgentExecutionBlocked, + value: { reason: 'Blocked by hook' }, + }, + { type: GeminiEventType.Content, value: 'Final answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const blocked = events.find( + (e): e is AgentEvent<'error'> => + e.type === 'error' && e._meta?.['code'] === 'AGENT_EXECUTION_BLOCKED', + ); + expect(blocked?.fatal).toBe(false); + expect(blocked?.message).toBe('Agent execution blocked: Blocked by hook'); + + const messages = events.filter( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'agent', + ); + expect( + messages.some( + (message) => + message.content[0]?.type === 'text' && + message.content[0].text === 'Final answer', + ), + ).toBe(true); + + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('completed'); + }); + + it('handles Error events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.Error, + value: { error: new Error('API error') }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?.message).toBe('API error'); + expect(events.some((e) => e.type === 'stream_end')).toBe(true); + }); + + it('handles LoopDetected as non-terminal custom event', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // LoopDetected followed by more content — stream continues + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.LoopDetected }, + { type: GeminiEventType.Content, value: 'continuing after loop' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + // Should have a custom loop_detected event + const custom = events.find( + (e): e is AgentEvent<'custom'> => + e.type === 'custom' && e.kind === 'loop_detected', + ); + expect(custom).toBeDefined(); + + // Stream should have continued — content after loop detected + const messages = events.filter( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'agent', + ); + expect( + messages.some( + (m) => + m.content[0]?.type === 'text' && + m.content[0].text === 'continuing after loop', + ), + ).toBe(true); + + // Should still end with stream_end completed + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('completed'); + }); + }); + + describe('stream - max turns', () => { + it('emits stream_end with max_turns when the session turn limit is exceeded', async () => { + const configMock = deps.config.getMaxSessionTurns as ReturnType< + typeof vi.fn + >; + configMock.mockReturnValue(0); + + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'should not be reached' }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('max_turns'); + expect(streamEnd?.data).toEqual({ + code: 'MAX_TURNS_EXCEEDED', + maxTurns: 0, + turnCount: 0, + }); + expect(sendMock).not.toHaveBeenCalled(); + }); + + it('treats GeminiClient MaxSessionTurns as a non-terminal warning event', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.MaxSessionTurns }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const warning = events.find( + (e): e is AgentEvent<'error'> => + e.type === 'error' && e._meta?.['code'] === 'MAX_TURNS_EXCEEDED', + ); + expect(warning?.fatal).toBe(false); + + const streamEnds = events.filter( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + const streamEnd = streamEnds[streamEnds.length - 1]; + expect(streamEnd?.reason).toBe('completed'); + }); + }); + + describe('abort', () => { + it('aborts the stream', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // Stream that yields content then checks abort signal via a deferred + let resolveHang: (() => void) | undefined; + sendMock.mockReturnValue( + (async function* () { + yield { + type: GeminiEventType.Content, + value: 'start', + } as ServerGeminiStreamEvent; + // Wait until externally resolved (by abort) + await new Promise((resolve) => { + resolveHang = resolve; + }); + yield { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + } as ServerGeminiStreamEvent; + })(), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + + // Give the loop time to start processing + await new Promise((r) => setTimeout(r, 50)); + + // Abort and resolve the hang so the generator can finish + await session.abort(); + resolveHang?.(); + + // Collect all events + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('aborted'); + }); + }); + + describe('events property', () => { + it('accumulates all events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'hi' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + await collectEvents(session); + + expect(session.events.length).toBeGreaterThan(0); + expect(session.events[0]?.type).toBe('stream_start'); + }); + }); + + describe('stream_end ordering', () => { + it('stream_end is always the final event yielded', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'Hello' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + expect(events.length).toBeGreaterThan(0); + expect(events[events.length - 1]?.type).toBe('stream_end'); + }); + + it('stream_end is final even after error events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.Error, + value: { error: new Error('API error') }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + expect(events[events.length - 1]?.type).toBe('stream_end'); + }); + }); + + describe('intermediate Finished events', () => { + it('does NOT emit stream_end when tool calls are pending', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // First turn: tool request + Finished (should NOT produce stream_end) + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'read_file'), + }, + { + type: GeminiEventType.Finished, + value: { + reason: FinishReason.STOP, + usageMetadata: { + promptTokenCount: 50, + candidatesTokenCount: 20, + }, + }, + }, + ]), + ); + // Second turn: final answer + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([ + makeCompletedToolCall('call-1', 'read_file', 'data'), + ]); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'do it' }] }); + const events = await collectEvents(session); + + // Only one stream_end at the very end + const streamEnds = events.filter((e) => e.type === 'stream_end'); + expect(streamEnds).toHaveLength(1); + expect(streamEnds[0]).toBe(events[events.length - 1]); + }); + + it('emits usage for intermediate Finished events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'read_file'), + }, + { + type: GeminiEventType.Finished, + value: { + reason: FinishReason.STOP, + usageMetadata: { + promptTokenCount: 100, + candidatesTokenCount: 30, + }, + }, + }, + ]), + ); + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Done' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([ + makeCompletedToolCall('call-1', 'read_file', 'contents'), + ]); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'go' }] }); + const events = await collectEvents(session); + + // Should have at least one usage event from the intermediate Finished + const usageEvents = events.filter( + (e): e is AgentEvent<'usage'> => e.type === 'usage', + ); + expect(usageEvents.length).toBeGreaterThanOrEqual(1); + expect(usageEvents[0]?.inputTokens).toBe(100); + expect(usageEvents[0]?.outputTokens).toBe(30); + }); + }); + + describe('error handling in runLoop', () => { + it('catches thrown errors and emits error + stream_end', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockImplementation(() => { + throw new Error('Connection refused'); + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?.message).toBe('Connection refused'); + expect(err?.fatal).toBe(true); + + const streamEnd = events.find( + (e): e is AgentEvent<'stream_end'> => e.type === 'stream_end', + ); + expect(streamEnd?.reason).toBe('failed'); + }); + }); + + describe('_emitErrorAndStreamEnd metadata', () => { + it('preserves exitCode and code in _meta for FatalError', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // Simulate a FatalError being thrown + const { FatalError } = await import('../utils/errors.js'); + sendMock.mockImplementation(() => { + throw new FatalError('Disk full', 44); + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?.message).toBe('Disk full'); + expect(err?.fatal).toBe(true); + expect(err?._meta?.['exitCode']).toBe(44); + expect(err?._meta?.['errorName']).toBe('FatalError'); + }); + + it('preserves exitCode for non-FatalError errors that carry one', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + const exitCodeError = new Error('custom exit'); + (exitCodeError as Error & { exitCode: number }).exitCode = 17; + sendMock.mockImplementation(() => { + throw exitCodeError; + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?._meta?.['exitCode']).toBe(17); + }); + + it('preserves code in _meta for errors with code property', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + const codedError = new Error('ENOENT'); + (codedError as Error & { code: string }).code = 'ENOENT'; + sendMock.mockImplementation(() => { + throw codedError; + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?._meta?.['code']).toBe('ENOENT'); + }); + }); +}); diff --git a/packages/core/src/agent/legacy-agent-session.ts b/packages/core/src/agent/legacy-agent-session.ts new file mode 100644 index 0000000000..b808e7a723 --- /dev/null +++ b/packages/core/src/agent/legacy-agent-session.ts @@ -0,0 +1,461 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview LegacyAgentSession — owns the agentic loop (send + tool + * scheduling + multi-turn), translating all events to AgentEvents. + */ + +import { GeminiEventType } from '../core/turn.js'; +import type { GeminiClient } from '../core/client.js'; +import type { Scheduler } from '../scheduler/scheduler.js'; +import type { Config } from '../config/config.js'; +import type { ToolCallRequestInfo } from '../scheduler/types.js'; +import { ToolErrorType, isFatalToolError } from '../tools/tool-error.js'; +import { recordToolCallInteractions } from '../code_assist/telemetry.js'; +import { debugLogger } from '../utils/debugLogger.js'; +import { + translateEvent, + createTranslationState, + type TranslationState, +} from './event-translator.js'; +import { + geminiPartsToContentParts, + contentPartsToGeminiParts, + toolResultDisplayToContentParts, + buildToolResponseData, +} from './content-utils.js'; +import type { + AgentEvent, + AgentSession, + AgentSend, + ContentPart, + StreamEndReason, +} from './types.js'; + +export interface LegacySessionDeps { + client: GeminiClient; + scheduler: Scheduler; + config: Config; + promptId: string; + streamId?: string; +} + +// --------------------------------------------------------------------------- +// LegacyAgentSession +// --------------------------------------------------------------------------- + +export class LegacyAgentSession implements AgentSession { + private _events: AgentEvent[] = []; + private _translationState: TranslationState; + private _subscribers: Set<() => void> = new Set(); + private _streamDone: boolean = false; + private _streamEndEmitted: boolean = false; + private _abortController: AbortController = new AbortController(); + + private readonly _client: GeminiClient; + private readonly _scheduler: Scheduler; + private readonly _config: Config; + private readonly _promptId: string; + + constructor(deps: LegacySessionDeps) { + this._translationState = createTranslationState(deps.streamId); + this._client = deps.client; + this._scheduler = deps.scheduler; + this._config = deps.config; + this._promptId = deps.promptId; + } + + // --------------------------------------------------------------------------- + // AgentSession interface + // --------------------------------------------------------------------------- + + async send(payload: AgentSend): Promise<{ streamId: string }> { + const message = 'message' in payload ? payload.message : undefined; + if (!message) { + throw new Error('LegacyAgentSession.send() only supports message sends.'); + } + + const parts = contentPartsToGeminiParts(message); + + // Start the loop in the background — don't await + this._runLoop(parts).catch((err: unknown) => { + this._emitErrorAndStreamEnd(err); + }); + + return { streamId: this._translationState.streamId }; + } + + /** + * Returns an async iterator that replays existing events, then live-follows + * new events as they arrive. Terminates after yielding a stream_end event, + * consistent with MockAgentSession behavior. + */ + async *stream(options?: { + streamId?: string; + eventId?: string; + }): AsyncIterableIterator { + let startIndex = 0; + + if (options?.eventId) { + const idx = this._events.findIndex((e) => e.id === options.eventId); + if (idx !== -1) { + startIndex = idx + 1; + } + } + + // Replay existing events + for (let i = startIndex; i < this._events.length; i++) { + const event = this._events[i]; + if (event) { + yield event; + if (event.type === 'stream_end') return; + } + } + + if (this._streamDone) return; + + // Live-follow new events. Drain any buffered events after each wake-up, + // even if _streamDone was set between the notification and resumption. + let replayedUpTo = this._events.length; + while (true) { + // Wait for new events or stream completion + if (replayedUpTo >= this._events.length && !this._streamDone) { + await new Promise((resolve) => { + if (this._events.length > replayedUpTo || this._streamDone) { + resolve(); + return; + } + const handler = (): void => { + this._subscribers.delete(handler); + resolve(); + }; + this._subscribers.add(handler); + }); + } + + // Always drain buffered events before checking _streamDone + while (replayedUpTo < this._events.length) { + const event = this._events[replayedUpTo]; + replayedUpTo++; + if (event) { + yield event; + if (event.type === 'stream_end') return; + } + } + + // Exit only after draining + if (this._streamDone) return; + } + } + + async abort(): Promise { + this._abortController.abort(); + } + + get events(): AgentEvent[] { + return this._events; + } + + // --------------------------------------------------------------------------- + // Core: agentic loop + // --------------------------------------------------------------------------- + + private async _runLoop(initialParts: Part[]): Promise { + let currentParts: Part[] = initialParts; + let turnCount = 0; + const maxTurns = this._config.getMaxSessionTurns(); + + try { + while (true) { + turnCount++; + if (maxTurns >= 0 && turnCount > maxTurns) { + this._ensureStreamStart(); + this._appendAndNotify([ + this._makeInternalEvent('stream_end', { + streamId: this._translationState.streamId, + reason: 'max_turns', + data: { + code: 'MAX_TURNS_EXCEEDED', + maxTurns, + turnCount: turnCount - 1, + }, + }), + ]); + this._markStreamDone(); + return; + } + + const toolCallRequests: ToolCallRequestInfo[] = []; + + const responseStream = this._client.sendMessageStream( + currentParts, + this._abortController.signal, + this._promptId, + ); + + // Process the stream — translate events and collect tool requests + for await (const event of responseStream) { + if (this._abortController.signal.aborted) { + this._ensureStreamStart(); + this._appendAndNotify([ + this._makeInternalEvent('stream_end', { + streamId: this._translationState.streamId, + reason: 'aborted', + }), + ]); + this._markStreamDone(); + return; + } + + // Collect tool call requests BEFORE translating so we can + // decide whether to suppress the Finished event's stream_end. + if (event.type === GeminiEventType.ToolCallRequest) { + toolCallRequests.push(event.value); + } + + // Translate to AgentEvents + const agentEvents = translateEvent(event, this._translationState); + + // Finished events don't mean the session is done — if there are + // pending tool calls, more turns are coming. Suppress stream_end + // from the Finished event in that case (keep usage events). + if ( + event.type === GeminiEventType.Finished && + toolCallRequests.length > 0 + ) { + const filtered = agentEvents.filter((e) => e.type !== 'stream_end'); + this._appendAndNotify(filtered); + } else { + this._appendAndNotify(agentEvents); + } + + // Error events → abort the loop + if (event.type === GeminiEventType.Error) { + this._ensureStreamEnd('failed'); + this._markStreamDone(); + return; + } + + // Fatal error events that translator doesn't emit stream_end for + if ( + event.type === GeminiEventType.InvalidStream || + event.type === GeminiEventType.ContextWindowWillOverflow + ) { + this._ensureStreamEnd('failed'); + this._markStreamDone(); + return; + } + + // Terminal events — translator already emitted stream_end + if ( + event.type === GeminiEventType.AgentExecutionStopped || + event.type === GeminiEventType.UserCancelled + ) { + this._markStreamDone(); + return; + } + // LoopDetected is NOT terminal — the stream continues. + // Consumer handles it (warning in non-interactive, dialog in interactive). + } + + if (toolCallRequests.length === 0) { + this._ensureStreamEnd('completed'); + this._markStreamDone(); + return; + } + + // Schedule tool calls + const completedToolCalls = await this._scheduler.schedule( + toolCallRequests, + this._abortController.signal, + ); + + // Emit tool_response AgentEvents for each completed tool call + const toolResponseParts: Part[] = []; + for (const tc of completedToolCalls) { + const response = tc.response; + const request = tc.request; + + const content: ContentPart[] = response.error + ? [{ type: 'text', text: response.error.message }] + : geminiPartsToContentParts(response.responseParts); + const displayContent = toolResultDisplayToContentParts( + response.resultDisplay, + ); + const data = buildToolResponseData(response); + + this._appendAndNotify([ + this._makeInternalEvent('tool_response', { + requestId: request.callId, + name: request.name, + content, + isError: response.error !== undefined, + ...(displayContent ? { displayContent } : {}), + ...(data ? { data } : {}), + }), + ]); + + if (response.responseParts) { + toolResponseParts.push(...response.responseParts); + } + } + + // Record tool calls in chat history + try { + const currentModel = + this._client.getCurrentSequenceModel() ?? this._config.getModel(); + this._client + .getChat() + .recordCompletedToolCalls(currentModel, completedToolCalls); + + await recordToolCallInteractions(this._config, completedToolCalls); + } catch (error) { + debugLogger.error( + `Error recording completed tool call information: ${error}`, + ); + } + + // Check if a tool requested stop execution + const stopTool = completedToolCalls.find( + (tc) => + tc.response.errorType === ToolErrorType.STOP_EXECUTION && + tc.response.error !== undefined, + ); + if (stopTool) { + this._ensureStreamEnd('completed'); + this._markStreamDone(); + return; + } + + // Check for fatal tool errors + const fatalTool = completedToolCalls.find((tc) => + isFatalToolError(tc.response.errorType), + ); + if (fatalTool) { + const msg = fatalTool.response.error?.message ?? 'Fatal tool error'; + this._appendAndNotify([ + this._makeInternalEvent('error', { + status: 'INTERNAL', + message: `Fatal tool error (${fatalTool.request.name}): ${msg}`, + fatal: true, + }), + ]); + this._ensureStreamEnd('failed'); + this._markStreamDone(); + return; + } + + // Feed tool results back for next turn + currentParts = toolResponseParts; + } + } catch (err: unknown) { + this._emitErrorAndStreamEnd(err); + this._markStreamDone(); + } + } + + // --------------------------------------------------------------------------- + // Internal helpers + // --------------------------------------------------------------------------- + + /** Sets _streamDone and notifies subscribers so the stream iterator can exit. */ + private _markStreamDone(): void { + this._streamDone = true; + this._notifySubscribers(); + } + + private _appendAndNotify(events: AgentEvent[]): void { + for (const event of events) { + this._events.push(event); + if (event.type === 'stream_end') { + this._streamEndEmitted = true; + } + } + if (events.length > 0) { + this._notifySubscribers(); + } + } + + private _notifySubscribers(): void { + for (const handler of this._subscribers) { + handler(); + } + } + + private _ensureStreamStart(): void { + if (!this._translationState.streamStartEmitted) { + const startEvent = this._makeInternalEvent('stream_start', { + streamId: this._translationState.streamId, + }); + this._events.push(startEvent); + this._translationState.streamStartEmitted = true; + this._notifySubscribers(); + } + } + + private _ensureStreamEnd(reason: StreamEndReason = 'completed'): void { + if (!this._streamEndEmitted && this._translationState.streamStartEmitted) { + this._streamEndEmitted = true; + const endEvent = this._makeInternalEvent('stream_end', { + streamId: this._translationState.streamId, + reason, + }); + this._events.push(endEvent); + this._notifySubscribers(); + } + } + + /** + * Review fix #4: Preserves error metadata (name, exitCode, stack) in _meta + * so downstream consumers can reconstruct proper error types. + */ + private _emitErrorAndStreamEnd(err: unknown): void { + const message = err instanceof Error ? err.message : String(err); + + this._ensureStreamStart(); + + const meta: Record = {}; + if (err instanceof Error) { + meta['errorName'] = err.constructor.name; + if ('exitCode' in err && typeof err.exitCode === 'number') { + meta['exitCode'] = err.exitCode; + } + if ('code' in err) { + meta['code'] = err.code; + } + } + + const errorEvent = this._makeInternalEvent('error', { + status: 'INTERNAL' as const, + message, + fatal: true, + ...(Object.keys(meta).length > 0 ? { _meta: meta } : {}), + }); + this._events.push(errorEvent); + + this._ensureStreamEnd('failed'); + this._notifySubscribers(); + } + + private _makeInternalEvent( + type: AgentEvent['type'], + payload: Partial, + ): AgentEvent { + const id = `${this._translationState.streamId}-${this._translationState.eventCounter++}`; + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion -- constructing AgentEvent from common fields + payload + return { + ...payload, + id, + timestamp: new Date().toISOString(), + streamId: this._translationState.streamId, + type, + } as AgentEvent; + } +} + +// Re-export Part type alias for internal use (avoids importing @google/genai directly) +type Part = import('@google/genai').Part; diff --git a/packages/core/src/agent/types.ts b/packages/core/src/agent/types.ts index 8b698a8e48..0f89796918 100644 --- a/packages/core/src/agent/types.ts +++ b/packages/core/src/agent/types.ts @@ -79,9 +79,16 @@ export type AgentEventData< EventType extends keyof AgentEvents = keyof AgentEvents, > = AgentEvents[EventType] & { type: EventType }; +/** + * Mapped type that produces a proper discriminated union when `EventType` is + * the default (all keys), enabling `switch (event.type)` narrowing. + * When a specific EventType is provided, resolves to a single variant. + */ export type AgentEvent< EventType extends keyof AgentEvents = keyof AgentEvents, -> = AgentEventCommon & AgentEventData; +> = { + [K in EventType]: AgentEventCommon & AgentEvents[K] & { type: K }; +}[EventType]; export interface AgentEvents { /** MUST be the first event emitted in a session. */ @@ -261,7 +268,7 @@ export interface StreamStart { streamId: string; } -type StreamEndReason = +export type StreamEndReason = | 'completed' | 'failed' | 'aborted' diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 47412dd73c..a689fc601c 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -179,6 +179,28 @@ export * from './agents/agentLoader.js'; export * from './agents/local-executor.js'; export * from './agents/agent-scheduler.js'; +// Export agent session interface +export * from './agent/legacy-agent-session.js'; +export * from './agent/event-translator.js'; +export * from './agent/content-utils.js'; +// Agent event types — namespaced to avoid collisions with existing exports +export type { + AgentEvent, + AgentEventCommon, + AgentEventData, + AgentEvents as AgentEventMap, + AgentSend, + AgentSession, + ContentPart, + ErrorData, + StreamEnd, + StreamEndReason, + StreamStart, + Trajectory, + Usage as AgentUsage, + WithMeta, +} from './agent/types.js'; + // Export specific tool logic export * from './tools/read-file.js'; export * from './tools/ls.js';