From f1a338716098a0332cd956ab326531db0179b659 Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Thu, 26 Mar 2026 12:09:47 -0400 Subject: [PATCH] feat(core): wrap local and remote subagents behind AgentProtocol/AgentSession Introduce LocalSubagentProtocol and RemoteSubagentProtocol that implement AgentProtocol, wrapping LocalAgentExecutor and A2A client streaming respectively. LocalSubagentSession and RemoteSubagentSession extend AgentSession and are the public entry points. - LocalSubagentProtocol: translates SubagentActivityEvent -> AgentEvent (THOUGHT_CHUNK->message/thought, TOOL_CALL_START->tool_request, TOOL_CALL_END->tool_response, ERROR->error). Accepts optional rawActivityCallback for rich SubagentProgress display without losing displayName/errorType detail that AgentEvent types do not carry. - RemoteSubagentProtocol: wraps A2A sendMessageStream, maintains contextId/taskId session state, tracks SubagentProgress per chunk for error recovery, and returns a ToolResult with proper SubagentProgress as returnDisplay. - LocalSubagentInvocation: now uses LocalSubagentSession internally, preserving all existing SubagentProgress display logic via rawActivityCallback. External AbortSignal wired through session.abort(). - RemoteAgentInvocation: now uses RemoteSubagentSession, subscribing to message events for live progress updates. - SubagentToolWrapper and SubagentTool: add optional onAgentEvent callback for future parent session observability (currently unused, wired through invocation constructors to avoid a second pass later). - index.ts: export LocalSubagentSession and RemoteSubagentSession. No behavioral change to SubagentProgress display or ToolResult output. --- .../core/src/agents/local-invocation.test.ts | 7 +- packages/core/src/agents/local-invocation.ts | 342 +++++++------ .../src/agents/local-subagent-protocol.ts | 427 +++++++++++++++++ packages/core/src/agents/remote-invocation.ts | 199 ++------ .../src/agents/remote-subagent-protocol.ts | 452 ++++++++++++++++++ .../src/agents/subagent-tool-wrapper.test.ts | 2 + .../core/src/agents/subagent-tool-wrapper.ts | 4 + .../core/src/agents/subagent-tool.test.ts | 2 + packages/core/src/agents/subagent-tool.ts | 5 + packages/core/src/index.ts | 2 + 10 files changed, 1134 insertions(+), 308 deletions(-) create mode 100644 packages/core/src/agents/local-subagent-protocol.ts create mode 100644 packages/core/src/agents/remote-subagent-protocol.ts diff --git a/packages/core/src/agents/local-invocation.test.ts b/packages/core/src/agents/local-invocation.test.ts index 478ceb9f34..e3a182284e 100644 --- a/packages/core/src/agents/local-invocation.test.ts +++ b/packages/core/src/agents/local-invocation.test.ts @@ -200,7 +200,10 @@ describe('LocalSubagentInvocation', () => { }), ); - expect(mockExecutorInstance.run).toHaveBeenCalledWith(params, signal); + expect(mockExecutorInstance.run).toHaveBeenCalledWith( + params, + expect.any(AbortSignal), + ); expect(result.llmContent).toEqual([ { @@ -495,7 +498,7 @@ describe('LocalSubagentInvocation', () => { expect(mockExecutorInstance.run).toHaveBeenCalledWith( params, - controller.signal, + expect.any(AbortSignal), ); }); diff --git a/packages/core/src/agents/local-invocation.ts b/packages/core/src/agents/local-invocation.ts index 0d28dcbe64..43d81dae97 100644 --- a/packages/core/src/agents/local-invocation.ts +++ b/packages/core/src/agents/local-invocation.ts @@ -5,7 +5,6 @@ */ import { type AgentLoopContext } from '../config/agent-loop-context.js'; -import { LocalAgentExecutor } from './local-executor.js'; import { BaseToolInvocation, type ToolResult, @@ -30,6 +29,8 @@ import { sanitizeToolArgs, sanitizeErrorMessage, } from '../utils/agent-sanitization-utils.js'; +import { LocalSubagentSession } from './local-subagent-protocol.js'; +import type { AgentEvent } from '../agent/types.js'; const INPUT_PREVIEW_MAX_LENGTH = 50; const DESCRIPTION_MAX_LENGTH = 200; @@ -39,11 +40,10 @@ const MAX_RECENT_ACTIVITY = 3; * Represents a validated, executable instance of a subagent tool. * * This class orchestrates the execution of a defined agent by: - * 1. Initializing the {@link LocalAgentExecutor}. - * 2. Running the agent's execution loop. - * 3. Bridging the agent's streaming activity (e.g., thoughts) to the tool's - * live output stream. - * 4. Formatting the final result into a {@link ToolResult}. + * 1. Using {@link LocalSubagentSession} as the execution engine. + * 2. Bridging the agent's streaming activity (e.g., thoughts) to the tool's + * live output stream via the session's rawActivityCallback. + * 3. Formatting the final result into a {@link ToolResult}. */ export class LocalSubagentInvocation extends BaseToolInvocation< AgentInputs, @@ -54,6 +54,9 @@ export class LocalSubagentInvocation extends BaseToolInvocation< * @param context The agent loop context. * @param params The validated input parameters for the agent. * @param messageBus Message bus for policy enforcement. + * @param _toolName Optional override for the tool name. + * @param _toolDisplayName Optional override for the tool display name. + * @param _onAgentEvent Optional callback for parent session observability. */ constructor( private readonly definition: LocalAgentDefinition, @@ -62,6 +65,7 @@ export class LocalSubagentInvocation extends BaseToolInvocation< messageBus: MessageBus, _toolName?: string, _toolDisplayName?: string, + private readonly _onAgentEvent?: (event: AgentEvent) => void, ) { super( params, @@ -101,9 +105,170 @@ export class LocalSubagentInvocation extends BaseToolInvocation< ): Promise { let recentActivity: SubagentActivityItem[] = []; + // Raw SubagentActivityEvent handler — preserves all existing progress display logic. + // Passed as rawActivityCallback to LocalSubagentSession so the protocol can call it + // before translating to AgentEvents. + const onActivity = (activity: SubagentActivityEvent): void => { + if (!updateOutput) return; + + let updated = false; + + switch (activity.type) { + case 'THOUGHT_CHUNK': { + const text = String(activity.data['text']); + const lastItem = recentActivity[recentActivity.length - 1]; + + if ( + lastItem && + lastItem.type === 'thought' && + lastItem.status === 'running' + ) { + lastItem.content = sanitizeThoughtContent(text); + } else { + recentActivity.push({ + id: randomUUID(), + type: 'thought', + content: sanitizeThoughtContent(text), + status: 'running', + }); + } + updated = true; + break; + } + case 'TOOL_CALL_START': { + const name = String(activity.data['name']); + const displayName = activity.data['displayName'] + ? sanitizeErrorMessage(String(activity.data['displayName'])) + : undefined; + const description = activity.data['description'] + ? sanitizeErrorMessage(String(activity.data['description'])) + : undefined; + const args = JSON.stringify(sanitizeToolArgs(activity.data['args'])); + recentActivity.push({ + id: randomUUID(), + type: 'tool_call', + content: name, + displayName, + description, + args, + status: 'running', + }); + updated = true; + break; + } + case 'TOOL_CALL_END': { + const name = String(activity.data['name']); + const data = activity.data['data']; + const isError = isToolActivityError(data); + + for (let i = recentActivity.length - 1; i >= 0; i--) { + if ( + recentActivity[i].type === 'tool_call' && + recentActivity[i].content === name && + recentActivity[i].status === 'running' + ) { + recentActivity[i].status = isError ? 'error' : 'completed'; + updated = true; + break; + } + } + break; + } + case 'ERROR': { + const error = String(activity.data['error']); + const errorType = activity.data['errorType']; + const sanitizedError = sanitizeErrorMessage(error); + const isCancellation = + errorType === SubagentActivityErrorType.CANCELLED || + error === SUBAGENT_CANCELLED_ERROR_MESSAGE; + const isRejection = + errorType === SubagentActivityErrorType.REJECTED || + error.startsWith(SUBAGENT_REJECTED_ERROR_PREFIX); + + const toolName = activity.data['name'] + ? String(activity.data['name']) + : undefined; + + if (toolName && (isCancellation || isRejection)) { + for (let i = recentActivity.length - 1; i >= 0; i--) { + if ( + recentActivity[i].type === 'tool_call' && + recentActivity[i].content === toolName && + recentActivity[i].status === 'running' + ) { + recentActivity[i].status = 'cancelled'; + updated = true; + break; + } + } + } else if (toolName) { + // Mark non-rejection/non-cancellation errors as 'error' + for (let i = recentActivity.length - 1; i >= 0; i--) { + if ( + recentActivity[i].type === 'tool_call' && + recentActivity[i].content === toolName && + recentActivity[i].status === 'running' + ) { + recentActivity[i].status = 'error'; + updated = true; + break; + } + } + } + + recentActivity.push({ + id: randomUUID(), + type: 'thought', + content: + isCancellation || isRejection + ? sanitizedError + : `Error: ${sanitizedError}`, + status: isCancellation || isRejection ? 'cancelled' : 'error', + }); + updated = true; + break; + } + default: + break; + } + + if (updated) { + // Keep only the last N items + if (recentActivity.length > MAX_RECENT_ACTIVITY) { + recentActivity = recentActivity.slice(-MAX_RECENT_ACTIVITY); + } + + const progress: SubagentProgress = { + isSubagentProgress: true, + agentName: this.definition.name, + recentActivity: [...recentActivity], // Copy to avoid mutation issues + state: 'running', + }; + + updateOutput(progress); + } + }; + + // Create session with the raw activity callback for rich progress display + const session = new LocalSubagentSession( + this.definition, + this.context, + this.messageBus, + onActivity, + ); + + // Subscribe for parent session observability (future use) + let unsubscribeParent: (() => void) | undefined; + if (this._onAgentEvent) { + unsubscribeParent = session.subscribe(this._onAgentEvent); + } + + // Wire external abort signal to session abort + const abortListener = () => void session.abort(); + signal.addEventListener('abort', abortListener, { once: true }); + try { if (updateOutput) { - // Send initial state const initialProgress: SubagentProgress = { isSubagentProgress: true, agentName: this.definition.name, @@ -113,158 +278,16 @@ export class LocalSubagentInvocation extends BaseToolInvocation< updateOutput(initialProgress); } - // Create an activity callback to bridge the executor's events to the - // tool's streaming output. - const onActivity = (activity: SubagentActivityEvent): void => { - if (!updateOutput) return; + // Buffer non-query params, then send query as message to start execution + const query = String(this.params['query'] ?? ''); + const otherParams = { ...this.params } as Record; + delete otherParams['query']; + if (Object.keys(otherParams).length > 0) { + await session.send({ update: { config: otherParams } }); + } + await session.send({ message: [{ type: 'text', text: query }] }); - let updated = false; - - switch (activity.type) { - case 'THOUGHT_CHUNK': { - const text = String(activity.data['text']); - const lastItem = recentActivity[recentActivity.length - 1]; - - if ( - lastItem && - lastItem.type === 'thought' && - lastItem.status === 'running' - ) { - lastItem.content = sanitizeThoughtContent(text); - } else { - recentActivity.push({ - id: randomUUID(), - type: 'thought', - content: sanitizeThoughtContent(text), - status: 'running', - }); - } - updated = true; - break; - } - case 'TOOL_CALL_START': { - const name = String(activity.data['name']); - const displayName = activity.data['displayName'] - ? sanitizeErrorMessage(String(activity.data['displayName'])) - : undefined; - const description = activity.data['description'] - ? sanitizeErrorMessage(String(activity.data['description'])) - : undefined; - const args = JSON.stringify( - sanitizeToolArgs(activity.data['args']), - ); - recentActivity.push({ - id: randomUUID(), - type: 'tool_call', - content: name, - displayName, - description, - args, - status: 'running', - }); - updated = true; - break; - } - case 'TOOL_CALL_END': { - const name = String(activity.data['name']); - const data = activity.data['data']; - const isError = isToolActivityError(data); - - for (let i = recentActivity.length - 1; i >= 0; i--) { - if ( - recentActivity[i].type === 'tool_call' && - recentActivity[i].content === name && - recentActivity[i].status === 'running' - ) { - recentActivity[i].status = isError ? 'error' : 'completed'; - updated = true; - break; - } - } - break; - } - case 'ERROR': { - const error = String(activity.data['error']); - const errorType = activity.data['errorType']; - const sanitizedError = sanitizeErrorMessage(error); - const isCancellation = - errorType === SubagentActivityErrorType.CANCELLED || - error === SUBAGENT_CANCELLED_ERROR_MESSAGE; - const isRejection = - errorType === SubagentActivityErrorType.REJECTED || - error.startsWith(SUBAGENT_REJECTED_ERROR_PREFIX); - - const toolName = activity.data['name'] - ? String(activity.data['name']) - : undefined; - - if (toolName && (isCancellation || isRejection)) { - for (let i = recentActivity.length - 1; i >= 0; i--) { - if ( - recentActivity[i].type === 'tool_call' && - recentActivity[i].content === toolName && - recentActivity[i].status === 'running' - ) { - recentActivity[i].status = 'cancelled'; - updated = true; - break; - } - } - } else if (toolName) { - // Mark non-rejection/non-cancellation errors as 'error' - for (let i = recentActivity.length - 1; i >= 0; i--) { - if ( - recentActivity[i].type === 'tool_call' && - recentActivity[i].content === toolName && - recentActivity[i].status === 'running' - ) { - recentActivity[i].status = 'error'; - updated = true; - break; - } - } - } - - recentActivity.push({ - id: randomUUID(), - type: 'thought', - content: - isCancellation || isRejection - ? sanitizedError - : `Error: ${sanitizedError}`, - status: isCancellation || isRejection ? 'cancelled' : 'error', - }); - updated = true; - break; - } - default: - break; - } - - if (updated) { - // Keep only the last N items - if (recentActivity.length > MAX_RECENT_ACTIVITY) { - recentActivity = recentActivity.slice(-MAX_RECENT_ACTIVITY); - } - - const progress: SubagentProgress = { - isSubagentProgress: true, - agentName: this.definition.name, - recentActivity: [...recentActivity], // Copy to avoid mutation issues - state: 'running', - }; - - updateOutput(progress); - } - }; - - const executor = await LocalAgentExecutor.create( - this.definition, - this.context, - onActivity, - ); - - const output = await executor.run(this.params, signal); + const output = await session.getResult(); if (output.terminate_reason === AgentTerminateMode.ABORTED) { const progress: SubagentProgress = { @@ -359,6 +382,9 @@ ${output.result}`; // We omit the 'error' property so that the UI renders our rich returnDisplay // instead of the raw error message. The llmContent still informs the agent of the failure. }; + } finally { + signal.removeEventListener('abort', abortListener); + unsubscribeParent?.(); } } } diff --git a/packages/core/src/agents/local-subagent-protocol.ts b/packages/core/src/agents/local-subagent-protocol.ts new file mode 100644 index 0000000000..0968811552 --- /dev/null +++ b/packages/core/src/agents/local-subagent-protocol.ts @@ -0,0 +1,427 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview LocalSubagentProtocol — wraps LocalAgentExecutor behind the + * AgentProtocol interface, translating SubagentActivityEvent callbacks into + * AgentEvents and exposing the executor result via getResult(). + * + * Pattern mirrors LegacyAgentProtocol, but the loop body runs + * LocalAgentExecutor instead of GeminiClient.sendMessageStream(). + */ + +import { randomUUID } from 'node:crypto'; +import type { AgentLoopContext } from '../config/agent-loop-context.js'; +import { AgentSession } from '../agent/agent-session.js'; +import type { + AgentProtocol, + AgentSend, + AgentEvent, + StreamEndReason, + Unsubscribe, + ContentPart, +} from '../agent/types.js'; +import { LocalAgentExecutor } from './local-executor.js'; +import { + AgentTerminateMode, + type LocalAgentDefinition, + type AgentInputs, + type OutputObject, + type SubagentActivityEvent, +} from './types.js'; +import type { MessageBus } from '../confirmation-bus/message-bus.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function isAbortLikeError(err: unknown): boolean { + return err instanceof Error && err.name === 'AbortError'; +} + +function mapTerminateMode(mode: AgentTerminateMode): StreamEndReason { + switch (mode) { + case AgentTerminateMode.GOAL: + return 'completed'; + case AgentTerminateMode.TIMEOUT: + return 'max_time'; + case AgentTerminateMode.MAX_TURNS: + return 'max_turns'; + case AgentTerminateMode.ABORTED: + return 'aborted'; + case AgentTerminateMode.ERROR: + case AgentTerminateMode.ERROR_NO_COMPLETE_TASK_CALL: + default: + return 'failed'; + } +} + +// --------------------------------------------------------------------------- +// LocalSubagentProtocol +// --------------------------------------------------------------------------- + +class LocalSubagentProtocol implements AgentProtocol { + private _events: AgentEvent[] = []; + private _subscribers = new Set<(event: AgentEvent) => void>(); + private _streamId: string = randomUUID(); + private _eventCounter = 0; + private _agentStartEmitted = false; + private _agentEndEmitted = false; + private _activeStreamId: string | undefined; + private _abortController = new AbortController(); + + // Result promise wiring + private _resultResolve!: (output: OutputObject) => void; + private _resultReject!: (err: unknown) => void; + private readonly _resultPromise: Promise; + + // Buffered config from send({update}) + private _bufferedConfig: Record = {}; + + constructor( + private readonly definition: LocalAgentDefinition, + private readonly context: AgentLoopContext, + _messageBus: MessageBus, + private readonly _rawActivityCallback?: ( + activity: SubagentActivityEvent, + ) => void, + ) { + this._resultPromise = new Promise((resolve, reject) => { + this._resultResolve = resolve; + this._resultReject = reject; + }); + } + + // --------------------------------------------------------------------------- + // AgentProtocol interface + // --------------------------------------------------------------------------- + + get events(): readonly AgentEvent[] { + return this._events; + } + + subscribe(callback: (event: AgentEvent) => void): Unsubscribe { + this._subscribers.add(callback); + return () => { + this._subscribers.delete(callback); + }; + } + + async send(payload: AgentSend): Promise<{ streamId: string | null }> { + if ('update' in payload && payload.update) { + // Buffer config for use when message send arrives + if (payload.update.config) { + this._bufferedConfig = { + ...this._bufferedConfig, + ...payload.update.config, + }; + } + return { streamId: null }; + } + + if ('message' in payload && payload.message) { + if (this._activeStreamId) { + throw new Error( + 'LocalSubagentProtocol.send() cannot be called while a stream is active.', + ); + } + + // Extract query text from the message ContentParts + const queryText = payload.message + .filter((p): p is ContentPart & { type: 'text' } => p.type === 'text') + .map((p) => p.text) + .join(''); + + // Only include 'query' in params when the message text is non-empty, + // so that callers that pass all fields via update.config are not affected. + const params: AgentInputs = { + ...this._bufferedConfig, + ...(queryText.length > 0 ? { query: queryText } : {}), + }; + + this._beginNewStream(); + const streamId = this._streamId; + + // Schedule execution in a macrotask so send() resolves before agent_start + setTimeout(() => { + void this._runExecutionInBackground(params); + }, 0); + + return { streamId }; + } + + // action and elicitations are not supported + return { streamId: null }; + } + + async abort(): Promise { + this._abortController.abort(); + } + + // --------------------------------------------------------------------------- + // Protocol-specific: result access + // --------------------------------------------------------------------------- + + /** + * Resolves when the executor completes, with the raw OutputObject. + * Used by LocalSubagentInvocation to build the ToolResult. + */ + getResult(): Promise { + return this._resultPromise; + } + + // --------------------------------------------------------------------------- + // Core: execution + // --------------------------------------------------------------------------- + + private _beginNewStream(): void { + this._streamId = randomUUID(); + this._eventCounter = 0; + this._abortController = new AbortController(); + this._agentStartEmitted = false; + this._agentEndEmitted = false; + this._activeStreamId = this._streamId; + } + + private async _runExecutionInBackground(params: AgentInputs): Promise { + this._ensureAgentStart(); + try { + await this._runExecution(params); + } catch (err: unknown) { + if (this._abortController.signal.aborted || isAbortLikeError(err)) { + this._ensureAgentEnd('aborted'); + this._resultReject(err); + } else { + this._emitErrorAndAgentEnd(err); + this._resultReject(err); + } + this._clearActiveStream(); + } + } + + private async _runExecution(params: AgentInputs): Promise { + const signal = this._abortController.signal; + + const onActivity = (activity: SubagentActivityEvent): void => { + // Forward raw activity to invocation-level callback (for rich SubagentProgress display) + this._rawActivityCallback?.(activity); + this._emit(this._translateActivity(activity)); + }; + + const executor = await LocalAgentExecutor.create( + this.definition, + this.context, + onActivity, + ); + + const output = await executor.run(params, signal); + + if ( + output.terminate_reason === AgentTerminateMode.ABORTED || + signal.aborted + ) { + this._finishStream('aborted'); + } else { + this._finishStream(mapTerminateMode(output.terminate_reason)); + } + + this._resultResolve(output); + } + + // --------------------------------------------------------------------------- + // Activity → AgentEvent translation + // --------------------------------------------------------------------------- + + private _translateActivity(activity: SubagentActivityEvent): AgentEvent[] { + switch (activity.type) { + case 'THOUGHT_CHUNK': { + const rawText = activity.data['text']; + const text = String(rawText ?? ''); + return [ + this._makeEvent('message', { + role: 'agent', + content: [{ type: 'thought', thought: text }], + }), + ]; + } + case 'TOOL_CALL_START': { + const rawCallId = activity.data['callId']; + const callId = String(rawCallId ?? randomUUID()); + const rawName = activity.data['name']; + const name = String(rawName ?? 'unknown'); + const rawArgs = activity.data['args']; + const args: Record = + rawArgs !== null && + typeof rawArgs === 'object' && + !Array.isArray(rawArgs) + ? // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + (rawArgs as Record) + : {}; + return [ + this._makeEvent('tool_request', { + requestId: callId, + name, + args, + }), + ]; + } + case 'TOOL_CALL_END': { + const rawId = activity.data['id']; + const requestId = String(rawId ?? randomUUID()); + const rawName = activity.data['name']; + const name = String(rawName ?? 'unknown'); + const rawOutput = activity.data['output']; + const output = String(rawOutput ?? ''); + return [ + this._makeEvent('tool_response', { + requestId, + name, + content: [{ type: 'text', text: output }], + }), + ]; + } + case 'ERROR': { + const rawError = activity.data['error']; + const errorMsg = String(rawError ?? 'Unknown error'); + return [ + this._makeEvent('error', { + status: 'INTERNAL', + message: errorMsg, + fatal: false, + }), + ]; + } + default: + return []; + } + } + + // --------------------------------------------------------------------------- + // Internal helpers (mirrors LegacyAgentProtocol) + // --------------------------------------------------------------------------- + + private _emit(events: AgentEvent[]): void { + if (events.length === 0) return; + const subscribers = [...this._subscribers]; + for (const event of events) { + this._events.push(event); + if (event.type === 'agent_end') { + this._agentEndEmitted = true; + } + for (const sub of subscribers) { + sub(event); + } + } + } + + private _clearActiveStream(): void { + this._activeStreamId = undefined; + } + + private _ensureAgentStart(): void { + if (!this._agentStartEmitted) { + this._agentStartEmitted = true; + this._emit([this._makeEvent('agent_start', {})]); + } + } + + private _ensureAgentEnd(reason: StreamEndReason = 'completed'): void { + if (!this._agentEndEmitted && this._agentStartEmitted) { + this._emit([this._makeEvent('agent_end', { reason })]); + } + } + + private _finishStream( + reason: StreamEndReason, + data?: Record, + ): void { + if (data && !this._agentEndEmitted) { + this._emit([this._makeEvent('agent_end', { reason, data })]); + this._agentEndEmitted = true; + } else { + this._ensureAgentEnd(reason); + } + this._clearActiveStream(); + } + + private _emitErrorAndAgentEnd(err: unknown): void { + const message = err instanceof Error ? err.message : String(err); + this._ensureAgentStart(); + + const meta: Record = {}; + if (err instanceof Error) { + meta['errorName'] = err.constructor.name; + if ('exitCode' in err && typeof err.exitCode === 'number') { + meta['exitCode'] = err.exitCode; + } + if ('code' in err) { + meta['code'] = err.code; + } + } + + this._emit([ + this._makeEvent('error', { + status: 'INTERNAL', + message, + fatal: true, + ...(Object.keys(meta).length > 0 ? { _meta: meta } : {}), + }), + ]); + this._ensureAgentEnd('failed'); + } + + private _nextEventFields() { + return { + id: `${this._streamId}-${this._eventCounter++}`, + timestamp: new Date().toISOString(), + streamId: this._streamId, + }; + } + + private _makeEvent( + type: T, + payload: Omit, 'id' | 'timestamp' | 'streamId' | 'type'>, + ): AgentEvent { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + return { + ...this._nextEventFields(), + type, + ...payload, + } as AgentEvent; + } +} + +// --------------------------------------------------------------------------- +// Public export +// --------------------------------------------------------------------------- + +export class LocalSubagentSession extends AgentSession { + private readonly _localProtocol: LocalSubagentProtocol; + + constructor( + definition: LocalAgentDefinition, + context: AgentLoopContext, + messageBus: MessageBus, + rawActivityCallback?: (activity: SubagentActivityEvent) => void, + ) { + const protocol = new LocalSubagentProtocol( + definition, + context, + messageBus, + rawActivityCallback, + ); + super(protocol); + this._localProtocol = protocol; + } + + /** + * Returns the raw executor OutputObject once execution completes. + * Used by LocalSubagentInvocation to build the ToolResult. + */ + getResult(): Promise { + return this._localProtocol.getResult(); + } +} diff --git a/packages/core/src/agents/remote-invocation.ts b/packages/core/src/agents/remote-invocation.ts index 7dda4b0ee0..918591b6d9 100644 --- a/packages/core/src/agents/remote-invocation.ts +++ b/packages/core/src/agents/remote-invocation.ts @@ -9,6 +9,7 @@ import { type ToolConfirmationOutcome, type ToolResult, type ToolCallConfirmationDetails, + type ToolLiveOutput, } from '../tools/tools.js'; import { DEFAULT_QUERY_STRING, @@ -16,44 +17,23 @@ import { type RemoteAgentDefinition, type AgentInputs, type SubagentProgress, - getAgentCardLoadOptions, - getRemoteAgentTargetUrl, } from './types.js'; import { type AgentLoopContext } from '../config/agent-loop-context.js'; import type { MessageBus } from '../confirmation-bus/message-bus.js'; -import type { - A2AClientManager, - SendMessageResult, -} from './a2a-client-manager.js'; -import { extractIdsFromResponse, A2AResultReassembler } from './a2aUtils.js'; -import type { AuthenticationHandler } from '@a2a-js/sdk/client'; -import { debugLogger } from '../utils/debugLogger.js'; -import type { AnsiOutput } from '../utils/terminalSerializer.js'; -import { A2AAuthProviderFactory } from './auth-provider/factory.js'; import { A2AAgentError } from './a2a-errors.js'; +import { RemoteSubagentSession } from './remote-subagent-protocol.js'; +import type { AgentEvent } from '../agent/types.js'; /** * A tool invocation that proxies to a remote A2A agent. * - * This implementation bypasses the local `LocalAgentExecutor` loop and directly - * invokes the configured A2A tool. + * This implementation delegates execution to {@link RemoteSubagentSession}, + * which wraps the A2A client streaming behind the AgentProtocol interface. */ export class RemoteAgentInvocation extends BaseToolInvocation< RemoteAgentInputs, ToolResult > { - // Persist state across ephemeral invocation instances. - private static readonly sessionState = new Map< - string, - { contextId?: string; taskId?: string } - >(); - // State for the ongoing conversation with the remote agent - private contextId: string | undefined; - private taskId: string | undefined; - - private readonly clientManager: A2AClientManager; - private authHandler: AuthenticationHandler | undefined; - constructor( private readonly definition: RemoteAgentDefinition, private readonly context: AgentLoopContext, @@ -61,6 +41,7 @@ export class RemoteAgentInvocation extends BaseToolInvocation< messageBus: MessageBus, _toolName?: string, _toolDisplayName?: string, + private readonly _onAgentEvent?: (event: AgentEvent) => void, ) { const query = params['query'] ?? DEFAULT_QUERY_STRING; if (typeof query !== 'string') { @@ -75,43 +56,19 @@ export class RemoteAgentInvocation extends BaseToolInvocation< _toolName ?? definition.name, _toolDisplayName ?? definition.displayName, ); - const clientManager = this.context.config.getA2AClientManager(); - if (!clientManager) { + + // Validate that A2AClientManager is available at construction time + if (!this.context.config.getA2AClientManager()) { throw new Error( `Failed to initialize RemoteAgentInvocation for '${definition.name}': A2AClientManager is not available.`, ); } - this.clientManager = clientManager; } getDescription(): string { return `Calling remote agent ${this.definition.displayName ?? this.definition.name}`; } - private async getAuthHandler(): Promise { - if (this.authHandler) { - return this.authHandler; - } - - if (this.definition.auth) { - const targetUrl = getRemoteAgentTargetUrl(this.definition); - const provider = await A2AAuthProviderFactory.create({ - authConfig: this.definition.auth, - agentName: this.definition.name, - targetUrl, - agentCardUrl: this.definition.agentCardUrl, - }); - if (!provider) { - throw new Error( - `Failed to create auth provider for agent '${this.definition.name}'`, - ); - } - this.authHandler = provider; - } - - return this.authHandler; - } - protected override async getConfirmationDetails( _abortSignal: AbortSignal, ): Promise { @@ -128,13 +85,33 @@ export class RemoteAgentInvocation extends BaseToolInvocation< async execute( _signal: AbortSignal, - updateOutput?: (output: string | AnsiOutput | SubagentProgress) => void, + updateOutput?: (output: ToolLiveOutput) => void, ): Promise { - // 1. Ensure the agent is loaded (cached by manager) - // We assume the user has provided an access token via some mechanism (TODO), - // or we rely on ADC. - const reassembler = new A2AResultReassembler(); const agentName = this.definition.displayName ?? this.definition.name; + const session = new RemoteSubagentSession( + this.definition, + this.context, + this.messageBus, + ); + + // Wire external abort signal to session abort + const abortListener = () => void session.abort(); + _signal.addEventListener('abort', abortListener, { once: true }); + + // Subscribe for parent session observability (future use) + let unsubscribeParent: (() => void) | undefined; + if (this._onAgentEvent) { + unsubscribeParent = session.subscribe(this._onAgentEvent); + } + + // Subscribe to message events for live SubagentProgress updates + const unsubscribeProgress = session.subscribe((event: AgentEvent) => { + if (event.type === 'message' && updateOutput) { + const currentProgress = session.getLatestProgress(); + if (currentProgress) updateOutput(currentProgress); + } + }); + try { if (updateOutput) { updateOutput({ @@ -152,97 +129,25 @@ export class RemoteAgentInvocation extends BaseToolInvocation< }); } - const priorState = RemoteAgentInvocation.sessionState.get( - this.definition.name, - ); - if (priorState) { - this.contextId = priorState.contextId; - this.taskId = priorState.taskId; - } + await session.send({ + message: [{ type: 'text', text: this.params.query }], + }); - const authHandler = await this.getAuthHandler(); - - if (!this.clientManager.getClient(this.definition.name)) { - await this.clientManager.loadAgent( - this.definition.name, - getAgentCardLoadOptions(this.definition), - authHandler, - ); - } - - const message = this.params.query; - - const stream = this.clientManager.sendMessageStream( - this.definition.name, - message, - { - contextId: this.contextId, - taskId: this.taskId, - signal: _signal, - }, - ); - - let finalResponse: SendMessageResult | undefined; - - for await (const chunk of stream) { - if (_signal.aborted) { - throw new Error('Operation aborted'); - } - finalResponse = chunk; - reassembler.update(chunk); - - if (updateOutput) { - updateOutput({ - isSubagentProgress: true, - agentName, - state: 'running', - recentActivity: reassembler.toActivityItems(), - result: reassembler.toString(), - }); - } - - const { - contextId: newContextId, - taskId: newTaskId, - clearTaskId, - } = extractIdsFromResponse(chunk); - - if (newContextId) { - this.contextId = newContextId; - } - - this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId); - } - - if (!finalResponse) { - throw new Error('No response from remote agent.'); - } - - const finalOutput = reassembler.toString(); - - debugLogger.debug( - `[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`, - ); - - const finalProgress: SubagentProgress = { - isSubagentProgress: true, - agentName, - state: 'completed', - result: finalOutput, - recentActivity: reassembler.toActivityItems(), - }; + const result = await session.getResult(); + // Emit final completed progress if (updateOutput) { - updateOutput(finalProgress); + const finalProgress = session.getLatestProgress(); + if (finalProgress) updateOutput(finalProgress); } - return { - llmContent: [{ text: finalOutput }], - returnDisplay: finalProgress, - }; + return result; } catch (error: unknown) { - const partialOutput = reassembler.toString(); - // Surface structured, user-friendly error messages. + const partialProgress = session.getLatestProgress(); + const partialOutput = + typeof partialProgress?.result === 'string' + ? partialProgress.result + : ''; const errorMessage = this.formatExecutionError(error); const fullDisplay = partialOutput ? `${partialOutput}\n\n${errorMessage}` @@ -253,7 +158,7 @@ export class RemoteAgentInvocation extends BaseToolInvocation< agentName, state: 'error', result: fullDisplay, - recentActivity: reassembler.toActivityItems(), + recentActivity: partialProgress?.recentActivity ?? [], }; if (updateOutput) { @@ -265,11 +170,9 @@ export class RemoteAgentInvocation extends BaseToolInvocation< returnDisplay: errorProgress, }; } finally { - // Persist state even on partial failures or aborts to maintain conversational continuity. - RemoteAgentInvocation.sessionState.set(this.definition.name, { - contextId: this.contextId, - taskId: this.taskId, - }); + _signal.removeEventListener('abort', abortListener); + unsubscribeProgress(); + unsubscribeParent?.(); } } diff --git a/packages/core/src/agents/remote-subagent-protocol.ts b/packages/core/src/agents/remote-subagent-protocol.ts new file mode 100644 index 0000000000..faeb29051d --- /dev/null +++ b/packages/core/src/agents/remote-subagent-protocol.ts @@ -0,0 +1,452 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview RemoteSubagentProtocol — wraps A2A remote agent streaming + * behind the AgentProtocol interface. + * + * Pattern mirrors LocalSubagentProtocol and LegacyAgentProtocol, but the loop + * body drives A2AClientManager instead of LocalAgentExecutor. + */ + +import { randomUUID } from 'node:crypto'; +import type { AgentLoopContext } from '../config/agent-loop-context.js'; +import { AgentSession } from '../agent/agent-session.js'; +import type { + AgentProtocol, + AgentSend, + AgentEvent, + StreamEndReason, + Unsubscribe, + ContentPart, +} from '../agent/types.js'; +import type { ToolResult } from '../tools/tools.js'; +import { + DEFAULT_QUERY_STRING, + type RemoteAgentDefinition, + type SubagentProgress, + getRemoteAgentTargetUrl, + getAgentCardLoadOptions, +} from './types.js'; +import { A2AResultReassembler, extractIdsFromResponse } from './a2aUtils.js'; +import type { AuthenticationHandler } from '@a2a-js/sdk/client'; +import { A2AAuthProviderFactory } from './auth-provider/factory.js'; +import { A2AAgentError } from './a2a-errors.js'; +import { debugLogger } from '../utils/debugLogger.js'; +import type { MessageBus } from '../confirmation-bus/message-bus.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function isAbortLikeError(err: unknown): boolean { + return err instanceof Error && err.name === 'AbortError'; +} + +// --------------------------------------------------------------------------- +// RemoteSubagentProtocol +// --------------------------------------------------------------------------- + +class RemoteSubagentProtocol implements AgentProtocol { + private _events: AgentEvent[] = []; + private _subscribers = new Set<(event: AgentEvent) => void>(); + private _streamId: string = randomUUID(); + private _eventCounter = 0; + private _agentStartEmitted = false; + private _agentEndEmitted = false; + private _activeStreamId: string | undefined; + private _abortController = new AbortController(); + + // Session state persisted across sends (mirrors RemoteAgentInvocation) + private contextId: string | undefined; + private taskId: string | undefined; + private authHandler: AuthenticationHandler | undefined; + + // Agent display name (for SubagentProgress construction) + private readonly _agentName: string; + + // Latest SubagentProgress — updated per chunk, used for error recovery + private _latestProgress: SubagentProgress | undefined; + + // Result promise wiring + private _resultResolve!: (result: ToolResult) => void; + private _resultReject!: (err: unknown) => void; + private readonly _resultPromise: Promise; + + constructor( + private readonly definition: RemoteAgentDefinition, + private readonly context: AgentLoopContext, + _messageBus: MessageBus, + ) { + this._agentName = definition.displayName ?? definition.name; + this._resultPromise = new Promise((resolve, reject) => { + this._resultResolve = resolve; + this._resultReject = reject; + }); + + // Restore persisted session state (mirrors static map in RemoteAgentInvocation) + const priorState = RemoteSubagentProtocol._sessionState.get( + definition.name, + ); + if (priorState) { + this.contextId = priorState.contextId; + this.taskId = priorState.taskId; + } + } + + // Per-agent session state, mirrors RemoteAgentInvocation.sessionState + private static readonly _sessionState = new Map< + string, + { contextId?: string; taskId?: string } + >(); + + // --------------------------------------------------------------------------- + // AgentProtocol interface + // --------------------------------------------------------------------------- + + get events(): readonly AgentEvent[] { + return this._events; + } + + subscribe(callback: (event: AgentEvent) => void): Unsubscribe { + this._subscribers.add(callback); + return () => { + this._subscribers.delete(callback); + }; + } + + async send(payload: AgentSend): Promise<{ streamId: string | null }> { + if ('message' in payload && payload.message) { + if (this._activeStreamId) { + throw new Error( + 'RemoteSubagentProtocol.send() cannot be called while a stream is active.', + ); + } + + const query = + payload.message + .filter((p): p is ContentPart & { type: 'text' } => p.type === 'text') + .map((p) => p.text) + .join('') || DEFAULT_QUERY_STRING; + + this._beginNewStream(); + const streamId = this._streamId; + + setTimeout(() => { + void this._runStreamInBackground(query); + }, 0); + + return { streamId }; + } + + // update/action/elicitations not used for remote agents + return { streamId: null }; + } + + async abort(): Promise { + this._abortController.abort(); + } + + // --------------------------------------------------------------------------- + // Protocol-specific: result access + // --------------------------------------------------------------------------- + + getResult(): Promise { + return this._resultPromise; + } + + getLatestProgress(): SubagentProgress | undefined { + return this._latestProgress; + } + + // --------------------------------------------------------------------------- + // Core: A2A streaming + // --------------------------------------------------------------------------- + + private _beginNewStream(): void { + this._streamId = randomUUID(); + this._eventCounter = 0; + this._abortController = new AbortController(); + this._agentStartEmitted = false; + this._agentEndEmitted = false; + this._activeStreamId = this._streamId; + } + + private async _runStreamInBackground(query: string): Promise { + this._ensureAgentStart(); + try { + await this._runStream(query); + } catch (err: unknown) { + if (this._abortController.signal.aborted || isAbortLikeError(err)) { + this._ensureAgentEnd('aborted'); + this._resultReject(err); + } else { + this._emitErrorAndAgentEnd(err); + this._resultReject(err); + } + this._clearActiveStream(); + } finally { + RemoteSubagentProtocol._sessionState.set(this.definition.name, { + contextId: this.contextId, + taskId: this.taskId, + }); + } + } + + private async _runStream(query: string): Promise { + const clientManager = this.context.config.getA2AClientManager(); + if (!clientManager) { + throw new Error( + `RemoteSubagentProtocol: A2AClientManager not available for '${this.definition.name}'.`, + ); + } + + const authHandler = await this._getAuthHandler(); + if (!clientManager.getClient(this.definition.name)) { + await clientManager.loadAgent( + this.definition.name, + getAgentCardLoadOptions(this.definition), + authHandler, + ); + } + + const reassembler = new A2AResultReassembler(); + let prevText = ''; + + const stream = clientManager.sendMessageStream( + this.definition.name, + query, + { + contextId: this.contextId, + taskId: this.taskId, + signal: this._abortController.signal, + }, + ); + + for await (const chunk of stream) { + if (this._abortController.signal.aborted) { + this._finishStream('aborted'); + this._resultReject(new Error('Operation aborted')); + this._clearActiveStream(); + return; + } + + reassembler.update(chunk); + + const { + contextId: newContextId, + taskId: newTaskId, + clearTaskId, + } = extractIdsFromResponse(chunk); + if (newContextId) this.contextId = newContextId; + this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId); + + // Update latest progress snapshot (for invocation's error recovery) + this._latestProgress = { + isSubagentProgress: true, + agentName: this._agentName, + state: 'running', + recentActivity: reassembler.toActivityItems(), + result: reassembler.toString(), + }; + + // Emit delta as a message event + const currentText = reassembler.toString(); + const delta = currentText.slice(prevText.length); + if (delta) { + this._emit([ + this._makeEvent('message', { + role: 'agent', + content: [{ type: 'text', text: currentText }], + }), + ]); + prevText = currentText; + } + } + + const finalOutput = reassembler.toString(); + debugLogger.debug( + `[RemoteSubagentProtocol] ${this.definition.name} finished, output length: ${finalOutput.length}`, + ); + + const finalProgress: SubagentProgress = { + isSubagentProgress: true, + agentName: this._agentName, + state: 'completed', + result: finalOutput, + recentActivity: reassembler.toActivityItems(), + }; + this._latestProgress = finalProgress; + + this._finishStream('completed'); + + this._resultResolve({ + llmContent: [{ text: finalOutput }], + returnDisplay: finalProgress, + }); + } + + private async _getAuthHandler(): Promise { + if (this.authHandler) return this.authHandler; + if (!this.definition.auth) return undefined; + + const targetUrl = getRemoteAgentTargetUrl(this.definition); + const provider = await A2AAuthProviderFactory.create({ + authConfig: this.definition.auth, + agentName: this.definition.name, + targetUrl, + agentCardUrl: this.definition.agentCardUrl, + }); + if (!provider) { + throw new Error( + `Failed to create auth provider for agent '${this.definition.name}'`, + ); + } + this.authHandler = provider; + return this.authHandler; + } + + // --------------------------------------------------------------------------- + // Internal helpers + // --------------------------------------------------------------------------- + + private _emit(events: AgentEvent[]): void { + if (events.length === 0) return; + const subscribers = [...this._subscribers]; + for (const event of events) { + this._events.push(event); + if (event.type === 'agent_end') { + this._agentEndEmitted = true; + } + for (const sub of subscribers) { + sub(event); + } + } + } + + private _clearActiveStream(): void { + this._activeStreamId = undefined; + } + + private _ensureAgentStart(): void { + if (!this._agentStartEmitted) { + this._agentStartEmitted = true; + this._emit([this._makeEvent('agent_start', {})]); + } + } + + private _ensureAgentEnd(reason: StreamEndReason = 'completed'): void { + if (!this._agentEndEmitted && this._agentStartEmitted) { + this._emit([this._makeEvent('agent_end', { reason })]); + } + } + + private _finishStream( + reason: StreamEndReason, + data?: Record, + ): void { + if (data && !this._agentEndEmitted) { + this._emit([this._makeEvent('agent_end', { reason, data })]); + this._agentEndEmitted = true; + } else { + this._ensureAgentEnd(reason); + } + this._clearActiveStream(); + } + + private _emitErrorAndAgentEnd(err: unknown): void { + const message = this._formatError(err); + this._ensureAgentStart(); + + const meta: Record = {}; + if (err instanceof Error) { + meta['errorName'] = err.constructor.name; + } + + this._emit([ + this._makeEvent('error', { + status: 'INTERNAL', + message, + fatal: true, + ...(Object.keys(meta).length > 0 ? { _meta: meta } : {}), + }), + ]); + this._ensureAgentEnd('failed'); + } + + private _formatError(error: unknown): string { + if (error instanceof A2AAgentError) { + return error.userMessage; + } + return `Error calling remote agent: ${error instanceof Error ? error.message : String(error)}`; + } + + private _nextEventFields() { + return { + id: `${this._streamId}-${this._eventCounter++}`, + timestamp: new Date().toISOString(), + streamId: this._streamId, + }; + } + + private _makeEvent( + type: T, + payload: Omit, 'id' | 'timestamp' | 'streamId' | 'type'>, + ): AgentEvent { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + return { + ...this._nextEventFields(), + type, + ...payload, + } as AgentEvent; + } +} + +// --------------------------------------------------------------------------- +// Public export +// --------------------------------------------------------------------------- + +export class RemoteSubagentSession extends AgentSession { + private readonly _remoteProtocol: RemoteSubagentProtocol; + + constructor( + definition: RemoteAgentDefinition, + context: AgentLoopContext, + messageBus: MessageBus, + ) { + const protocol = new RemoteSubagentProtocol( + definition, + context, + messageBus, + ); + super(protocol); + this._remoteProtocol = protocol; + } + + /** + * Returns the ToolResult once the remote agent stream completes. + * Used by RemoteAgentInvocation to return the result. + */ + getResult(): Promise { + return this._remoteProtocol.getResult(); + } + + /** + * Returns the most recent SubagentProgress snapshot, updated per streaming + * chunk. Useful for constructing error progress when getResult() rejects. + */ + getLatestProgress(): SubagentProgress | undefined { + return this._remoteProtocol.getLatestProgress(); + } + + /** + * Convenience: start execution with a query string. + * Equivalent to send({message: [{type:'text', text: query}]}). + */ + async startWithQuery(query: string): Promise<{ streamId: string | null }> { + return this.send({ message: [{ type: 'text', text: query }] }); + } +} diff --git a/packages/core/src/agents/subagent-tool-wrapper.test.ts b/packages/core/src/agents/subagent-tool-wrapper.test.ts index 4e2cdb64e6..c474c209b2 100644 --- a/packages/core/src/agents/subagent-tool-wrapper.test.ts +++ b/packages/core/src/agents/subagent-tool-wrapper.test.ts @@ -139,6 +139,7 @@ describe('SubagentToolWrapper', () => { mockMessageBus, mockDefinition.name, mockDefinition.displayName, + undefined, // onAgentEvent (not set when wrapper has no onAgentEvent) ); }); @@ -164,6 +165,7 @@ describe('SubagentToolWrapper', () => { specificMessageBus, mockDefinition.name, mockDefinition.displayName, + undefined, // onAgentEvent (not set when wrapper has no onAgentEvent) ); }); diff --git a/packages/core/src/agents/subagent-tool-wrapper.ts b/packages/core/src/agents/subagent-tool-wrapper.ts index 30a30d76d0..53408f02e6 100644 --- a/packages/core/src/agents/subagent-tool-wrapper.ts +++ b/packages/core/src/agents/subagent-tool-wrapper.ts @@ -18,6 +18,7 @@ import { RemoteAgentInvocation } from './remote-invocation.js'; import { BrowserAgentInvocation } from './browser/browserAgentInvocation.js'; import { BROWSER_AGENT_NAME } from './browser/browserAgentDefinition.js'; import type { MessageBus } from '../confirmation-bus/message-bus.js'; +import type { AgentEvent } from '../agent/types.js'; /** * A tool wrapper that dynamically exposes a subagent as a standard, @@ -41,6 +42,7 @@ export class SubagentToolWrapper extends BaseDeclarativeTool< private readonly definition: AgentDefinition, private readonly context: AgentLoopContext, messageBus: MessageBus, + private readonly onAgentEvent?: (event: AgentEvent) => void, ) { super( definition.name, @@ -80,6 +82,7 @@ export class SubagentToolWrapper extends BaseDeclarativeTool< effectiveMessageBus, _toolName, _toolDisplayName, + this.onAgentEvent, ); } @@ -101,6 +104,7 @@ export class SubagentToolWrapper extends BaseDeclarativeTool< effectiveMessageBus, _toolName, _toolDisplayName, + this.onAgentEvent, ); } } diff --git a/packages/core/src/agents/subagent-tool.test.ts b/packages/core/src/agents/subagent-tool.test.ts index e184558f81..8e220dfefb 100644 --- a/packages/core/src/agents/subagent-tool.test.ts +++ b/packages/core/src/agents/subagent-tool.test.ts @@ -121,6 +121,7 @@ describe('SubAgentInvocation', () => { testDefinition, mockConfig, mockMessageBus, + undefined, // onAgentEvent (not set on SubAgentInvocation) ); }); @@ -165,6 +166,7 @@ describe('SubAgentInvocation', () => { testRemoteDefinition, mockConfig, mockMessageBus, + undefined, // onAgentEvent (not set on SubAgentInvocation) ); }); diff --git a/packages/core/src/agents/subagent-tool.ts b/packages/core/src/agents/subagent-tool.ts index 3ef9f0aa86..f13c4abfe0 100644 --- a/packages/core/src/agents/subagent-tool.ts +++ b/packages/core/src/agents/subagent-tool.ts @@ -19,6 +19,7 @@ import { type AgentLoopContext } from '../config/agent-loop-context.js'; import type { MessageBus } from '../confirmation-bus/message-bus.js'; import type { AgentDefinition, AgentInputs } from './types.js'; import { SubagentToolWrapper } from './subagent-tool-wrapper.js'; +import type { AgentEvent } from '../agent/types.js'; import { SchemaValidator } from '../utils/schemaValidator.js'; import { formatUserHintsForModel } from '../utils/fastAckHelper.js'; import { runInDevTraceSpan } from '../telemetry/trace.js'; @@ -33,6 +34,7 @@ export class SubagentTool extends BaseDeclarativeTool { private readonly definition: AgentDefinition, private readonly context: AgentLoopContext, messageBus: MessageBus, + private readonly onAgentEvent?: (event: AgentEvent) => void, ) { const inputSchema = definition.inputConfig.inputSchema; @@ -116,6 +118,7 @@ export class SubagentTool extends BaseDeclarativeTool { messageBus, _toolName, _toolDisplayName, + this.onAgentEvent, ); } } @@ -130,6 +133,7 @@ class SubAgentInvocation extends BaseToolInvocation { messageBus: MessageBus, _toolName?: string, _toolDisplayName?: string, + private readonly onAgentEvent?: (event: AgentEvent) => void, ) { super( params, @@ -229,6 +233,7 @@ class SubAgentInvocation extends BaseToolInvocation { definition, this.context, this.messageBus, + this.onAgentEvent, ); return wrapper.build(agentArgs); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 2d48eeffe9..3eb7485889 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -187,6 +187,8 @@ export * from './agents/agent-scheduler.js'; // Export agent session interface export * from './agent/agent-session.js'; export * from './agent/legacy-agent-session.js'; +export { LocalSubagentSession } from './agents/local-subagent-protocol.js'; +export { RemoteSubagentSession } from './agents/remote-subagent-protocol.js'; export * from './agent/event-translator.js'; export * from './agent/content-utils.js'; // Agent event types — namespaced to avoid collisions with existing exports