diff --git a/packages/core/src/agent/legacy-agent-session.test.ts b/packages/core/src/agent/legacy-agent-session.test.ts new file mode 100644 index 0000000000..438b1e5ef0 --- /dev/null +++ b/packages/core/src/agent/legacy-agent-session.test.ts @@ -0,0 +1,1417 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it, vi, beforeEach } from 'vitest'; +import { FinishReason } from '@google/genai'; +import { LegacyAgentSession } from './legacy-agent-session.js'; +import type { LegacyAgentSessionDeps } from './legacy-agent-session.js'; +import { GeminiEventType } from '../core/turn.js'; +import type { ServerGeminiStreamEvent } from '../core/turn.js'; +import type { AgentEvent } from './types.js'; +import { ToolErrorType } from '../tools/tool-error.js'; +import type { + CompletedToolCall, + ToolCallRequestInfo, +} from '../scheduler/types.js'; +import { CoreToolCallStatus } from '../scheduler/types.js'; + +// --------------------------------------------------------------------------- +// Mock helpers +// --------------------------------------------------------------------------- + +function createMockDeps( + overrides?: Partial, +): LegacyAgentSessionDeps { + const mockClient = { + sendMessageStream: vi.fn(), + getChat: vi.fn().mockReturnValue({ + recordCompletedToolCalls: vi.fn(), + }), + getCurrentSequenceModel: vi.fn().mockReturnValue(null), + }; + + const mockScheduler = { + schedule: vi.fn().mockResolvedValue([]), + }; + + const mockConfig = { + getMaxSessionTurns: vi.fn().mockReturnValue(-1), + getModel: vi.fn().mockReturnValue('gemini-2.5-pro'), + }; + + return { + client: mockClient as unknown as LegacyAgentSessionDeps['client'], + + scheduler: mockScheduler as unknown as LegacyAgentSessionDeps['scheduler'], + + config: mockConfig as unknown as LegacyAgentSessionDeps['config'], + promptId: 'test-prompt', + streamId: 'test-stream', + ...overrides, + }; +} + +async function* makeStream( + events: ServerGeminiStreamEvent[], +): AsyncGenerator { + for (const event of events) { + yield event; + } +} + +function makeToolRequest(callId: string, name: string): ToolCallRequestInfo { + return { + callId, + name, + args: {}, + isClientInitiated: false, + prompt_id: 'p1', + }; +} + +function makeCompletedToolCall( + callId: string, + name: string, + responseText: string, +): CompletedToolCall { + return { + status: CoreToolCallStatus.Success, + request: makeToolRequest(callId, name), + response: { + callId, + responseParts: [{ text: responseText }], + resultDisplay: undefined, + error: undefined, + errorType: undefined, + }, + + tool: {} as CompletedToolCall extends { tool: infer T } ? T : never, + + invocation: {} as CompletedToolCall extends { invocation: infer T } + ? T + : never, + } as CompletedToolCall; +} + +async function collectEvents( + session: LegacyAgentSession, + options?: { streamId?: string; eventId?: string }, +): Promise { + const events: AgentEvent[] = []; + const streamOptions = + options?.eventId || options?.streamId ? options : undefined; + + for await (const event of streamOptions + ? session.stream(streamOptions) + : session.stream()) { + events.push(event); + } + return events; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('LegacyAgentSession', () => { + let deps: LegacyAgentSessionDeps; + + beforeEach(() => { + deps = createMockDeps(); + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + describe('send', () => { + it('returns streamId', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'hello' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const result = await session.send({ + message: [{ type: 'text', text: 'hi' }], + }); + + expect(result.streamId).toBe('test-stream'); + }); + + it('records the sent user message in the trajectory before send resolves', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'hi' }], + _meta: { source: 'user-test' }, + }); + + const userMessage = session.events.find( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'user' && e.streamId === streamId, + ); + expect(userMessage?.content).toEqual([{ type: 'text', text: 'hi' }]); + expect(userMessage?._meta).toEqual({ source: 'user-test' }); + + await collectEvents(session, { streamId: streamId ?? undefined }); + }); + + it('returns streamId before emitting agent_start', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const liveEvents: AgentEvent[] = []; + session.subscribe((event) => { + liveEvents.push(event); + }); + + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'hi' }], + }); + + expect(streamId).toBe('test-stream'); + expect(liveEvents.some((event) => event.type === 'agent_start')).toBe( + false, + ); + + await collectEvents(session, { streamId: streamId ?? undefined }); + expect(liveEvents.some((event) => event.type === 'agent_start')).toBe( + true, + ); + }); + + it('throws for non-message payloads', async () => { + const session = new LegacyAgentSession(deps); + await expect(session.send({ update: { title: 'test' } })).rejects.toThrow( + 'only supports message sends', + ); + }); + + it('throws if send is called while a stream is active', async () => { + let resolveHang: (() => void) | undefined; + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + (async function* () { + await new Promise((resolve) => { + resolveHang = resolve; + }); + yield { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + } as ServerGeminiStreamEvent; + })(), + ); + + const session = new LegacyAgentSession(deps); + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'first' }], + }); + await vi.advanceTimersByTimeAsync(0); + + await expect( + session.send({ message: [{ type: 'text', text: 'second' }] }), + ).rejects.toThrow('cannot be called while a stream is active'); + + resolveHang?.(); + await collectEvents(session, { streamId: streamId ?? undefined }); + }); + + it('creates a new streamId after the previous stream completes', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'first response' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ) + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'second response' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const first = await session.send({ + message: [{ type: 'text', text: 'first' }], + }); + const firstEvents = await collectEvents(session, { + streamId: first.streamId ?? undefined, + }); + + const second = await session.send({ + message: [{ type: 'text', text: 'second' }], + }); + const secondEvents = await collectEvents(session, { + streamId: second.streamId ?? undefined, + }); + const userMessages = session.events.filter( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'user', + ); + + expect(first.streamId).not.toBe(second.streamId); + expect( + userMessages.some( + (e) => + e.streamId === first.streamId && + e.content[0]?.type === 'text' && + e.content[0].text === 'first', + ), + ).toBe(true); + expect( + userMessages.some( + (e) => + e.streamId === second.streamId && + e.content[0]?.type === 'text' && + e.content[0].text === 'second', + ), + ).toBe(true); + expect(firstEvents.some((e) => e.type === 'agent_end')).toBe(true); + expect(secondEvents.some((e) => e.type === 'agent_end')).toBe(true); + }); + }); + + describe('stream - basic flow', () => { + it('emits agent_start, content messages, and agent_end', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'Hello' }, + { type: GeminiEventType.Content, value: ' World' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const types = events.map((e) => e.type); + expect(types).toContain('agent_start'); + expect(types).toContain('message'); + expect(types).toContain('agent_end'); + + const messages = events.filter( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'agent', + ); + expect(messages).toHaveLength(2); + expect(messages[0]?.content).toEqual([{ type: 'text', text: 'Hello' }]); + + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('completed'); + }); + }); + + describe('stream - tool calls', () => { + it('handles a tool call round-trip', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // First turn: model requests a tool + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'read_file'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + // Second turn: model provides final answer + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Done!' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([ + makeCompletedToolCall('call-1', 'read_file', 'file contents'), + ]); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'read a file' }] }); + const events = await collectEvents(session); + + const types = events.map((e) => e.type); + expect(types).toContain('tool_request'); + expect(types).toContain('tool_response'); + expect(types).toContain('agent_end'); + + const toolReq = events.find( + (e): e is AgentEvent<'tool_request'> => e.type === 'tool_request', + ); + expect(toolReq?.name).toBe('read_file'); + + const toolResp = events.find( + (e): e is AgentEvent<'tool_response'> => e.type === 'tool_response', + ); + expect(toolResp?.name).toBe('read_file'); + expect(toolResp?.content).toEqual([ + { type: 'text', text: 'file contents' }, + ]); + expect(toolResp?.isError).toBe(false); + + // Should have called sendMessageStream twice + expect(sendMock).toHaveBeenCalledTimes(2); + }); + + it('handles tool errors and sends error message in content', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'write_file'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Failed' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const errorToolCall: CompletedToolCall = { + status: CoreToolCallStatus.Error, + request: makeToolRequest('call-1', 'write_file'), + response: { + callId: 'call-1', + responseParts: [{ text: 'stale' }], + resultDisplay: 'Error display', + error: new Error('Permission denied'), + errorType: 'permission_denied', + }, + } as CompletedToolCall; + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([errorToolCall]); + + const session = new LegacyAgentSession(deps); + await session.send({ + message: [{ type: 'text', text: 'write file' }], + }); + const events = await collectEvents(session); + + const toolResp = events.find( + (e): e is AgentEvent<'tool_response'> => e.type === 'tool_response', + ); + expect(toolResp?.isError).toBe(true); + // Uses error.message, not responseParts + expect(toolResp?.content).toEqual([ + { type: 'text', text: 'Permission denied' }, + ]); + expect(toolResp?.displayContent).toEqual([ + { type: 'text', text: 'Error display' }, + ]); + }); + + it('stops on STOP_EXECUTION tool error', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'dangerous_tool'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const stopToolCall: CompletedToolCall = { + status: CoreToolCallStatus.Error, + request: makeToolRequest('call-1', 'dangerous_tool'), + response: { + callId: 'call-1', + responseParts: [], + resultDisplay: undefined, + error: new Error('Stopped by policy'), + errorType: ToolErrorType.STOP_EXECUTION, + }, + } as CompletedToolCall; + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([stopToolCall]); + + const session = new LegacyAgentSession(deps); + await session.send({ + message: [{ type: 'text', text: 'do something' }], + }); + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('completed'); + // Should NOT make a second call + expect(sendMock).toHaveBeenCalledTimes(1); + }); + + it('treats fatal tool errors as tool_response followed by agent_end failed', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'write_file'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const fatalToolCall: CompletedToolCall = { + status: CoreToolCallStatus.Error, + request: makeToolRequest('call-1', 'write_file'), + response: { + callId: 'call-1', + responseParts: [], + resultDisplay: undefined, + error: new Error('Disk full'), + errorType: ToolErrorType.NO_SPACE_LEFT, + }, + } as CompletedToolCall; + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([fatalToolCall]); + + const session = new LegacyAgentSession(deps); + await session.send({ + message: [{ type: 'text', text: 'write file' }], + }); + const events = await collectEvents(session); + + const toolResp = events.find( + (e): e is AgentEvent<'tool_response'> => e.type === 'tool_response', + ); + expect(toolResp?.isError).toBe(true); + expect(toolResp?.content).toEqual([{ type: 'text', text: 'Disk full' }]); + expect( + events.some( + (e): e is AgentEvent<'error'> => + e.type === 'error' && e.fatal === true, + ), + ).toBe(false); + + const streamEnd = events.findLast( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('failed'); + expect(sendMock).toHaveBeenCalledTimes(1); + }); + }); + + describe('stream - terminal events', () => { + it('handles AgentExecutionStopped', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.AgentExecutionStopped, + value: { reason: 'hook', systemMessage: 'Halted by hook' }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('completed'); + expect(streamEnd?.data).toEqual({ message: 'Halted by hook' }); + }); + + it('handles AgentExecutionBlocked as non-terminal and continues the stream', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.AgentExecutionBlocked, + value: { reason: 'Blocked by hook' }, + }, + { type: GeminiEventType.Content, value: 'Final answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const blocked = events.find( + (e): e is AgentEvent<'error'> => + e.type === 'error' && e._meta?.['code'] === 'AGENT_EXECUTION_BLOCKED', + ); + expect(blocked?.fatal).toBe(false); + expect(blocked?.message).toBe('Agent execution blocked: Blocked by hook'); + + const messages = events.filter( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'agent', + ); + expect( + messages.some( + (message) => + message.content[0]?.type === 'text' && + message.content[0].text === 'Final answer', + ), + ).toBe(true); + + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('completed'); + }); + + it('handles Error events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.Error, + value: { error: new Error('API error') }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?.message).toBe('API error'); + expect(events.some((e) => e.type === 'agent_end')).toBe(true); + }); + + it('handles LoopDetected as non-terminal warning event', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // LoopDetected followed by more content — stream continues + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.LoopDetected }, + { type: GeminiEventType.Content, value: 'continuing after loop' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const warning = events.find( + (e): e is AgentEvent<'error'> => + e.type === 'error' && e._meta?.['code'] === 'LOOP_DETECTED', + ); + expect(warning).toBeDefined(); + expect(warning?.fatal).toBe(false); + + // Stream should have continued — content after loop detected + const messages = events.filter( + (e): e is AgentEvent<'message'> => + e.type === 'message' && e.role === 'agent', + ); + expect( + messages.some( + (m) => + m.content[0]?.type === 'text' && + m.content[0].text === 'continuing after loop', + ), + ).toBe(true); + + // Should still end with agent_end completed + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('completed'); + }); + }); + + describe('stream - max turns', () => { + it('emits agent_end with max_turns when the session turn limit is exceeded', async () => { + const configMock = deps.config.getMaxSessionTurns as ReturnType< + typeof vi.fn + >; + configMock.mockReturnValue(0); + + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'should not be reached' }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('max_turns'); + expect(streamEnd?.data).toEqual({ + code: 'MAX_TURNS_EXCEEDED', + maxTurns: 0, + turnCount: 0, + }); + expect(sendMock).not.toHaveBeenCalled(); + }); + + it('treats GeminiClient MaxSessionTurns as a terminal max_turns stream end', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([{ type: GeminiEventType.MaxSessionTurns }]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const errorEvents = events.filter( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(errorEvents).toHaveLength(0); + + const streamEnd = events.findLast( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('max_turns'); + expect(streamEnd?.data).toEqual({ + code: 'MAX_TURNS_EXCEEDED', + }); + }); + }); + + describe('abort', () => { + it('treats abort before the first model event as aborted without fatal error', async () => { + let releaseAbort: (() => void) | undefined; + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + (async function* () { + await new Promise((resolve) => { + releaseAbort = resolve; + }); + yield* []; + const abortError = new Error('Aborted'); + abortError.name = 'AbortError'; + throw abortError; + })(), + ); + + const session = new LegacyAgentSession(deps); + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'hi' }], + }); + await vi.advanceTimersByTimeAsync(0); + + await session.abort(); + releaseAbort?.(); + + const events = await collectEvents(session, { + streamId: streamId ?? undefined, + }); + expect( + events.some( + (event): event is AgentEvent<'error'> => + event.type === 'error' && event.fatal, + ), + ).toBe(false); + + const streamEnd = events.findLast( + (event): event is AgentEvent<'agent_end'> => event.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('aborted'); + }); + + it('aborts the stream', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // Stream that yields content then checks abort signal via a deferred + let resolveHang: (() => void) | undefined; + sendMock.mockReturnValue( + (async function* () { + yield { + type: GeminiEventType.Content, + value: 'start', + } as ServerGeminiStreamEvent; + // Wait until externally resolved (by abort) + await new Promise((resolve) => { + resolveHang = resolve; + }); + yield { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + } as ServerGeminiStreamEvent; + })(), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + + // Give the loop time to start processing + await new Promise((r) => setTimeout(r, 50)); + + // Abort and resolve the hang so the generator can finish + await session.abort(); + resolveHang?.(); + + // Collect all events + const events = await collectEvents(session); + + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('aborted'); + }); + + it('treats abort during pending scheduler work as aborted without fatal error', async () => { + let resolveSchedule: ((value: CompletedToolCall[]) => void) | undefined; + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'slow_tool'), + }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockReturnValue( + new Promise((resolve) => { + resolveSchedule = resolve; + }), + ); + + const session = new LegacyAgentSession(deps); + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'hi' }], + }); + + await new Promise((resolve) => setTimeout(resolve, 25)); + await session.abort(); + resolveSchedule?.([makeCompletedToolCall('call-1', 'slow_tool', 'done')]); + + const events = await collectEvents(session, { + streamId: streamId ?? undefined, + }); + expect( + events.some( + (event): event is AgentEvent<'error'> => + event.type === 'error' && event.fatal, + ), + ).toBe(false); + expect(events.some((event) => event.type === 'tool_response')).toBe( + false, + ); + + const streamEnd = events.findLast( + (event): event is AgentEvent<'agent_end'> => event.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('aborted'); + }); + }); + + describe('events property', () => { + it('accumulates all events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'hi' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + await collectEvents(session); + + expect(session.events.length).toBeGreaterThan(0); + expect(session.events[0]?.type).toBe('message'); + }); + }); + + describe('subscription and stream scoping', () => { + it('subscribe receives live events for the next stream', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'hello later' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const liveEvents: AgentEvent[] = []; + const unsubscribe = session.subscribe((event) => { + liveEvents.push(event); + }); + + const { streamId } = await session.send({ + message: [{ type: 'text', text: 'hi' }], + }); + await collectEvents(session, { streamId: streamId ?? undefined }); + unsubscribe(); + + expect(liveEvents.length).toBeGreaterThan(0); + expect(liveEvents[0]?.type).toBe('message'); + expect(liveEvents.every((event) => event.streamId === streamId)).toBe( + true, + ); + }); + + it('subscribe is live-only and does not replay old history when idle', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'first answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ) + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'second answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const first = await session.send({ + message: [{ type: 'text', text: 'first request' }], + }); + await collectEvents(session, { streamId: first.streamId ?? undefined }); + + const liveEvents: AgentEvent[] = []; + const unsubscribe = session.subscribe((event) => { + liveEvents.push(event); + }); + + const second = await session.send({ + message: [{ type: 'text', text: 'second request' }], + }); + await collectEvents(session, { streamId: second.streamId ?? undefined }); + unsubscribe(); + + expect(liveEvents.length).toBeGreaterThan(0); + expect( + liveEvents.every((event) => event.streamId === second.streamId), + ).toBe(true); + expect( + liveEvents.some( + (event) => + event.type === 'message' && + event.role === 'user' && + event.content[0]?.type === 'text' && + event.content[0].text === 'first request', + ), + ).toBe(false); + }); + + it('streams only the requested streamId', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'first answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ) + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'second answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const first = await session.send({ + message: [{ type: 'text', text: 'first request' }], + }); + await collectEvents(session, { streamId: first.streamId ?? undefined }); + + const second = await session.send({ + message: [{ type: 'text', text: 'second request' }], + }); + await collectEvents(session, { streamId: second.streamId ?? undefined }); + + const firstStreamEvents = await collectEvents(session, { + streamId: first.streamId ?? undefined, + }); + + expect( + firstStreamEvents.every((event) => event.streamId === first.streamId), + ).toBe(true); + expect( + firstStreamEvents.some( + (e) => + e.type === 'message' && + e.role === 'agent' && + e.content[0]?.type === 'text' && + e.content[0].text === 'first answer', + ), + ).toBe(true); + expect( + firstStreamEvents.some( + (e) => + e.type === 'message' && + e.role === 'agent' && + e.content[0]?.type === 'text' && + e.content[0].text === 'second answer', + ), + ).toBe(false); + }); + + it('resumes from eventId within the same stream only', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'first answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ) + .mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'second answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + const first = await session.send({ + message: [{ type: 'text', text: 'first request' }], + }); + await collectEvents(session, { streamId: first.streamId ?? undefined }); + + await session.send({ + message: [{ type: 'text', text: 'second request' }], + }); + await collectEvents(session); + + const firstAgentMessage = session.events.find( + (e): e is AgentEvent<'message'> => + e.type === 'message' && + e.role === 'agent' && + e.streamId === first.streamId && + e.content[0]?.type === 'text' && + e.content[0].text === 'first answer', + ); + expect(firstAgentMessage).toBeDefined(); + + const resumedEvents = await collectEvents(session, { + eventId: firstAgentMessage?.id, + }); + expect( + resumedEvents.every((event) => event.streamId === first.streamId), + ).toBe(true); + expect(resumedEvents.map((event) => event.type)).toEqual(['agent_end']); + expect( + resumedEvents.some( + (e) => + e.type === 'message' && + e.role === 'agent' && + e.content[0]?.type === 'text' && + e.content[0].text === 'second answer', + ), + ).toBe(false); + }); + }); + + describe('agent_end ordering', () => { + it('agent_end is always the final event yielded', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { type: GeminiEventType.Content, value: 'Hello' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + expect(events.length).toBeGreaterThan(0); + expect(events[events.length - 1]?.type).toBe('agent_end'); + }); + + it('agent_end is final even after error events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValue( + makeStream([ + { + type: GeminiEventType.Error, + value: { error: new Error('API error') }, + }, + ]), + ); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + expect(events[events.length - 1]?.type).toBe('agent_end'); + }); + }); + + describe('intermediate Finished events', () => { + it('does NOT emit agent_end when tool calls are pending', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // First turn: tool request + Finished (should NOT produce agent_end) + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'read_file'), + }, + { + type: GeminiEventType.Finished, + value: { + reason: FinishReason.STOP, + usageMetadata: { + promptTokenCount: 50, + candidatesTokenCount: 20, + }, + }, + }, + ]), + ); + // Second turn: final answer + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Answer' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([ + makeCompletedToolCall('call-1', 'read_file', 'data'), + ]); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'do it' }] }); + const events = await collectEvents(session); + + // Only one agent_end at the very end + const streamEnds = events.filter((e) => e.type === 'agent_end'); + expect(streamEnds).toHaveLength(1); + expect(streamEnds[0]).toBe(events[events.length - 1]); + }); + + it('emits usage for intermediate Finished events', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockReturnValueOnce( + makeStream([ + { + type: GeminiEventType.ToolCallRequest, + value: makeToolRequest('call-1', 'read_file'), + }, + { + type: GeminiEventType.Finished, + value: { + reason: FinishReason.STOP, + usageMetadata: { + promptTokenCount: 100, + candidatesTokenCount: 30, + }, + }, + }, + ]), + ); + sendMock.mockReturnValueOnce( + makeStream([ + { type: GeminiEventType.Content, value: 'Done' }, + { + type: GeminiEventType.Finished, + value: { reason: FinishReason.STOP, usageMetadata: undefined }, + }, + ]), + ); + + const scheduleMock = deps.scheduler.schedule as ReturnType; + scheduleMock.mockResolvedValueOnce([ + makeCompletedToolCall('call-1', 'read_file', 'contents'), + ]); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'go' }] }); + const events = await collectEvents(session); + + // Should have at least one usage event from the intermediate Finished + const usageEvents = events.filter( + (e): e is AgentEvent<'usage'> => e.type === 'usage', + ); + expect(usageEvents.length).toBeGreaterThanOrEqual(1); + expect(usageEvents[0]?.inputTokens).toBe(100); + expect(usageEvents[0]?.outputTokens).toBe(30); + }); + }); + + describe('error handling in runLoop', () => { + it('catches thrown errors and emits error + agent_end', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + sendMock.mockImplementation(() => { + throw new Error('Connection refused'); + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?.message).toBe('Connection refused'); + expect(err?.fatal).toBe(true); + + const streamEnd = events.find( + (e): e is AgentEvent<'agent_end'> => e.type === 'agent_end', + ); + expect(streamEnd?.reason).toBe('failed'); + }); + }); + + describe('_emitErrorAndAgentEnd metadata', () => { + it('preserves exitCode and code in _meta for FatalError', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + // Simulate a FatalError being thrown + const { FatalError } = await import('../utils/errors.js'); + sendMock.mockImplementation(() => { + throw new FatalError('Disk full', 44); + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?.message).toBe('Disk full'); + expect(err?.fatal).toBe(true); + expect(err?._meta?.['exitCode']).toBe(44); + expect(err?._meta?.['errorName']).toBe('FatalError'); + }); + + it('preserves exitCode for non-FatalError errors that carry one', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + const exitCodeError = new Error('custom exit'); + (exitCodeError as Error & { exitCode: number }).exitCode = 17; + sendMock.mockImplementation(() => { + throw exitCodeError; + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?._meta?.['exitCode']).toBe(17); + }); + + it('preserves code in _meta for errors with code property', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + const codedError = new Error('ENOENT'); + (codedError as Error & { code: string }).code = 'ENOENT'; + sendMock.mockImplementation(() => { + throw codedError; + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?._meta?.['code']).toBe('ENOENT'); + }); + + it('preserves status in _meta for errors with status property', async () => { + const sendMock = deps.client.sendMessageStream as ReturnType< + typeof vi.fn + >; + const statusError = new Error('rate limited'); + (statusError as Error & { status: string }).status = 'RESOURCE_EXHAUSTED'; + sendMock.mockImplementation(() => { + throw statusError; + }); + + const session = new LegacyAgentSession(deps); + await session.send({ message: [{ type: 'text', text: 'hi' }] }); + const events = await collectEvents(session); + + const err = events.find( + (e): e is AgentEvent<'error'> => e.type === 'error', + ); + expect(err?._meta?.['status']).toBe('RESOURCE_EXHAUSTED'); + }); + }); +}); diff --git a/packages/core/src/agent/legacy-agent-session.ts b/packages/core/src/agent/legacy-agent-session.ts new file mode 100644 index 0000000000..d8044e77e3 --- /dev/null +++ b/packages/core/src/agent/legacy-agent-session.ts @@ -0,0 +1,452 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview LegacyAgentSession backed by the existing Gemini client + + * scheduler loop, adapted to the merged AgentProtocol / AgentSession surface. + */ + +import { GeminiEventType } from '../core/turn.js'; +import type { Part } from '@google/genai'; +import type { GeminiClient } from '../core/client.js'; +import type { Config } from '../config/config.js'; +import type { ToolCallRequestInfo } from '../scheduler/types.js'; +import type { Scheduler } from '../scheduler/scheduler.js'; +import { recordToolCallInteractions } from '../code_assist/telemetry.js'; +import { ToolErrorType, isFatalToolError } from '../tools/tool-error.js'; +import { debugLogger } from '../utils/debugLogger.js'; +import { + buildToolResponseData, + contentPartsToGeminiParts, + geminiPartsToContentParts, + toolResultDisplayToContentParts, +} from './content-utils.js'; +import { AgentSession } from './agent-session.js'; +import { + createTranslationState, + mapFinishReason, + translateEvent, + type TranslationState, +} from './event-translator.js'; +import type { + AgentEvent, + AgentProtocol, + AgentSend, + ContentPart, + StreamEndReason, + Unsubscribe, +} from './types.js'; + +function isAbortLikeError(err: unknown): boolean { + return err instanceof Error && err.name === 'AbortError'; +} + +export interface LegacyAgentSessionDeps { + client: GeminiClient; + scheduler: Scheduler; + config: Config; + promptId: string; + streamId?: string; +} + +class LegacyAgentProtocol implements AgentProtocol { + private _events: AgentEvent[] = []; + private _subscribers = new Set<(event: AgentEvent) => void>(); + private _translationState: TranslationState; + private _agentEndEmitted = false; + private _activeStreamId?: string; + private _abortController = new AbortController(); + private _nextStreamIdOverride?: string; + + private readonly _client: GeminiClient; + private readonly _scheduler: Scheduler; + private readonly _config: Config; + private readonly _promptId: string; + + constructor(deps: LegacyAgentSessionDeps) { + this._translationState = createTranslationState(deps.streamId); + this._nextStreamIdOverride = deps.streamId; + this._client = deps.client; + this._scheduler = deps.scheduler; + this._config = deps.config; + this._promptId = deps.promptId; + } + + get events(): readonly AgentEvent[] { + return this._events; + } + + subscribe(callback: (event: AgentEvent) => void): Unsubscribe { + this._subscribers.add(callback); + return () => { + this._subscribers.delete(callback); + }; + } + + async send(payload: AgentSend): Promise<{ streamId: string }> { + const message = 'message' in payload ? payload.message : undefined; + if (!message) { + throw new Error( + 'LegacyAgentSession.send() only supports message sends for the moment.', + ); + } + + if (this._activeStreamId) { + // TODO: Interactive may eventually allow selected in-stream sends such as + // updates or elicitation responses. Keep rejecting all concurrent sends + // here until we define those correlation semantics. + throw new Error( + 'LegacyAgentSession.send() cannot be called while a stream is active.', + ); + } + + this._beginNewStream(); + const streamId = this._translationState.streamId; + const parts = contentPartsToGeminiParts(message); + const userMessage = this._makeUserMessageEvent(message, payload._meta); + + this._emit([userMessage]); + + this._scheduleRunLoop(parts); + + return { streamId }; + } + + async abort(): Promise { + this._abortController.abort(); + } + + private _scheduleRunLoop(initialParts: Part[]): void { + // Use a macrotask so send() resolves with the streamId before agent_start + // is emitted and consumers can attach to the stream without racing startup. + setTimeout(() => { + void this._runLoopInBackground(initialParts); + }, 0); + } + + private async _runLoopInBackground(initialParts: Part[]): Promise { + this._ensureAgentStart(); + try { + await this._runLoop(initialParts); + } catch (err: unknown) { + if (this._abortController.signal.aborted || isAbortLikeError(err)) { + this._ensureAgentEnd('aborted'); + } else { + this._emitErrorAndAgentEnd(err); + } + this._clearActiveStream(); + } + } + + private async _runLoop(initialParts: Part[]): Promise { + let currentParts: Part[] = initialParts; + let turnCount = 0; + const maxTurns = this._config.getMaxSessionTurns(); + + while (true) { + turnCount++; + if (maxTurns >= 0 && turnCount > maxTurns) { + this._finishStream('max_turns', { + code: 'MAX_TURNS_EXCEEDED', + maxTurns, + turnCount: turnCount - 1, + }); + return; + } + + const toolCallRequests: ToolCallRequestInfo[] = []; + const responseStream = this._client.sendMessageStream( + currentParts, + this._abortController.signal, + this._promptId, + ); + + for await (const event of responseStream) { + if (this._abortController.signal.aborted) { + this._finishStream('aborted'); + return; + } + + if (event.type === GeminiEventType.ToolCallRequest) { + toolCallRequests.push(event.value); + } + + this._emit(translateEvent(event, this._translationState)); + + switch (event.type) { + case GeminiEventType.Error: + case GeminiEventType.InvalidStream: + case GeminiEventType.ContextWindowWillOverflow: + this._finishStream('failed'); + return; + case GeminiEventType.Finished: + if (toolCallRequests.length === 0) { + this._finishStream(mapFinishReason(event.value.reason)); + return; + } + break; + case GeminiEventType.AgentExecutionStopped: + case GeminiEventType.UserCancelled: + case GeminiEventType.MaxSessionTurns: + this._clearActiveStream(); + return; + default: + break; + } + } + + if (this._abortController.signal.aborted) { + this._finishStream('aborted'); + return; + } + + if (toolCallRequests.length === 0) { + this._finishStream('completed'); + return; + } + + const completedToolCalls = await this._scheduler.schedule( + toolCallRequests, + this._abortController.signal, + ); + + if (this._abortController.signal.aborted) { + this._finishStream('aborted'); + return; + } + + const toolResponseParts: Part[] = []; + for (const tc of completedToolCalls) { + const response = tc.response; + const request = tc.request; + const content: ContentPart[] = response.error + ? [{ type: 'text', text: response.error.message }] + : geminiPartsToContentParts(response.responseParts); + const displayContent = toolResultDisplayToContentParts( + response.resultDisplay, + ); + const data = buildToolResponseData(response); + + this._emit([ + this._makeToolResponseEvent({ + requestId: request.callId, + name: request.name, + content, + isError: response.error !== undefined, + ...(displayContent ? { displayContent } : {}), + ...(data ? { data } : {}), + }), + ]); + + if (response.responseParts) { + toolResponseParts.push(...response.responseParts); + } + } + + try { + const currentModel = + this._client.getCurrentSequenceModel() ?? this._config.getModel(); + this._client + .getChat() + .recordCompletedToolCalls(currentModel, completedToolCalls); + await recordToolCallInteractions(this._config, completedToolCalls); + } catch (error) { + debugLogger.error( + `Error recording completed tool call information: ${error}`, + ); + } + + const stopTool = completedToolCalls.find( + (tc) => + tc.response.errorType === ToolErrorType.STOP_EXECUTION && + tc.response.error !== undefined, + ); + if (stopTool) { + this._finishStream('completed'); + return; + } + + const fatalTool = completedToolCalls.find((tc) => + isFatalToolError(tc.response.errorType), + ); + if (fatalTool) { + this._finishStream('failed'); + return; + } + + currentParts = toolResponseParts; + } + } + + private _emit(events: AgentEvent[]): void { + if (events.length === 0) { + return; + } + + const subscribers = [...this._subscribers]; + for (const event of events) { + if (!this._events.some((existing) => existing.id === event.id)) { + this._events.push(event); + } + if (event.type === 'agent_end') { + this._agentEndEmitted = true; + } + for (const subscriber of subscribers) { + subscriber(event); + } + } + } + + private _clearActiveStream(): void { + this._activeStreamId = undefined; + } + + private _beginNewStream(): void { + this._translationState = createTranslationState(this._nextStreamIdOverride); + this._nextStreamIdOverride = undefined; + this._abortController = new AbortController(); + this._agentEndEmitted = false; + this._activeStreamId = this._translationState.streamId; + } + + private _ensureAgentStart(): void { + if (!this._translationState.streamStartEmitted) { + this._translationState.streamStartEmitted = true; + this._emit([this._makeAgentStartEvent()]); + } + } + + private _ensureAgentEnd(reason: StreamEndReason = 'completed'): void { + if (!this._agentEndEmitted && this._translationState.streamStartEmitted) { + this._agentEndEmitted = true; + this._emit([this._makeAgentEndEvent(reason)]); + } + } + + private _finishStream( + reason: StreamEndReason, + data?: Record, + ): void { + if (data && !this._agentEndEmitted) { + this._emit([this._makeAgentEndEvent(reason, data)]); + } else { + this._ensureAgentEnd(reason); + } + this._clearActiveStream(); + } + + /** + * Preserve error identity fields in _meta so downstream consumers can + * reconstruct fatal CLI errors. + */ + private _emitErrorAndAgentEnd(err: unknown): void { + const message = err instanceof Error ? err.message : String(err); + + this._ensureAgentStart(); + + const meta: Record = {}; + if (err instanceof Error) { + meta['errorName'] = err.constructor.name; + if ('exitCode' in err && typeof err.exitCode === 'number') { + meta['exitCode'] = err.exitCode; + } + if ('code' in err) { + meta['code'] = err.code; + } + if ('status' in err) { + meta['status'] = err.status; + } + } + + this._emit([ + this._makeErrorEvent({ + status: 'INTERNAL', + message, + fatal: true, + ...(Object.keys(meta).length > 0 ? { _meta: meta } : {}), + }), + ]); + + this._ensureAgentEnd('failed'); + } + + private _nextEventFields() { + return { + id: `${this._translationState.streamId}-${this._translationState.eventCounter++}`, + timestamp: new Date().toISOString(), + streamId: this._translationState.streamId, + }; + } + + private _makeUserMessageEvent( + content: ContentPart[], + meta?: Record, + ): AgentEvent<'message'> { + const event = { + ...this._nextEventFields(), + type: 'message', + role: 'user', + content, + ...(meta ? { _meta: meta } : {}), + } satisfies AgentEvent<'message'>; + return event; + } + + private _makeToolResponseEvent( + payload: Omit< + AgentEvent<'tool_response'>, + 'id' | 'timestamp' | 'streamId' | 'type' + >, + ): AgentEvent<'tool_response'> { + const event = { + ...this._nextEventFields(), + type: 'tool_response', + ...payload, + } satisfies AgentEvent<'tool_response'>; + return event; + } + + private _makeAgentStartEvent(): AgentEvent<'agent_start'> { + const event = { + ...this._nextEventFields(), + type: 'agent_start', + } satisfies AgentEvent<'agent_start'>; + return event; + } + + private _makeAgentEndEvent( + reason: StreamEndReason, + data?: Record, + ): AgentEvent<'agent_end'> { + const event = { + ...this._nextEventFields(), + type: 'agent_end', + reason, + ...(data ? { data } : {}), + } satisfies AgentEvent<'agent_end'>; + return event; + } + + private _makeErrorEvent( + payload: Omit< + AgentEvent<'error'>, + 'id' | 'timestamp' | 'streamId' | 'type' + >, + ): AgentEvent<'error'> { + const event = { + ...this._nextEventFields(), + type: 'error', + ...payload, + } satisfies AgentEvent<'error'>; + return event; + } +} + +export class LegacyAgentSession extends AgentSession { + constructor(deps: LegacyAgentSessionDeps) { + super(new LegacyAgentProtocol(deps)); + } +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f177715487..4a5dc9d11d 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -180,6 +180,31 @@ export * from './agents/agentLoader.js'; export * from './agents/local-executor.js'; export * from './agents/agent-scheduler.js'; +// Export agent session interface +export * from './agent/agent-session.js'; +export * from './agent/legacy-agent-session.js'; +export * from './agent/event-translator.js'; +export * from './agent/content-utils.js'; +// Agent event types — namespaced to avoid collisions with existing exports +export type { + AgentEvent, + AgentEventCommon, + AgentEventData, + AgentEnd, + AgentEvents as AgentEventMap, + AgentEventType, + AgentProtocol, + AgentSend, + AgentStart, + ContentPart, + ErrorData, + StreamEndReason, + Trajectory, + Unsubscribe, + Usage as AgentUsage, + WithMeta, +} from './agent/types.js'; + // Export specific tool logic export * from './tools/read-file.js'; export * from './tools/ls.js';