Files
gemini-cli/packages/core/src/agents/local-executor.ts
T
Akhilesh Kumar 5a020e7720 fix(core): avoid restarting subagent MCP servers
I've tactically refactored the `LocalAgentExecutor` so that it avoids shutting down and restarting subagent MCP servers for every agent execution, which mitigates the performance overhead caused by long startup times.

1. Leveraging the Global McpClientManager:
Instead of instantiating an entirely new `McpClientManager` instance within the `LocalAgentExecutor` per execution (and shutting it down in its `finally` block), we now use the single global `McpClientManager` available on `context.config`. Since the global manager deduplicates connection attempts by checking if the server is already active, subagent MCP servers will now naturally stay alive after their initial initialization.

2. Prefixing to Avoid Polluting the Global Namespace:
To isolate the agent-specific tools, we now register the subagent's MCP servers with a unique prefix: `__agent__${definition.name}__${name}`.

3. Strict Filtering for True Isolation (ToolRegistry):
- Main CLI context: Added a block in the global `ToolRegistry.getFunctionDeclarations()` that strictly hides any tool belonging to a server prefixed with `__agent__` if the registry `isMainRegistry`. This prevents internal subagent tools from leaking to the main agent.
- Subagent context (`LocalAgentExecutor`): When inheriting tools from the parent registry (the fallback when an agent doesn't explicitly define `tools: []`), the agent now ignores `__agent__` prefixed tools that belong to *other* agents, ensuring strict tool isolation while keeping the actual underlying server processes alive and reusable.
2026-03-11 20:43:25 +00:00

1364 lines
45 KiB
TypeScript

/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { Config } from '../config/config.js';
import { type AgentLoopContext } from '../config/agent-loop-context.js';
import { reportError } from '../utils/errorReporting.js';
import { GeminiChat, StreamEventType } from '../core/geminiChat.js';
import {
Type,
type Content,
type Part,
type FunctionCall,
type FunctionDeclaration,
type Schema,
} from '@google/genai';
import { ToolRegistry } from '../tools/tool-registry.js';
import { DiscoveredMCPTool } from '../tools/mcp-tool.js';
import type { McpClientManager } from '../tools/mcp-client-manager.js';
import { CompressionStatus } from '../core/turn.js';
import { type ToolCallRequestInfo } from '../scheduler/types.js';
import { type Message } from '../confirmation-bus/types.js';
import { ChatCompressionService } from '../services/chatCompressionService.js';
import { getDirectoryContextString } from '../utils/environmentContext.js';
import { promptIdContext } from '../utils/promptIdContext.js';
import {
logAgentStart,
logAgentFinish,
logRecoveryAttempt,
} from '../telemetry/loggers.js';
import {
AgentStartEvent,
AgentFinishEvent,
LlmRole,
RecoveryAttemptEvent,
} from '../telemetry/types.js';
import {
AgentTerminateMode,
DEFAULT_QUERY_STRING,
DEFAULT_MAX_TURNS,
DEFAULT_MAX_TIME_MINUTES,
type LocalAgentDefinition,
type AgentInputs,
type OutputObject,
type SubagentActivityEvent,
} from './types.js';
import { getErrorMessage } from '../utils/errors.js';
import { templateString } from './utils.js';
import { DEFAULT_GEMINI_MODEL, isAutoModel } from '../config/models.js';
import type { RoutingContext } from '../routing/routingStrategy.js';
import { parseThought } from '../utils/thoughtUtils.js';
import { type z } from 'zod';
import { zodToJsonSchema } from 'zod-to-json-schema';
import { debugLogger } from '../utils/debugLogger.js';
import { getModelConfigAlias } from './registry.js';
import { getVersion } from '../utils/version.js';
import { getToolCallContext } from '../utils/toolCallContext.js';
import { scheduleAgentTools } from './agent-scheduler.js';
import { DeadlineTimer } from '../utils/deadlineTimer.js';
import { formatUserHintsForModel } from '../utils/fastAckHelper.js';
/**
 * A callback function to report on agent activity.
 *
 * It is invoked with a single {@link SubagentActivityEvent}; its return value
 * is ignored.
 */
export type ActivityCallback = (activity: SubagentActivityEvent) => void;
/** Name of the tool the agent calls to signal that its task is complete. */
const TASK_COMPLETE_TOOL_NAME = 'complete_task';
/** Duration of the grace period for the final recovery turn: one minute, in milliseconds. */
const GRACE_PERIOD_MS = 60_000;
/** Outcome of a turn after which the agent loop should keep going. */
interface AgentTurnContinueResult {
  status: 'continue';
  /** The message to send to the model on the next turn. */
  nextMessage: Content;
}
/** Outcome of a turn after which the agent loop should end. */
interface AgentTurnStopResult {
  status: 'stop';
  /** Why the loop is ending. */
  terminateReason: AgentTerminateMode;
  /** The final result text, or `null` when the turn produced none. */
  finalResult: string | null;
}
/** The possible outcomes of a single agent turn, discriminated by `status`. */
type AgentTurnResult = AgentTurnContinueResult | AgentTurnStopResult;
/**
 * Builds the error message used when an agent requests a tool that is not
 * available to it.
 *
 * @param toolName The name of the tool that was requested.
 * @returns The error message, with the tool name in single quotes.
 */
export function createUnauthorizedToolError(toolName: string): string {
  const quotedToolName = `'${toolName}'`;
  return [
    'Unauthorized tool call:',
    quotedToolName,
    'is not available to this agent.',
  ].join(' ');
}
/**
* Executes an agent loop based on an {@link AgentDefinition}.
*
* This executor runs the agent in a loop, calling tools until it calls the
* mandatory `complete_task` tool to signal completion.
*/
export class LocalAgentExecutor<TOutput extends z.ZodTypeAny> {
/** The agent definition this executor runs. */
readonly definition: LocalAgentDefinition<TOutput>;
/**
 * Identifier for this executor instance, built in the constructor from the
 * optional parent prompt ID, the agent name, and a random suffix.
 */
private readonly agentId: string;
/** The tool registry handed to the constructor; `create` builds a separate one per executor. */
private readonly toolRegistry: ToolRegistry;
/** The execution context supplied by the caller; also the source of `config`. */
private readonly context: AgentLoopContext;
/** Optional callback that receives activity events. */
private readonly onActivity?: ActivityCallback;
/** Service used by `tryCompressChat` to compress the chat history. */
private readonly compressionService: ChatCompressionService;
/** Call ID taken from the tool call context at creation time, if there was one. */
private readonly parentCallId?: string;
/**
 * Set when a compression attempt reports an inflated token count and cleared
 * after a successful compression; passed back to the compression service on
 * each attempt.
 */
private hasFailedCompressionAttempt = false;
/** Shorthand for the `Config` carried by this executor's context. */
private get config(): Config {
  const { config } = this.context;
  return config;
}
/**
 * Creates a new `LocalAgentExecutor` instance with its own tool registry.
 *
 * The method:
 * 1. Wraps the parent message bus so that tool confirmation requests carry
 *    the subagent's name.
 * 2. Registers the agent's MCP servers (if any) with the MCP client manager
 *    obtained from `context.config`, under the name
 *    `__agent__<agentName>__<serverName>`.
 * 3. Fills a fresh `ToolRegistry` with clones of the parent's tools: either
 *    the tools listed in `definition.toolConfig`, or, when no tool config is
 *    given, every parent tool except MCP tools that belong to other agents.
 *    Tools whose name matches a registered agent are skipped so agents cannot
 *    call other agents.
 * 4. Always adds the MCP tools that belong to this agent's own servers.
 *
 * @param definition The definition object for the agent.
 * @param context The execution context.
 * @param onActivity An optional callback to receive activity events.
 * @returns A promise that resolves to a new `LocalAgentExecutor` instance.
 */
static async create<TOutput extends z.ZodTypeAny>(
  definition: LocalAgentDefinition<TOutput>,
  context: AgentLoopContext,
  onActivity?: ActivityCallback,
): Promise<LocalAgentExecutor<TOutput>> {
  const parentMessageBus = context.messageBus;

  // Create an override object to inject the subagent name into tool confirmation requests
  // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
  const subagentMessageBus = Object.create(
    parentMessageBus,
  ) as typeof parentMessageBus;
  subagentMessageBus.publish = async (message: Message) => {
    if (message.type === 'tool-confirmation-request') {
      return parentMessageBus.publish({
        ...message,
        subagent: definition.name,
      });
    }
    return parentMessageBus.publish(message);
  };

  // Create an isolated tool registry for this agent instance.
  const agentToolRegistry = new ToolRegistry(
    context.config,
    subagentMessageBus,
  );

  // Server-name prefix that marks an MCP server as belonging to this agent.
  const ownServerPrefix = `__agent__${definition.name}__`;

  if (definition.mcpServers) {
    // The manager comes from the config rather than being created here, so
    // this method neither owns it nor hands it to the executor.
    const globalMcpManager: McpClientManager | undefined =
      context.config.getMcpClientManager();
    if (globalMcpManager) {
      for (const [name, config] of Object.entries(definition.mcpServers)) {
        await globalMcpManager.maybeDiscoverMcpServer(
          `${ownServerPrefix}${name}`,
          config,
        );
      }
    }
  }

  const parentToolRegistry = context.toolRegistry;
  const allAgentNames = new Set(
    context.config.getAgentRegistry().getAllAgentNames(),
  );

  const registerToolByName = (toolName: string) => {
    // Check if the tool is a subagent to prevent recursion.
    // We do not allow agents to call other agents.
    if (allAgentNames.has(toolName)) {
      debugLogger.warn(
        `[LocalAgentExecutor] Skipping subagent tool '${toolName}' for agent '${definition.name}' to prevent recursion.`,
      );
      return;
    }

    // If the tool is referenced by name, retrieve it from the parent
    // registry and register it with the agent's isolated registry.
    const tool = parentToolRegistry.getTool(toolName);
    if (tool) {
      // Clone the tool, so it gets its own state and subagent messageBus
      const clonedTool = tool.clone(subagentMessageBus);
      if (clonedTool instanceof DiscoveredMCPTool) {
        // Subagents MUST use fully qualified names for MCP tools to ensure
        // unambiguous tool calls and to comply with policy requirements.
        // We automatically "upgrade" any MCP tool to its qualified version.
        agentToolRegistry.registerTool(clonedTool.asFullyQualifiedTool());
      } else {
        agentToolRegistry.registerTool(clonedTool);
      }
    }
  };

  if (definition.toolConfig) {
    for (const toolRef of definition.toolConfig.tools) {
      if (typeof toolRef === 'string') {
        registerToolByName(toolRef);
      } else if (
        typeof toolRef === 'object' &&
        'name' in toolRef &&
        'build' in toolRef
      ) {
        agentToolRegistry.registerTool(toolRef);
      }
      // Note: Raw `FunctionDeclaration` objects in the config don't need to be
      // registered; their schemas are passed directly to the model later.
    }
  } else {
    // If no tools are explicitly configured, default to all available tools.
    for (const toolName of parentToolRegistry.getAllToolNames()) {
      const tool = parentToolRegistry.getTool(toolName);
      if (
        tool instanceof DiscoveredMCPTool &&
        tool.serverName.startsWith('__agent__')
      ) {
        if (!tool.serverName.startsWith(ownServerPrefix)) {
          continue; // Skip other agents' MCP tools
        }
      }
      registerToolByName(toolName);
    }
  }

  // Always ensure this agent's own MCP servers are included, even if toolConfig is restricted
  parentToolRegistry.getActiveTools().forEach((tool) => {
    if (
      tool instanceof DiscoveredMCPTool &&
      tool.serverName.startsWith(ownServerPrefix)
    ) {
      const qualifiedName = tool.asFullyQualifiedTool().name;
      if (!agentToolRegistry.getTool(qualifiedName)) {
        registerToolByName(qualifiedName);
      }
    }
  });

  agentToolRegistry.sortTools();

  // Get the parent prompt ID from context
  const parentPromptId = context.promptId;

  // Get the parent tool call ID from context
  const toolContext = getToolCallContext();
  const parentCallId = toolContext?.callId;

  // The constructor takes exactly these six arguments.
  return new LocalAgentExecutor(
    definition,
    context,
    agentToolRegistry,
    parentPromptId,
    parentCallId,
    onActivity,
  );
}
/**
 * Builds an executor from already-prepared parts and derives its agent ID.
 *
 * @private This constructor is private. Use the static `create` method to
 * instantiate the class.
 */
private constructor(
  definition: LocalAgentDefinition<TOutput>,
  context: AgentLoopContext,
  toolRegistry: ToolRegistry,
  parentPromptId: string | undefined,
  parentCallId: string | undefined,
  onActivity?: ActivityCallback,
) {
  this.definition = definition;
  this.context = context;
  this.toolRegistry = toolRegistry;
  this.parentCallId = parentCallId;
  this.onActivity = onActivity;
  this.compressionService = new ChatCompressionService();

  // The ID is "<parentPromptId>-<agentName>-<random>"; the leading segment is
  // dropped when there is no parent prompt ID, i.e. when this agent is
  // invoked directly (top-level) rather than as a sub-agent.
  const idSegments: string[] = [];
  if (parentPromptId) {
    idSegments.push(parentPromptId);
  }
  idSegments.push(this.definition.name);
  idSegments.push(Math.random().toString(36).slice(2, 8));
  this.agentId = idSegments.join('-');
}
/**
 * Runs one turn: compresses the chat if needed, calls the model, and
 * processes the function calls the model requested.
 *
 * @param chat The chat session for this agent run.
 * @param currentMessage The message to send to the model this turn.
 * @param turnCounter The turn number, used to build the prompt ID.
 * @param combinedSignal Signal that aborts on either external cancel or timeout.
 * @param timeoutSignal The timeout signal alone, used to tell a timeout apart
 *   from any other abort.
 * @param onWaitingForConfirmation Optional callback forwarded to function
 *   call processing.
 * @returns An {@link AgentTurnResult} object indicating whether to continue
 * or stop the agent loop.
 */
private async executeTurn(
  chat: GeminiChat,
  currentMessage: Content,
  turnCounter: number,
  combinedSignal: AbortSignal,
  timeoutSignal: AbortSignal,
  onWaitingForConfirmation?: (waiting: boolean) => void,
): Promise<AgentTurnResult> {
  // Small factory for the 'stop' variants returned below.
  const halt = (
    terminateReason: AgentTerminateMode,
    finalResult: string | null,
  ): AgentTurnResult => ({
    status: 'stop',
    terminateReason,
    finalResult,
  });

  const promptId = `${this.agentId}#${turnCounter}`;
  await this.tryCompressChat(chat, promptId);

  const modelReply = await promptIdContext.run(promptId, async () =>
    this.callModel(chat, currentMessage, combinedSignal, promptId),
  );
  const requestedCalls = modelReply.functionCalls;

  if (combinedSignal.aborted) {
    // The 'run' method sets the final timeout string, so no result here.
    return halt(
      timeoutSignal.aborted
        ? AgentTerminateMode.TIMEOUT
        : AgentTerminateMode.ABORTED,
      null,
    );
  }

  // A turn without any tool call means the model stopped without calling
  // complete_task, which is an error.
  if (requestedCalls.length === 0) {
    this.emitActivity('ERROR', {
      error: `Agent stopped calling tools but did not call '${TASK_COMPLETE_TOOL_NAME}' to finalize the session.`,
      context: 'protocol_violation',
    });
    return halt(AgentTerminateMode.ERROR_NO_COMPLETE_TASK_CALL, null);
  }

  const outcome = await this.processFunctionCalls(
    requestedCalls,
    combinedSignal,
    promptId,
    onWaitingForConfirmation,
  );

  if (outcome.aborted) {
    return halt(AgentTerminateMode.ABORTED, null);
  }

  if (outcome.taskCompleted) {
    return halt(
      AgentTerminateMode.GOAL,
      outcome.submittedOutput ?? 'Task completed successfully.',
    );
  }

  // The task is not finished yet; hand back the message for the next turn.
  return {
    status: 'continue',
    nextMessage: outcome.nextMessage,
  };
}
/**
 * Builds the warning text sent to the agent on its final recovery turn.
 *
 * @param reason Why the normal loop ended.
 * @returns An explanation of the reason followed by the instruction to call
 *   the completion tool immediately.
 * @throws Error if `reason` is not one of the three supported values.
 */
private getFinalWarningMessage(
  reason:
    | AgentTerminateMode.TIMEOUT
    | AgentTerminateMode.MAX_TURNS
    | AgentTerminateMode.ERROR_NO_COMPLETE_TASK_CALL,
): string {
  const describeReason = (): string => {
    if (reason === AgentTerminateMode.TIMEOUT) {
      return 'You have exceeded the time limit.';
    }
    if (reason === AgentTerminateMode.MAX_TURNS) {
      return 'You have exceeded the maximum number of turns.';
    }
    if (reason === AgentTerminateMode.ERROR_NO_COMPLETE_TASK_CALL) {
      return 'You have stopped calling tools without finishing.';
    }
    throw new Error(`Unknown terminate reason: ${reason}`);
  };

  const explanation = describeReason();
  return `${explanation} You have one final chance to complete the task with a short grace period. You MUST call \`${TASK_COMPLETE_TOOL_NAME}\` immediately with your best answer and explain that your investigation was interrupted. Do not call any other tools.`;
}
/**
 * Gives the agent one last turn, limited by a grace-period timeout, in which
 * to call `complete_task` after it stopped for a recoverable reason.
 *
 * A recovery-attempt telemetry event is logged whether or not the attempt
 * succeeds.
 *
 * @param chat The chat session for this agent run.
 * @param turnCounter The turn number to use for the recovery turn.
 * @param reason Why the normal loop ended.
 * @param externalSignal The original signal passed to `run()`.
 * @param onWaitingForConfirmation Optional callback forwarded to the turn.
 * @returns The final result string if recovery was successful, or `null` if it failed.
 */
private async executeFinalWarningTurn(
  chat: GeminiChat,
  turnCounter: number,
  reason:
    | AgentTerminateMode.TIMEOUT
    | AgentTerminateMode.MAX_TURNS
    | AgentTerminateMode.ERROR_NO_COMPLETE_TASK_CALL,
  externalSignal: AbortSignal,
  onWaitingForConfirmation?: (waiting: boolean) => void,
): Promise<string | null> {
  this.emitActivity('THOUGHT_CHUNK', {
    text: `Execution limit reached (${reason}). Attempting one final recovery turn with a grace period.`,
  });

  const startedAt = Date.now();
  let recovered = false;

  const graceController = new AbortController();
  const graceTimer = setTimeout(() => {
    graceController.abort(new Error('Grace period timed out.'));
  }, GRACE_PERIOD_MS);

  try {
    const graceSignal = graceController.signal;
    const turnResult = await this.executeTurn(
      chat,
      { role: 'user', parts: [{ text: this.getFinalWarningMessage(reason) }] },
      turnCounter, // This will be the "last" turn number
      // Abort on either the external signal or the grace-period timeout.
      AbortSignal.any([externalSignal, graceSignal]),
      graceSignal, // Lets the turn identify a *grace* timeout
      onWaitingForConfirmation,
    );

    // Anything other than a GOAL stop (continue, error, non-GOAL stop) is a failure.
    if (
      turnResult.status !== 'stop' ||
      turnResult.terminateReason !== AgentTerminateMode.GOAL
    ) {
      this.emitActivity('ERROR', {
        error: `Graceful recovery attempt failed. Reason: ${turnResult.status}`,
        context: 'recovery_turn',
      });
      return null;
    }

    this.emitActivity('THOUGHT_CHUNK', {
      text: 'Graceful recovery succeeded.',
    });
    recovered = true;
    return turnResult.finalResult ?? 'Task completed during grace period.';
  } catch (error) {
    // This is the likely landing spot for the 'Grace period timed out' error.
    this.emitActivity('ERROR', {
      error: `Graceful recovery attempt failed: ${String(error)}`,
      context: 'recovery_turn',
    });
    return null;
  } finally {
    clearTimeout(graceTimer);
    const elapsedMs = Date.now() - startedAt;
    logRecoveryAttempt(
      this.config,
      new RecoveryAttemptEvent(
        this.agentId,
        this.definition.name,
        reason,
        elapsedMs,
        recovered,
        turnCounter,
      ),
    );
  }
}
/**
 * Runs the agent.
 *
 * Prepares the tool list and chat, then executes turns in a loop until a turn
 * reports 'stop', `checkTermination` returns a reason, or the combined
 * signal (external signal or deadline timer) aborts. If the loop ends for any
 * reason other than ERROR, ABORTED or GOAL, one recovery turn is attempted
 * via `executeFinalWarningTurn`; a successful recovery turns the outcome
 * into GOAL.
 *
 * An agent-start event is logged before the work begins and an agent-finish
 * event is always logged at the end, including when an error is re-thrown.
 *
 * @param inputs The validated input parameters for this invocation.
 * @param signal An `AbortSignal` for cancellation.
 * @returns A promise that resolves to the agent's final output: a result
 *   string together with the terminate reason.
 * @throws Re-throws any error that is not an `AbortError` caused by the
 *   internal deadline timer (for example an external abort).
 */
async run(inputs: AgentInputs, signal: AbortSignal): Promise<OutputObject> {
const startTime = Date.now();
let turnCounter = 0;
// Defaults to ERROR so the finish event reports an error if an exception escapes.
let terminateReason: AgentTerminateMode = AgentTerminateMode.ERROR;
let finalResult: string | null = null;
const maxTimeMinutes =
this.definition.runConfig.maxTimeMinutes ?? DEFAULT_MAX_TIME_MINUTES;
const maxTurns = this.definition.runConfig.maxTurns ?? DEFAULT_MAX_TURNS;
const deadlineTimer = new DeadlineTimer(
maxTimeMinutes * 60 * 1000,
'Agent timed out.',
);
// Track time spent waiting for user confirmation to credit it back to the agent.
const onWaitingForConfirmation = (waiting: boolean) => {
if (waiting) {
deadlineTimer.pause();
} else {
deadlineTimer.resume();
}
};
// Combine the external signal with the internal timeout signal.
const combinedSignal = AbortSignal.any([signal, deadlineTimer.signal]);
logAgentStart(
this.config,
new AgentStartEvent(this.agentId, this.definition.name),
);
// Declared outside the try block so the catch block can tell whether setup
// got far enough for a recovery turn to be possible.
let chat: GeminiChat | undefined;
let tools: FunctionDeclaration[] | undefined;
try {
// Inject standard runtime context into inputs
const augmentedInputs = {
...inputs,
cliVersion: await getVersion(),
activeModel: this.config.getActiveModel(),
today: new Date().toLocaleDateString(),
};
tools = this.prepareToolsList();
chat = await this.createChatObject(augmentedInputs, tools);
const query = this.definition.promptConfig.query
? templateString(this.definition.promptConfig.query, augmentedInputs)
: DEFAULT_QUERY_STRING;
// Hints received through the listener are queued here and drained after each turn.
const pendingHintsQueue: string[] = [];
const hintListener = (hint: string) => {
pendingHintsQueue.push(hint);
};
// Capture the index of the last hint before starting to avoid re-injecting old hints.
// NOTE: Hints added AFTER this point will be broadcast to all currently running
// local agents via the listener below.
const startIndex = this.config.userHintService.getLatestHintIndex();
this.config.userHintService.onUserHint(hintListener);
try {
const initialHints =
this.config.userHintService.getUserHintsAfter(startIndex);
const formattedInitialHints = formatUserHintsForModel(initialHints);
// When there are initial hints, they are placed before the query in the first message.
let currentMessage: Content = formattedInitialHints
? {
role: 'user',
parts: [{ text: formattedInitialHints }, { text: query }],
}
: { role: 'user', parts: [{ text: query }] };
while (true) {
// Check for termination conditions like max turns.
const reason = this.checkTermination(turnCounter, maxTurns);
if (reason) {
terminateReason = reason;
break;
}
// Check for timeout or external abort.
if (combinedSignal.aborted) {
// Determine which signal caused the abort.
terminateReason = deadlineTimer.signal.aborted
? AgentTerminateMode.TIMEOUT
: AgentTerminateMode.ABORTED;
break;
}
// The turn receives the current counter value; the counter is then incremented.
const turnResult = await this.executeTurn(
chat,
currentMessage,
turnCounter++,
combinedSignal,
deadlineTimer.signal,
onWaitingForConfirmation,
);
if (turnResult.status === 'stop') {
terminateReason = turnResult.terminateReason;
// Only set finalResult if the turn provided one (e.g., error or goal).
if (turnResult.finalResult) {
finalResult = turnResult.finalResult;
}
break; // Exit the loop for *any* stop reason.
}
// If status is 'continue', update message for the next loop
currentMessage = turnResult.nextMessage;
// Check for new user steering hints collected via subscription
if (pendingHintsQueue.length > 0) {
const hintsToProcess = [...pendingHintsQueue];
pendingHintsQueue.length = 0;
const formattedHints = formatUserHintsForModel(hintsToProcess);
if (formattedHints) {
// Prepend hints to the message for the next turn (unshift puts them first).
currentMessage.parts ??= [];
currentMessage.parts.unshift({ text: formattedHints });
}
}
}
} finally {
// Always unsubscribe, whether the loop ended normally or threw.
this.config.userHintService.offUserHint(hintListener);
}
// === UNIFIED RECOVERY BLOCK ===
// Only attempt recovery if it's a known recoverable reason.
// We don't recover from GOAL (already done) or ABORTED (user cancelled).
if (
terminateReason !== AgentTerminateMode.ERROR &&
terminateReason !== AgentTerminateMode.ABORTED &&
terminateReason !== AgentTerminateMode.GOAL
) {
const recoveryResult = await this.executeFinalWarningTurn(
chat,
turnCounter, // Use current turnCounter for the recovery attempt
terminateReason,
signal, // Pass the external signal
onWaitingForConfirmation,
);
if (recoveryResult !== null) {
// Recovery Succeeded
terminateReason = AgentTerminateMode.GOAL;
finalResult = recoveryResult;
} else {
// Recovery Failed. Set the final error message based on the *original* reason.
if (terminateReason === AgentTerminateMode.TIMEOUT) {
finalResult = `Agent timed out after ${maxTimeMinutes} minutes.`;
this.emitActivity('ERROR', {
error: finalResult,
context: 'timeout',
});
} else if (terminateReason === AgentTerminateMode.MAX_TURNS) {
finalResult = `Agent reached max turns limit (${maxTurns}).`;
this.emitActivity('ERROR', {
error: finalResult,
context: 'max_turns',
});
} else if (
terminateReason === AgentTerminateMode.ERROR_NO_COMPLETE_TASK_CALL
) {
// executeTurn reports this reason with a null finalResult, so the
// fallback message below is normally the one that gets used.
finalResult =
finalResult ||
`Agent stopped calling tools but did not call '${TASK_COMPLETE_TOOL_NAME}'.`;
this.emitActivity('ERROR', {
error: finalResult,
context: 'protocol_violation',
});
}
}
}
// === FINAL RETURN LOGIC ===
if (terminateReason === AgentTerminateMode.GOAL) {
return {
result: finalResult || 'Task completed.',
terminate_reason: terminateReason,
};
}
return {
result:
finalResult || 'Agent execution was terminated before completion.',
terminate_reason: terminateReason,
};
} catch (error) {
// Check if the error is an AbortError caused by our internal timeout.
if (
error instanceof Error &&
error.name === 'AbortError' &&
deadlineTimer.signal.aborted &&
!signal.aborted // Ensure the external signal was not the cause
) {
terminateReason = AgentTerminateMode.TIMEOUT;
// Also use the unified recovery logic here
// Recovery needs the chat, so it is skipped if setup failed before the chat existed.
if (chat && tools) {
const recoveryResult = await this.executeFinalWarningTurn(
chat,
turnCounter, // Use current turnCounter
AgentTerminateMode.TIMEOUT,
signal,
onWaitingForConfirmation,
);
if (recoveryResult !== null) {
// Recovery Succeeded
terminateReason = AgentTerminateMode.GOAL;
finalResult = recoveryResult;
return {
result: finalResult,
terminate_reason: terminateReason,
};
}
}
// Recovery failed or wasn't possible
finalResult = `Agent timed out after ${maxTimeMinutes} minutes.`;
this.emitActivity('ERROR', {
error: finalResult,
context: 'timeout',
});
return {
result: finalResult,
terminate_reason: terminateReason,
};
}
this.emitActivity('ERROR', { error: String(error) });
throw error; // Re-throw other errors or external aborts.
} finally {
// Runs on every exit path: stop the deadline timer and log the finish
// event with whatever terminateReason was last recorded.
deadlineTimer.abort();
logAgentFinish(
this.config,
new AgentFinishEvent(
this.agentId,
this.definition.name,
Date.now() - startTime,
turnCounter,
terminateReason,
),
);
}
}
/**
 * Asks the compression service to compress the chat history and applies the
 * outcome to the chat and to the failed-attempt flag.
 *
 * @param chat The chat whose history may be replaced.
 * @param prompt_id The prompt ID passed to the compression service.
 */
private async tryCompressChat(
  chat: GeminiChat,
  prompt_id: string,
): Promise<void> {
  const compression = await this.compressionService.compress(
    chat,
    prompt_id,
    false,
    this.definition.modelConfig.model ?? DEFAULT_GEMINI_MODEL,
    this.config,
    this.hasFailedCompressionAttempt,
  );
  const replacementHistory = compression.newHistory;

  switch (compression.info.compressionStatus) {
    case CompressionStatus.COMPRESSION_FAILED_INFLATED_TOKEN_COUNT:
      this.hasFailedCompressionAttempt = true;
      return;

    case CompressionStatus.COMPRESSED:
      if (replacementHistory) {
        chat.setHistory(replacementHistory);
        this.hasFailedCompressionAttempt = false;
      }
      return;

    case CompressionStatus.CONTENT_TRUNCATED:
      // The flag is deliberately left untouched here: content was only
      // truncated because summarization failed earlier, and we want to keep
      // avoiding expensive summarization calls.
      if (replacementHistory) {
        chat.setHistory(replacementHistory);
      }
      return;

    default:
      // Any other status leaves the chat and the flag as they are.
      return;
  }
}
/**
 * Sends the message to the model and collects what it streams back.
 *
 * The model is taken from the resolved model config; when that is an "auto"
 * model, the model router picks the concrete model, with
 * `DEFAULT_GEMINI_MODEL` as the fallback if routing throws.
 *
 * @param chat The chat session to send the message through.
 * @param message The message whose parts are sent.
 * @param signal Abort signal; streaming stops once it is aborted.
 * @param promptId The prompt ID for this request.
 * @returns The function calls requested by the model and the concatenated
 *   non-thought text of the response.
 */
private async callModel(
  chat: GeminiChat,
  message: Content,
  signal: AbortSignal,
  promptId: string,
): Promise<{ functionCalls: FunctionCall[]; textResponse: string }> {
  // Resolve the model config early to get the concrete model string (which may be `auto`).
  const { model: requestedModel } =
    this.config.modelConfigService.getResolvedConfig({
      model: getModelConfigAlias(this.definition),
      overrideScope: this.definition.name,
    });

  let modelToUse: string = requestedModel;
  if (isAutoModel(requestedModel)) {
    // TODO(joshualitt): This try / catch is inconsistent with the routing
    // behavior for the main agent. Ideally, we would have a universal
    // policy for routing failure. Given routing failure does not necessarily
    // mean generation will fail, we may want to share this logic with
    // other places we use model routing.
    try {
      const routingContext: RoutingContext = {
        history: chat.getHistory(/*curated=*/ true),
        request: message.parts || [],
        signal,
        requestedModel,
      };
      const routingDecision = await this.config
        .getModelRouterService()
        .route(routingContext);
      modelToUse = routingDecision.model;
    } catch (error) {
      debugLogger.warn(`Error during model routing: ${error}`);
      modelToUse = DEFAULT_GEMINI_MODEL;
    }
  }

  const responseStream = await chat.sendMessageStream(
    {
      model: modelToUse,
      overrideScope: this.definition.name,
    },
    message.parts || [],
    promptId,
    signal,
    LlmRole.SUBAGENT,
  );

  const functionCalls: FunctionCall[] = [];
  let textResponse = '';

  for await (const event of responseStream) {
    if (signal.aborted) break;
    if (event.type !== StreamEventType.CHUNK) continue;

    const chunk = event.value;
    const parts = chunk.candidates?.[0]?.content?.parts;

    // Emit the subject of the first "thought" part, if there is one.
    const thoughtPart = parts?.find((p) => p.thought);
    const { subject } = parseThought(thoughtPart?.text || '');
    if (subject) {
      this.emitActivity('THOUGHT_CHUNK', { text: subject });
    }

    // Collect any function calls requested by the model.
    if (chunk.functionCalls) {
      functionCalls.push(...chunk.functionCalls);
    }

    // Accumulate the text of every non-thought part that has text.
    for (const part of parts ?? []) {
      if (!part.thought && part.text) {
        textResponse += part.text;
      }
    }
  }

  return { functionCalls, textResponse };
}
/**
 * Creates the `GeminiChat` instance used for the agent run.
 *
 * @param inputs Values used to template the initial messages and the system prompt.
 * @param tools Function declarations made available to the model.
 * @returns The new chat instance.
 * @throws Error if the prompt config defines neither `systemPrompt` nor
 *   `initialMessages`, or if constructing the chat fails (after the failure
 *   has been reported).
 */
private async createChatObject(
  inputs: AgentInputs,
  tools: FunctionDeclaration[],
): Promise<GeminiChat> {
  const { systemPrompt, initialMessages } = this.definition.promptConfig;

  if (!(systemPrompt || initialMessages)) {
    throw new Error(
      'PromptConfig must define either `systemPrompt` or `initialMessages`.',
    );
  }

  const startHistory = this.applyTemplateToInitialMessages(
    initialMessages ?? [],
    inputs,
  );

  // The system instruction is only built when a system prompt is configured.
  const systemInstruction = systemPrompt
    ? await this.buildSystemPrompt(inputs)
    : undefined;

  try {
    const toolDeclarations = [{ functionDeclarations: tools }];
    return new GeminiChat(
      this.config,
      systemInstruction,
      toolDeclarations,
      startHistory,
      undefined,
      undefined,
      'subagent',
    );
  } catch (e: unknown) {
    await reportError(
      e,
      `Error initializing Gemini chat for agent ${this.definition.name}.`,
      startHistory,
      'startChat',
    );
    // Report first, then surface a more specific error to the caller.
    throw new Error(`Failed to create chat object: ${getErrorMessage(e)}`);
  }
}
/**
* Executes function calls requested by the model and returns the results.
*
* @returns A new `Content` object for history, any submitted output, and completion status.
*/
private async processFunctionCalls(
functionCalls: FunctionCall[],
signal: AbortSignal,
promptId: string,
onWaitingForConfirmation?: (waiting: boolean) => void,
): Promise<{
nextMessage: Content;
submittedOutput: string | null;
taskCompleted: boolean;
aborted: boolean;
}> {
const allowedToolNames = new Set(this.toolRegistry.getAllToolNames());
// Always allow the completion tool
allowedToolNames.add(TASK_COMPLETE_TOOL_NAME);
let submittedOutput: string | null = null;
let taskCompleted = false;
let aborted = false;
// We'll separate complete_task from other tools
const toolRequests: ToolCallRequestInfo[] = [];
// Map to keep track of tool name by callId for activity emission
const toolNameMap = new Map<string, string>();
// Synchronous results (like complete_task or unauthorized calls)
const syncResults = new Map<string, Part>();
for (const [index, functionCall] of functionCalls.entries()) {
const callId = functionCall.id ?? `${promptId}-${index}`;
const args = functionCall.args ?? {};
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const toolName = functionCall.name as string;
let displayName = toolName;
let description: string | undefined = undefined;
try {
const tool = this.toolRegistry.getTool(toolName);
if (tool) {
displayName = tool.displayName ?? toolName;
const invocation = tool.build(args);
description = invocation.getDescription();
}
} catch {
// Ignore errors during formatting for activity emission
}
this.emitActivity('TOOL_CALL_START', {
name: toolName,
displayName,
description,
args,
callId,
});
if (toolName === TASK_COMPLETE_TOOL_NAME) {
if (taskCompleted) {
const error =
'Task already marked complete in this turn. Ignoring duplicate call.';
syncResults.set(callId, {
functionResponse: {
name: TASK_COMPLETE_TOOL_NAME,
response: { error },
id: callId,
},
});
this.emitActivity('ERROR', {
context: 'tool_call',
name: toolName,
error,
});
continue;
}
const { outputConfig } = this.definition;
taskCompleted = true; // Signal completion regardless of output presence
if (outputConfig) {
const outputName = outputConfig.outputName;
if (args[outputName] !== undefined) {
const outputValue = args[outputName];
const validationResult = outputConfig.schema.safeParse(outputValue);
if (!validationResult.success) {
taskCompleted = false; // Validation failed, revoke completion
const error = `Output validation failed: ${JSON.stringify(validationResult.error.flatten())}`;
syncResults.set(callId, {
functionResponse: {
name: TASK_COMPLETE_TOOL_NAME,
response: { error },
id: callId,
},
});
this.emitActivity('ERROR', {
context: 'tool_call',
name: toolName,
error,
});
continue;
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const validatedOutput = validationResult.data;
if (this.definition.processOutput) {
submittedOutput = this.definition.processOutput(validatedOutput);
} else {
submittedOutput =
typeof outputValue === 'string'
? outputValue
: JSON.stringify(outputValue, null, 2);
}
syncResults.set(callId, {
functionResponse: {
name: TASK_COMPLETE_TOOL_NAME,
response: { result: 'Output submitted and task completed.' },
id: callId,
},
});
this.emitActivity('TOOL_CALL_END', {
name: toolName,
id: callId,
output: 'Output submitted and task completed.',
});
} else {
// Failed to provide required output.
taskCompleted = false; // Revoke completion status
const error = `Missing required argument '${outputName}' for completion.`;
syncResults.set(callId, {
functionResponse: {
name: TASK_COMPLETE_TOOL_NAME,
response: { error },
id: callId,
},
});
this.emitActivity('ERROR', {
context: 'tool_call',
name: toolName,
callId,
error,
});
}
} else {
// No outputConfig - use default 'result' parameter
const resultArg = args['result'];
if (
resultArg !== undefined &&
resultArg !== null &&
resultArg !== ''
) {
submittedOutput =
typeof resultArg === 'string'
? resultArg
: JSON.stringify(resultArg, null, 2);
syncResults.set(callId, {
functionResponse: {
name: TASK_COMPLETE_TOOL_NAME,
response: { status: 'Result submitted and task completed.' },
id: callId,
},
});
this.emitActivity('TOOL_CALL_END', {
name: toolName,
id: callId,
output: 'Result submitted and task completed.',
});
} else {
// No result provided - this is an error for agents expected to return results
taskCompleted = false; // Revoke completion
const error =
'Missing required "result" argument. You must provide your findings when calling complete_task.';
syncResults.set(callId, {
functionResponse: {
name: TASK_COMPLETE_TOOL_NAME,
response: { error },
id: callId,
},
});
this.emitActivity('ERROR', {
context: 'tool_call',
name: toolName,
callId,
error,
});
}
}
continue;
}
// Handle standard tools
if (!allowedToolNames.has(toolName)) {
const error = createUnauthorizedToolError(toolName);
debugLogger.warn(`[LocalAgentExecutor] Blocked call: ${error}`);
syncResults.set(callId, {
functionResponse: {
name: toolName,
id: callId,
response: { error },
},
});
this.emitActivity('ERROR', {
context: 'tool_call_unauthorized',
name: toolName,
callId,
error,
});
continue;
}
toolRequests.push({
callId,
name: toolName,
args,
isClientInitiated: false, // These are coming from the subagent (the "model")
prompt_id: promptId,
});
toolNameMap.set(callId, toolName);
}
// Execute standard tool calls using the new scheduler
if (toolRequests.length > 0) {
const completedCalls = await scheduleAgentTools(
this.config,
toolRequests,
{
schedulerId: this.agentId,
parentCallId: this.parentCallId,
toolRegistry: this.toolRegistry,
signal,
onWaitingForConfirmation,
},
);
for (const call of completedCalls) {
const toolName =
toolNameMap.get(call.request.callId) || call.request.name;
if (call.status === 'success') {
this.emitActivity('TOOL_CALL_END', {
name: toolName,
id: call.request.callId,
output: call.response.resultDisplay,
});
} else if (call.status === 'error') {
this.emitActivity('ERROR', {
context: 'tool_call',
name: toolName,
callId: call.request.callId,
error: call.response.error?.message || 'Unknown error',
});
} else if (call.status === 'cancelled') {
this.emitActivity('ERROR', {
context: 'tool_call',
name: toolName,
callId: call.request.callId,
error: 'Request cancelled.',
});
aborted = true;
}
// Add result to syncResults to preserve order later
syncResults.set(call.request.callId, call.response.responseParts[0]);
}
}
// Reconstruct toolResponseParts in the original order
const toolResponseParts: Part[] = [];
for (const [index, functionCall] of functionCalls.entries()) {
const callId = functionCall.id ?? `${promptId}-${index}`;
const part = syncResults.get(callId);
if (part) {
toolResponseParts.push(part);
}
}
// If all authorized tool calls failed (and task isn't complete), provide a generic error.
if (
functionCalls.length > 0 &&
toolResponseParts.length === 0 &&
!taskCompleted
) {
toolResponseParts.push({
text: 'All tool calls failed or were unauthorized. Please analyze the errors and try an alternative approach.',
});
}
return {
nextMessage: { role: 'user', parts: toolResponseParts },
submittedOutput,
taskCompleted,
aborted,
};
}
/**
 * Assembles the function declarations that are advertised to the model.
 *
 * Entries of `toolConfig.tools` are handled by shape:
 * - a string is treated as a tool name and resolved through the tool registry;
 * - an object exposing a `schema` property contributes that schema;
 * - any other object is used as-is as a `FunctionDeclaration`.
 *
 * Inline declarations come first (in configuration order), followed by the
 * registry-resolved ones, followed by the `TASK_COMPLETE_TOOL_NAME` tool,
 * which is appended unconditionally.
 *
 * @returns The declarations, ending with the completion tool.
 */
private prepareToolsList(): FunctionDeclaration[] {
  const { toolConfig, outputConfig } = this.definition;
  const declarations: FunctionDeclaration[] = [];

  if (toolConfig) {
    const registryToolNames: string[] = [];
    for (const toolRef of toolConfig.tools) {
      if (typeof toolRef === 'string') {
        // Referenced by name; resolved in bulk once the loop is done.
        registryToolNames.push(toolRef);
        continue;
      }
      if (typeof toolRef === 'object' && 'schema' in toolRef) {
        // Tool instance carrying its own schema.
        declarations.push(toolRef.schema);
        continue;
      }
      // Already a raw `FunctionDeclaration`.
      declarations.push(toolRef);
    }
    const resolvedByName =
      this.toolRegistry.getFunctionDeclarationsFiltered(registryToolNames);
    declarations.push(...resolvedByName);
  }

  // The completion tool takes exactly one required parameter: the configured
  // output name when structured output is expected, otherwise `result`.
  const completionProperties: Record<string, Schema> = {};
  const completionRequired: string[] = [];

  if (outputConfig) {
    // Drop the JSON-schema envelope keys; only the schema body is embedded.
    const {
      $schema: _$schema,
      definitions: _definitions,
      ...schema
    } = zodToJsonSchema(outputConfig.schema);
    completionProperties[outputConfig.outputName] =
      // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
      schema as Schema;
    completionRequired.push(outputConfig.outputName);
  } else {
    completionProperties['result'] = {
      type: Type.STRING,
      description:
        'Your final results or findings to return to the orchestrator. ' +
        'Ensure this is comprehensive and follows any formatting requested in your instructions.',
    };
    completionRequired.push('result');
  }

  const completionDescription = outputConfig
    ? 'Call this tool to submit your final answer and complete the task. This is the ONLY way to finish.'
    : 'Call this tool to submit your final findings and complete the task. This is the ONLY way to finish.';

  declarations.push({
    name: TASK_COMPLETE_TOOL_NAME,
    description: completionDescription,
    parameters: {
      type: Type.OBJECT,
      properties: completionProperties,
      required: completionRequired,
    },
  });

  return declarations;
}
/**
 * Produces the system prompt for this agent run.
 *
 * The configured `promptConfig.systemPrompt` template is filled in with
 * `inputs`, then extended with the directory context string and the fixed
 * rules for non-interactive execution. The closing rules differ depending on
 * whether the definition declares an `outputConfig`.
 *
 * @param inputs Values substituted into the system prompt template.
 * @returns The assembled prompt, or an empty string when no system prompt is
 *   configured.
 */
private async buildSystemPrompt(inputs: AgentInputs): Promise<string> {
  const template = this.definition.promptConfig.systemPrompt;
  if (!template) {
    return '';
  }

  // Substitute the caller-provided inputs into the template.
  const templatedPrompt = templateString(template, inputs);

  // Environment section, built from the directory context helper.
  const directoryContext = await getDirectoryContextString(this.config);
  const environmentSection = `\n\n# Environment Context\n${directoryContext}`;

  // Rules shared by every agent, regardless of output configuration.
  const baseRules = `
Important Rules:
* You are running in a non-interactive mode. You CANNOT ask the user for input or clarification.
* Work systematically using available tools to complete your task.
* Always use absolute paths for file operations. Construct them using the provided "Environment Context".`;

  // Completion instructions: structured output vs. the default "result" argument.
  const completionRules = this.definition.outputConfig
    ? `
* When you have completed your task, you MUST call the \`${TASK_COMPLETE_TOOL_NAME}\` tool with your structured output.
* Do not call any other tools in the same turn as \`${TASK_COMPLETE_TOOL_NAME}\`.
* This is the ONLY way to complete your mission. If you stop calling tools without calling this, you have failed.`
    : `
* When you have completed your task, you MUST call the \`${TASK_COMPLETE_TOOL_NAME}\` tool.
* You MUST include your final findings in the "result" parameter. This is how you return the necessary results for the task to be marked complete.
* Ensure your findings are comprehensive and follow any specific formatting requirements provided in your instructions.
* Do not call any other tools in the same turn as \`${TASK_COMPLETE_TOOL_NAME}\`.
* This is the ONLY way to complete your mission. If you stop calling tools without calling this, you have failed.`;

  return templatedPrompt + environmentSection + baseRules + completionRules;
}
/**
 * Runs template substitution over the text parts of the initial messages.
 *
 * Parts with a defined `text` are replaced by a new part containing only the
 * templated text; every other part is passed through untouched. The input
 * array and its messages are not mutated. A message without `parts` comes
 * back with an empty `parts` array.
 *
 * @param initialMessages The initial messages from the prompt config.
 * @param inputs The validated input parameters for this invocation.
 * @returns A new array of `Content` with templated strings.
 */
private applyTemplateToInitialMessages(
  initialMessages: Content[],
  inputs: AgentInputs,
): Content[] {
  // Only text parts are templated; note the result is rebuilt as `{ text }`.
  const renderPart = (part: Part): Part =>
    'text' in part && part.text !== undefined
      ? { text: templateString(part.text, inputs) }
      : part;

  return initialMessages.map((message) => ({
    ...message,
    parts: (message.parts ?? []).map((part) => renderPart(part)),
  }));
}
/**
 * Decides whether the agent has to stop because a configured limit was hit.
 * The only limit evaluated here is the turn budget.
 *
 * @param turnCounter Number of turns counted so far.
 * @param maxTurns Turn budget; reaching it (or exceeding it) terminates.
 * @returns The reason for termination, or `null` if execution can continue.
 */
private checkTermination(
  turnCounter: number,
  maxTurns: number,
): AgentTerminateMode | null {
  const turnBudgetExhausted = turnCounter >= maxTurns;
  return turnBudgetExhausted ? AgentTerminateMode.MAX_TURNS : null;
}
/**
 * Forwards an activity event to the `onActivity` callback, if one is set.
 * Does nothing when no callback is configured.
 *
 * @param type The kind of activity being reported.
 * @param data Payload attached to the event.
 */
private emitActivity(
  type: SubagentActivityEvent['type'],
  data: Record<string, unknown>,
): void {
  if (!this.onActivity) {
    return;
  }

  // Every event is tagged with the name of the agent that produced it.
  const activity: SubagentActivityEvent = {
    isSubagentActivityEvent: true,
    agentName: this.definition.name,
    type,
    data,
  };
  this.onActivity(activity);
}
}