/** * @license * Copyright 2025 Google LLC * SPDX-License-Identifier: Apache-2.0 */ import { type AgentLoopContext } from '../config/agent-loop-context.js'; import { MessageBusType } from '../confirmation-bus/types.js'; import { LocalAgentExecutor } from './local-executor.js'; import { BaseToolInvocation, type ToolResult, type ExecuteOptions, } from '../tools/tools.js'; import { type LocalAgentDefinition, type AgentInputs, type SubagentActivityEvent, type SubagentProgress, type SubagentActivityItem, AgentTerminateMode, SubagentActivityErrorType, SUBAGENT_REJECTED_ERROR_PREFIX, SUBAGENT_CANCELLED_ERROR_MESSAGE, isToolActivityError, } from './types.js'; import { randomUUID } from 'node:crypto'; import type { z } from 'zod'; import type { MessageBus } from '../confirmation-bus/message-bus.js'; import { sanitizeThoughtContent, sanitizeToolArgs, sanitizeErrorMessage, } from '../utils/agent-sanitization-utils.js'; import { debugLogger } from '../utils/debugLogger.js'; const INPUT_PREVIEW_MAX_LENGTH = 50; const DESCRIPTION_MAX_LENGTH = 200; /** * Represents a validated, executable instance of a subagent tool. * * This class orchestrates the execution of a defined agent by: * 1. Initializing the {@link LocalAgentExecutor}. * 2. Running the agent's execution loop. * 3. Bridging the agent's streaming activity (e.g., thoughts) to the tool's * live output stream. * 4. Formatting the final result into a {@link ToolResult}. */ export class LocalSubagentInvocation extends BaseToolInvocation< AgentInputs, ToolResult > { /** * @param definition The definition object that configures the agent. * @param context The agent loop context. * @param params The validated input parameters for the agent. * @param messageBus Message bus for policy enforcement. */ constructor( private readonly definition: LocalAgentDefinition, private readonly context: AgentLoopContext, params: AgentInputs, messageBus: MessageBus, _toolName?: string, _toolDisplayName?: string, ) { super( params, messageBus, _toolName ?? definition.name, _toolDisplayName ?? definition.displayName, ); } /** * Returns a concise, human-readable description of the invocation. * Used for logging and display purposes. */ getDescription(): string { const inputSummary = Object.entries(this.params) .map( ([key, value]) => `${key}: ${String(value).slice(0, INPUT_PREVIEW_MAX_LENGTH)}`, ) .join(', '); const description = `Running subagent '${this.definition.name}' with inputs: { ${inputSummary} }`; return description.slice(0, DESCRIPTION_MAX_LENGTH); } private publishActivity(activity: SubagentActivityItem): void { void this.messageBus.publish({ type: MessageBusType.SUBAGENT_ACTIVITY, subagentName: this.definition.displayName ?? this.definition.name, activity, }); } /** * Executes the subagent. * * @param signal An `AbortSignal` to cancel the agent's execution. * @param updateOutput A callback to stream intermediate output, such as the * agent's thoughts, to the user interface. * @returns A `Promise` that resolves with the final `ToolResult`. */ async execute(options: ExecuteOptions): Promise { const { abortSignal: signal, updateOutput } = options; const recentActivity: SubagentActivityItem[] = []; let executor: LocalAgentExecutor | undefined; try { if (updateOutput) { // Send initial state const initialProgress: SubagentProgress = { isSubagentProgress: true, agentName: this.definition.name, recentActivity: [], state: 'running', }; updateOutput(initialProgress); } // Create an activity callback to bridge the executor's events to the // tool's streaming output. const onActivity = (activity: SubagentActivityEvent): void => { if (!updateOutput) return; let updated = false; switch (activity.type) { case 'THOUGHT_CHUNK': { const text = String(activity.data['text']); const lastItem = recentActivity[recentActivity.length - 1]; if ( lastItem && lastItem.type === 'thought' && lastItem.status === 'running' ) { lastItem.content = sanitizeThoughtContent(text); } else { recentActivity.push({ id: randomUUID(), type: 'thought', content: sanitizeThoughtContent(text), status: 'running', }); } updated = true; const latestThought = recentActivity[recentActivity.length - 1]; if (latestThought) { this.publishActivity(latestThought); } break; } case 'TOOL_CALL_START': { const name = String(activity.data['name']); const displayName = activity.data['displayName'] ? sanitizeErrorMessage(String(activity.data['displayName'])) : undefined; const description = activity.data['description'] ? sanitizeErrorMessage(String(activity.data['description'])) : undefined; const args = JSON.stringify( sanitizeToolArgs(activity.data['args']), ); recentActivity.push({ id: randomUUID(), type: 'tool_call', content: name, displayName, description, args, status: 'running', }); updated = true; const latestTool = recentActivity[recentActivity.length - 1]; if (latestTool) { this.publishActivity(latestTool); } break; } case 'TOOL_CALL_END': { const name = String(activity.data['name']); const data = activity.data['data']; const isError = isToolActivityError(data); for (let i = recentActivity.length - 1; i >= 0; i--) { if ( recentActivity[i].type === 'tool_call' && recentActivity[i].content === name && recentActivity[i].status === 'running' ) { recentActivity[i].status = isError ? 'error' : 'completed'; updated = true; this.publishActivity(recentActivity[i]); break; } } break; } case 'ERROR': { const error = String(activity.data['error']); const errorType = activity.data['errorType']; const sanitizedError = sanitizeErrorMessage(error); const isCancellation = errorType === SubagentActivityErrorType.CANCELLED || error === SUBAGENT_CANCELLED_ERROR_MESSAGE; const isRejection = errorType === SubagentActivityErrorType.REJECTED || error.startsWith(SUBAGENT_REJECTED_ERROR_PREFIX); const toolName = activity.data['name'] ? String(activity.data['name']) : undefined; if (toolName && (isCancellation || isRejection)) { for (let i = recentActivity.length - 1; i >= 0; i--) { if ( recentActivity[i].type === 'tool_call' && recentActivity[i].content === toolName && recentActivity[i].status === 'running' ) { recentActivity[i].status = 'cancelled'; updated = true; break; } } } else if (toolName) { // Mark non-rejection/non-cancellation errors as 'error' for (let i = recentActivity.length - 1; i >= 0; i--) { if ( recentActivity[i].type === 'tool_call' && recentActivity[i].content === toolName && recentActivity[i].status === 'running' ) { recentActivity[i].status = 'error'; updated = true; break; } } } recentActivity.push({ id: randomUUID(), type: 'thought', content: isCancellation || isRejection ? sanitizedError : `Error: ${sanitizedError}`, status: isCancellation || isRejection ? 'cancelled' : 'error', }); updated = true; break; } default: break; } if (updated) { const progress: SubagentProgress = { isSubagentProgress: true, agentName: this.definition.name, recentActivity: [...recentActivity], // Copy to avoid mutation issues state: 'running', }; updateOutput(progress); } }; executor = await LocalAgentExecutor.create( this.definition, this.context, onActivity, ); const output = await executor.run(this.params, signal); if (output.terminate_reason === AgentTerminateMode.ABORTED) { const progress: SubagentProgress = { isSubagentProgress: true, agentName: this.definition.name, recentActivity: [...recentActivity], state: 'cancelled', }; if (updateOutput) { updateOutput(progress); } const cancelError = new Error('Operation cancelled by user'); cancelError.name = 'AbortError'; throw cancelError; } const progress: SubagentProgress = { isSubagentProgress: true, agentName: this.definition.name, recentActivity: [...recentActivity], state: 'completed', result: output.result, terminateReason: output.terminate_reason, }; if (updateOutput) { updateOutput(progress); } const resultContent = `Subagent '${this.definition.name}' finished. Termination Reason: ${output.terminate_reason} Result: ${output.result}`; return { llmContent: [{ text: resultContent }], returnDisplay: progress, data: { agentId: executor.agentId }, }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); debugLogger.error(`Subagent '${this.definition.name}' failed:`, error); const isAbort = (error instanceof Error && error.name === 'AbortError') || errorMessage.includes('Aborted'); // Mark any running items as error/cancelled for (const item of recentActivity) { if (item.status === 'running') { item.status = isAbort ? 'cancelled' : 'error'; } } // Ensure the error is reflected in the recent activity for display // But only if it's NOT an abort, or if we want to show "Cancelled" as a thought if (!isAbort) { const lastActivity = recentActivity[recentActivity.length - 1]; if (!lastActivity || lastActivity.status !== 'error') { recentActivity.push({ id: randomUUID(), type: 'thought', content: `Error: ${errorMessage}`, status: 'error', }); // Maintain size limit // No limit on UI events sent via bus } } const progress: SubagentProgress = { isSubagentProgress: true, agentName: this.definition.name, recentActivity: [...recentActivity], state: isAbort ? 'cancelled' : 'error', }; if (updateOutput) { updateOutput(progress); } if (isAbort) { throw error; } return { llmContent: `Subagent '${this.definition.name}' failed. Error: ${errorMessage}`, returnDisplay: progress, data: executor ? { agentId: executor.agentId } : undefined, // We omit the 'error' property so that the UI renders our rich returnDisplay // instead of the raw error message. The llmContent still informs the agent of the failure. }; } } }