mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-03-11 22:51:00 -07:00
1146 lines
35 KiB
TypeScript
1146 lines
35 KiB
TypeScript
/**
|
|
* @license
|
|
* Copyright 2025 Google LLC
|
|
* SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
import {
|
|
CoreToolScheduler,
|
|
type GeminiClient,
|
|
GeminiEventType,
|
|
ToolConfirmationOutcome,
|
|
ApprovalMode,
|
|
getAllMCPServerStatuses,
|
|
MCPServerStatus,
|
|
isNodeError,
|
|
getErrorMessage,
|
|
parseAndFormatApiError,
|
|
safeLiteralReplace,
|
|
DEFAULT_GUI_EDITOR,
|
|
type AnyDeclarativeTool,
|
|
type ToolCall,
|
|
type ToolConfirmationPayload,
|
|
type CompletedToolCall,
|
|
type ToolCallRequestInfo,
|
|
type ServerGeminiErrorEvent,
|
|
type ServerGeminiStreamEvent,
|
|
type ToolCallConfirmationDetails,
|
|
type Config,
|
|
type UserTierId,
|
|
type AnsiOutput,
|
|
EDIT_TOOL_NAMES,
|
|
processRestorableToolCalls,
|
|
} from '@google/gemini-cli-core';
|
|
import type { RequestContext, ExecutionEventBus } from '@a2a-js/sdk/server';
|
|
import type {
|
|
TaskStatusUpdateEvent,
|
|
TaskArtifactUpdateEvent,
|
|
TaskState,
|
|
Message,
|
|
Part,
|
|
Artifact,
|
|
} from '@a2a-js/sdk';
|
|
import { v4 as uuidv4 } from 'uuid';
|
|
import { logger } from '../utils/logger.js';
|
|
import * as fs from 'node:fs/promises';
|
|
import * as path from 'node:path';
|
|
import { CoderAgentEvent } from '../types.js';
|
|
import type {
|
|
CoderAgentMessage,
|
|
StateChange,
|
|
ToolCallUpdate,
|
|
TextContent,
|
|
TaskMetadata,
|
|
Thought,
|
|
ThoughtSummary,
|
|
Citation,
|
|
} from '../types.js';
|
|
import type { PartUnion, Part as genAiPart } from '@google/genai';
|
|
|
|
type UnionKeys<T> = T extends T ? keyof T : never;
|
|
|
|
type ConfirmationType = ToolCallConfirmationDetails['type'];
|
|
|
|
const VALID_CONFIRMATION_TYPES: readonly ConfirmationType[] = [
|
|
'edit',
|
|
'exec',
|
|
'mcp',
|
|
'info',
|
|
'ask_user',
|
|
'exit_plan_mode',
|
|
] as const;
|
|
|
|
function isToolCallConfirmationDetails(
|
|
value: unknown,
|
|
): value is ToolCallConfirmationDetails {
|
|
if (
|
|
typeof value !== 'object' ||
|
|
value === null ||
|
|
!('onConfirm' in value) ||
|
|
typeof value.onConfirm !== 'function' ||
|
|
!('type' in value) ||
|
|
typeof value.type !== 'string'
|
|
) {
|
|
return false;
|
|
}
|
|
return (VALID_CONFIRMATION_TYPES as readonly string[]).includes(value.type);
|
|
}
|
|
|
|
export class Task {
|
|
id: string;
|
|
contextId: string;
|
|
scheduler: CoreToolScheduler;
|
|
config: Config;
|
|
geminiClient: GeminiClient;
|
|
pendingToolConfirmationDetails: Map<string, ToolCallConfirmationDetails>;
|
|
taskState: TaskState;
|
|
eventBus?: ExecutionEventBus;
|
|
completedToolCalls: CompletedToolCall[];
|
|
skipFinalTrueAfterInlineEdit = false;
|
|
modelInfo?: string;
|
|
currentPromptId: string | undefined;
|
|
promptCount = 0;
|
|
autoExecute: boolean;
|
|
|
|
// For tool waiting logic
|
|
private pendingToolCalls: Map<string, string> = new Map(); //toolCallId --> status
|
|
private toolCompletionPromise?: Promise<void>;
|
|
private toolCompletionNotifier?: {
|
|
resolve: () => void;
|
|
reject: (reason?: Error) => void;
|
|
};
|
|
|
|
private constructor(
|
|
id: string,
|
|
contextId: string,
|
|
config: Config,
|
|
eventBus?: ExecutionEventBus,
|
|
autoExecute = false,
|
|
) {
|
|
this.id = id;
|
|
this.contextId = contextId;
|
|
this.config = config;
|
|
this.scheduler = this.createScheduler();
|
|
this.geminiClient = this.config.getGeminiClient();
|
|
this.pendingToolConfirmationDetails = new Map();
|
|
this.taskState = 'submitted';
|
|
this.eventBus = eventBus;
|
|
this.completedToolCalls = [];
|
|
this._resetToolCompletionPromise();
|
|
this.autoExecute = autoExecute;
|
|
this.config.setFallbackModelHandler(
|
|
// For a2a-server, we want to automatically switch to the fallback model
|
|
// for future requests without retrying the current one. The 'stop'
|
|
// intent achieves this.
|
|
async () => 'stop',
|
|
);
|
|
}
|
|
|
|
static async create(
|
|
id: string,
|
|
contextId: string,
|
|
config: Config,
|
|
eventBus?: ExecutionEventBus,
|
|
autoExecute?: boolean,
|
|
): Promise<Task> {
|
|
return new Task(id, contextId, config, eventBus, autoExecute);
|
|
}
|
|
|
|
// Note: `getAllMCPServerStatuses` retrieves the status of all MCP servers for the entire
|
|
// process. This is not scoped to the individual task but reflects the global connection
|
|
// state managed within the @gemini-cli/core module.
|
|
async getMetadata(): Promise<TaskMetadata> {
|
|
const toolRegistry = this.config.getToolRegistry();
|
|
const mcpServers = this.config.getMcpClientManager()?.getMcpServers() || {};
|
|
const serverStatuses = getAllMCPServerStatuses();
|
|
const servers = Object.keys(mcpServers).map((serverName) => ({
|
|
name: serverName,
|
|
status: serverStatuses.get(serverName) || MCPServerStatus.DISCONNECTED,
|
|
tools: toolRegistry.getToolsByServer(serverName).map((tool) => ({
|
|
name: tool.name,
|
|
description: tool.description,
|
|
parameterSchema: tool.schema.parameters,
|
|
})),
|
|
}));
|
|
|
|
const availableTools = toolRegistry.getAllTools().map((tool) => ({
|
|
name: tool.name,
|
|
description: tool.description,
|
|
parameterSchema: tool.schema.parameters,
|
|
}));
|
|
|
|
const metadata: TaskMetadata = {
|
|
id: this.id,
|
|
contextId: this.contextId,
|
|
taskState: this.taskState,
|
|
model: this.modelInfo || this.config.getModel(),
|
|
mcpServers: servers,
|
|
availableTools,
|
|
};
|
|
return metadata;
|
|
}
|
|
|
|
private _resetToolCompletionPromise(): void {
|
|
this.toolCompletionPromise = new Promise((resolve, reject) => {
|
|
this.toolCompletionNotifier = { resolve, reject };
|
|
});
|
|
// If there are no pending calls when reset, resolve immediately.
|
|
if (this.pendingToolCalls.size === 0 && this.toolCompletionNotifier) {
|
|
this.toolCompletionNotifier.resolve();
|
|
}
|
|
}
|
|
|
|
private _registerToolCall(toolCallId: string, status: string): void {
|
|
const wasEmpty = this.pendingToolCalls.size === 0;
|
|
this.pendingToolCalls.set(toolCallId, status);
|
|
if (wasEmpty) {
|
|
this._resetToolCompletionPromise();
|
|
}
|
|
logger.info(
|
|
`[Task] Registered tool call: ${toolCallId}. Pending: ${this.pendingToolCalls.size}`,
|
|
);
|
|
}
|
|
|
|
private _resolveToolCall(toolCallId: string): void {
|
|
if (this.pendingToolCalls.has(toolCallId)) {
|
|
this.pendingToolCalls.delete(toolCallId);
|
|
logger.info(
|
|
`[Task] Resolved tool call: ${toolCallId}. Pending: ${this.pendingToolCalls.size}`,
|
|
);
|
|
if (this.pendingToolCalls.size === 0 && this.toolCompletionNotifier) {
|
|
this.toolCompletionNotifier.resolve();
|
|
}
|
|
}
|
|
}
|
|
|
|
async waitForPendingTools(): Promise<void> {
|
|
if (this.pendingToolCalls.size === 0) {
|
|
return Promise.resolve();
|
|
}
|
|
logger.info(
|
|
`[Task] Waiting for ${this.pendingToolCalls.size} pending tool(s)...`,
|
|
);
|
|
return this.toolCompletionPromise;
|
|
}
|
|
|
|
cancelPendingTools(reason: string): void {
|
|
if (this.pendingToolCalls.size > 0) {
|
|
logger.info(
|
|
`[Task] Cancelling all ${this.pendingToolCalls.size} pending tool calls. Reason: ${reason}`,
|
|
);
|
|
}
|
|
if (this.toolCompletionNotifier) {
|
|
this.toolCompletionNotifier.reject(new Error(reason));
|
|
}
|
|
this.pendingToolCalls.clear();
|
|
// Reset the promise for any future operations, ensuring it's in a clean state.
|
|
this._resetToolCompletionPromise();
|
|
}
|
|
|
|
private _createTextMessage(
|
|
text: string,
|
|
role: 'agent' | 'user' = 'agent',
|
|
): Message {
|
|
return {
|
|
kind: 'message',
|
|
role,
|
|
parts: [{ kind: 'text', text }],
|
|
messageId: uuidv4(),
|
|
taskId: this.id,
|
|
contextId: this.contextId,
|
|
};
|
|
}
|
|
|
|
private _createStatusUpdateEvent(
|
|
stateToReport: TaskState,
|
|
coderAgentMessage: CoderAgentMessage,
|
|
message?: Message,
|
|
final = false,
|
|
timestamp?: string,
|
|
metadataError?: string,
|
|
traceId?: string,
|
|
): TaskStatusUpdateEvent {
|
|
const metadata: {
|
|
coderAgent: CoderAgentMessage;
|
|
model: string;
|
|
userTier?: UserTierId;
|
|
error?: string;
|
|
traceId?: string;
|
|
} = {
|
|
coderAgent: coderAgentMessage,
|
|
model: this.modelInfo || this.config.getModel(),
|
|
userTier: this.config.getUserTier(),
|
|
};
|
|
|
|
if (metadataError) {
|
|
metadata.error = metadataError;
|
|
}
|
|
|
|
if (traceId) {
|
|
metadata.traceId = traceId;
|
|
}
|
|
|
|
return {
|
|
kind: 'status-update',
|
|
taskId: this.id,
|
|
contextId: this.contextId,
|
|
status: {
|
|
state: stateToReport,
|
|
message, // Shorthand property
|
|
timestamp: timestamp || new Date().toISOString(),
|
|
},
|
|
final,
|
|
metadata,
|
|
};
|
|
}
|
|
|
|
setTaskStateAndPublishUpdate(
|
|
newState: TaskState,
|
|
coderAgentMessage: CoderAgentMessage,
|
|
messageText?: string,
|
|
messageParts?: Part[], // For more complex messages
|
|
final = false,
|
|
metadataError?: string,
|
|
traceId?: string,
|
|
): void {
|
|
this.taskState = newState;
|
|
let message: Message | undefined;
|
|
|
|
if (messageText) {
|
|
message = this._createTextMessage(messageText);
|
|
} else if (messageParts) {
|
|
message = {
|
|
kind: 'message',
|
|
role: 'agent',
|
|
parts: messageParts,
|
|
messageId: uuidv4(),
|
|
taskId: this.id,
|
|
contextId: this.contextId,
|
|
};
|
|
}
|
|
|
|
const event = this._createStatusUpdateEvent(
|
|
this.taskState,
|
|
coderAgentMessage,
|
|
message,
|
|
final,
|
|
undefined,
|
|
metadataError,
|
|
traceId,
|
|
);
|
|
this.eventBus?.publish(event);
|
|
}
|
|
|
|
private _schedulerOutputUpdate(
|
|
toolCallId: string,
|
|
outputChunk: string | AnsiOutput,
|
|
): void {
|
|
let outputAsText: string;
|
|
if (typeof outputChunk === 'string') {
|
|
outputAsText = outputChunk;
|
|
} else {
|
|
outputAsText = outputChunk
|
|
.map((line) => line.map((token) => token.text).join(''))
|
|
.join('\n');
|
|
}
|
|
|
|
logger.info(
|
|
'[Task] Scheduler output update for tool call ' +
|
|
toolCallId +
|
|
': ' +
|
|
outputAsText,
|
|
);
|
|
const artifact: Artifact = {
|
|
artifactId: `tool-${toolCallId}-output`,
|
|
parts: [
|
|
{
|
|
kind: 'text',
|
|
text: outputAsText,
|
|
} as Part,
|
|
],
|
|
};
|
|
const artifactEvent: TaskArtifactUpdateEvent = {
|
|
kind: 'artifact-update',
|
|
taskId: this.id,
|
|
contextId: this.contextId,
|
|
artifact,
|
|
append: true,
|
|
lastChunk: false,
|
|
};
|
|
this.eventBus?.publish(artifactEvent);
|
|
}
|
|
|
|
private async _schedulerAllToolCallsComplete(
|
|
completedToolCalls: CompletedToolCall[],
|
|
): Promise<void> {
|
|
logger.info(
|
|
'[Task] All tool calls completed by scheduler (batch):',
|
|
completedToolCalls.map((tc) => tc.request.callId),
|
|
);
|
|
this.completedToolCalls.push(...completedToolCalls);
|
|
completedToolCalls.forEach((tc) => {
|
|
this._resolveToolCall(tc.request.callId);
|
|
});
|
|
}
|
|
|
|
private _schedulerToolCallsUpdate(toolCalls: ToolCall[]): void {
|
|
logger.info(
|
|
'[Task] Scheduler tool calls updated:',
|
|
toolCalls.map((tc) => `${tc.request.callId} (${tc.status})`),
|
|
);
|
|
|
|
// Update state and send continuous, non-final updates
|
|
toolCalls.forEach((tc) => {
|
|
const previousStatus = this.pendingToolCalls.get(tc.request.callId);
|
|
const hasChanged = previousStatus !== tc.status;
|
|
|
|
// Resolve tool call if it has reached a terminal state
|
|
if (['success', 'error', 'cancelled'].includes(tc.status)) {
|
|
this._resolveToolCall(tc.request.callId);
|
|
} else {
|
|
// This will update the map
|
|
this._registerToolCall(tc.request.callId, tc.status);
|
|
}
|
|
|
|
if (tc.status === 'awaiting_approval' && tc.confirmationDetails) {
|
|
const details = tc.confirmationDetails;
|
|
if (isToolCallConfirmationDetails(details)) {
|
|
this.pendingToolConfirmationDetails.set(tc.request.callId, details);
|
|
}
|
|
}
|
|
|
|
// Only send an update if the status has actually changed.
|
|
if (hasChanged) {
|
|
const coderAgentMessage: CoderAgentMessage =
|
|
tc.status === 'awaiting_approval'
|
|
? { kind: CoderAgentEvent.ToolCallConfirmationEvent }
|
|
: { kind: CoderAgentEvent.ToolCallUpdateEvent };
|
|
const message = this.toolStatusMessage(tc, this.id, this.contextId);
|
|
|
|
const event = this._createStatusUpdateEvent(
|
|
this.taskState,
|
|
coderAgentMessage,
|
|
message,
|
|
false, // Always false for these continuous updates
|
|
);
|
|
this.eventBus?.publish(event);
|
|
}
|
|
});
|
|
|
|
if (
|
|
this.autoExecute ||
|
|
this.config.getApprovalMode() === ApprovalMode.YOLO
|
|
) {
|
|
logger.info(
|
|
'[Task] ' +
|
|
(this.autoExecute ? '' : 'YOLO mode enabled. ') +
|
|
'Auto-approving all tool calls.',
|
|
);
|
|
toolCalls.forEach((tc: ToolCall) => {
|
|
if (tc.status === 'awaiting_approval' && tc.confirmationDetails) {
|
|
const details = tc.confirmationDetails;
|
|
if (isToolCallConfirmationDetails(details)) {
|
|
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
|
details.onConfirm(ToolConfirmationOutcome.ProceedOnce);
|
|
this.pendingToolConfirmationDetails.delete(tc.request.callId);
|
|
}
|
|
}
|
|
});
|
|
return;
|
|
}
|
|
|
|
const allPendingStatuses = Array.from(this.pendingToolCalls.values());
|
|
const isAwaitingApproval = allPendingStatuses.some(
|
|
(status) => status === 'awaiting_approval',
|
|
);
|
|
const isExecuting = allPendingStatuses.some(
|
|
(status) => status === 'executing',
|
|
);
|
|
|
|
// The turn is complete and requires user input if at least one tool
|
|
// is waiting for the user's decision, and no other tool is actively
|
|
// running in the background.
|
|
if (
|
|
isAwaitingApproval &&
|
|
!isExecuting &&
|
|
!this.skipFinalTrueAfterInlineEdit
|
|
) {
|
|
this.skipFinalTrueAfterInlineEdit = false;
|
|
|
|
// We don't need to send another message, just a final status update.
|
|
this.setTaskStateAndPublishUpdate(
|
|
'input-required',
|
|
{ kind: CoderAgentEvent.StateChangeEvent },
|
|
undefined,
|
|
undefined,
|
|
/*final*/ true,
|
|
);
|
|
}
|
|
}
|
|
|
|
private createScheduler(): CoreToolScheduler {
|
|
const scheduler = new CoreToolScheduler({
|
|
outputUpdateHandler: this._schedulerOutputUpdate.bind(this),
|
|
onAllToolCallsComplete: this._schedulerAllToolCallsComplete.bind(this),
|
|
onToolCallsUpdate: this._schedulerToolCallsUpdate.bind(this),
|
|
getPreferredEditor: () => DEFAULT_GUI_EDITOR,
|
|
config: this.config,
|
|
});
|
|
return scheduler;
|
|
}
|
|
|
|
private _pickFields<
|
|
T extends ToolCall | AnyDeclarativeTool,
|
|
K extends UnionKeys<T>,
|
|
>(from: T, ...fields: K[]): Partial<T> {
|
|
const ret: Partial<T> = {};
|
|
for (const field of fields) {
|
|
if (field in from && from[field] !== undefined) {
|
|
ret[field] = from[field];
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
private toolStatusMessage(
|
|
tc: ToolCall,
|
|
taskId: string,
|
|
contextId: string,
|
|
): Message {
|
|
const messageParts: Part[] = [];
|
|
|
|
// Create a serializable version of the ToolCall (pick necessary
|
|
// properties/avoid methods causing circular reference errors).
|
|
// Type allows tool to be Partial<AnyDeclarativeTool> for serialization.
|
|
const serializableToolCall: Partial<Omit<ToolCall, 'tool'>> & {
|
|
tool?: Partial<AnyDeclarativeTool>;
|
|
} = this._pickFields(
|
|
tc,
|
|
'request',
|
|
'status',
|
|
'confirmationDetails',
|
|
'liveOutput',
|
|
'response',
|
|
);
|
|
|
|
if (tc.tool) {
|
|
const toolFields = this._pickFields(
|
|
tc.tool,
|
|
'name',
|
|
'displayName',
|
|
'description',
|
|
'kind',
|
|
'isOutputMarkdown',
|
|
'canUpdateOutput',
|
|
'schema',
|
|
'parameterSchema',
|
|
);
|
|
serializableToolCall.tool = toolFields;
|
|
}
|
|
|
|
messageParts.push({
|
|
kind: 'data',
|
|
data: serializableToolCall,
|
|
} as Part);
|
|
|
|
return {
|
|
kind: 'message',
|
|
role: 'agent',
|
|
parts: messageParts,
|
|
messageId: uuidv4(),
|
|
taskId,
|
|
contextId,
|
|
};
|
|
}
|
|
|
|
private async getProposedContent(
|
|
file_path: string,
|
|
old_string: string,
|
|
new_string: string,
|
|
): Promise<string> {
|
|
// Validate path to prevent path traversal vulnerabilities
|
|
const resolvedPath = path.resolve(this.config.getTargetDir(), file_path);
|
|
const pathError = this.config.validatePathAccess(resolvedPath, 'read');
|
|
if (pathError) {
|
|
throw new Error(`Path validation failed: ${pathError}`);
|
|
}
|
|
|
|
try {
|
|
const currentContent = await fs.readFile(resolvedPath, 'utf8');
|
|
return this._applyReplacement(
|
|
currentContent,
|
|
old_string,
|
|
new_string,
|
|
old_string === '' && currentContent === '',
|
|
);
|
|
} catch (err) {
|
|
if (!isNodeError(err) || err.code !== 'ENOENT') throw err;
|
|
return '';
|
|
}
|
|
}
|
|
|
|
private _applyReplacement(
|
|
currentContent: string | null,
|
|
oldString: string,
|
|
newString: string,
|
|
isNewFile: boolean,
|
|
): string {
|
|
if (isNewFile) {
|
|
return newString;
|
|
}
|
|
if (currentContent === null) {
|
|
// Should not happen if not a new file, but defensively return empty or newString if oldString is also empty
|
|
return oldString === '' ? newString : '';
|
|
}
|
|
// If oldString is empty and it's not a new file, do not modify the content.
|
|
if (oldString === '' && !isNewFile) {
|
|
return currentContent;
|
|
}
|
|
|
|
// Use intelligent replacement that handles $ sequences safely
|
|
return safeLiteralReplace(currentContent, oldString, newString);
|
|
}
|
|
|
|
async scheduleToolCalls(
|
|
requests: ToolCallRequestInfo[],
|
|
abortSignal: AbortSignal,
|
|
): Promise<void> {
|
|
if (requests.length === 0) {
|
|
return;
|
|
}
|
|
|
|
// Set checkpoint file before any file modification tool executes
|
|
const restorableToolCalls = requests.filter((request) =>
|
|
EDIT_TOOL_NAMES.has(request.name),
|
|
);
|
|
|
|
if (
|
|
restorableToolCalls.length > 0 &&
|
|
this.config.getCheckpointingEnabled()
|
|
) {
|
|
const gitService = await this.config.getGitService();
|
|
if (gitService) {
|
|
const { checkpointsToWrite, toolCallToCheckpointMap, errors } =
|
|
await processRestorableToolCalls(
|
|
restorableToolCalls,
|
|
gitService,
|
|
this.geminiClient,
|
|
);
|
|
|
|
if (errors.length > 0) {
|
|
errors.forEach((error) => logger.error(error));
|
|
}
|
|
|
|
if (checkpointsToWrite.size > 0) {
|
|
const checkpointDir =
|
|
this.config.storage.getProjectTempCheckpointsDir();
|
|
await fs.mkdir(checkpointDir, { recursive: true });
|
|
for (const [fileName, content] of checkpointsToWrite) {
|
|
const filePath = path.join(checkpointDir, fileName);
|
|
await fs.writeFile(filePath, content);
|
|
}
|
|
}
|
|
|
|
for (const request of requests) {
|
|
const checkpoint = toolCallToCheckpointMap.get(request.callId);
|
|
if (checkpoint) {
|
|
request.checkpoint = checkpoint;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
const updatedRequests = await Promise.all(
|
|
requests.map(async (request) => {
|
|
if (
|
|
request.name === 'replace' &&
|
|
request.args &&
|
|
!request.args['newContent'] &&
|
|
request.args['file_path'] &&
|
|
request.args['old_string'] &&
|
|
request.args['new_string']
|
|
) {
|
|
const filePath = request.args['file_path'];
|
|
const oldString = request.args['old_string'];
|
|
const newString = request.args['new_string'];
|
|
if (
|
|
typeof filePath === 'string' &&
|
|
typeof oldString === 'string' &&
|
|
typeof newString === 'string'
|
|
) {
|
|
// Resolve and validate path to prevent path traversal (user-controlled file_path).
|
|
const resolvedPath = path.resolve(
|
|
this.config.getTargetDir(),
|
|
filePath,
|
|
);
|
|
const pathError = this.config.validatePathAccess(
|
|
resolvedPath,
|
|
'read',
|
|
);
|
|
if (!pathError) {
|
|
const newContent = await this.getProposedContent(
|
|
resolvedPath,
|
|
oldString,
|
|
newString,
|
|
);
|
|
return { ...request, args: { ...request.args, newContent } };
|
|
}
|
|
}
|
|
}
|
|
return request;
|
|
}),
|
|
);
|
|
|
|
logger.info(
|
|
`[Task] Scheduling batch of ${updatedRequests.length} tool calls.`,
|
|
);
|
|
const stateChange: StateChange = {
|
|
kind: CoderAgentEvent.StateChangeEvent,
|
|
};
|
|
this.setTaskStateAndPublishUpdate('working', stateChange);
|
|
|
|
await this.scheduler.schedule(updatedRequests, abortSignal);
|
|
}
|
|
|
|
async acceptAgentMessage(event: ServerGeminiStreamEvent): Promise<void> {
|
|
const stateChange: StateChange = {
|
|
kind: CoderAgentEvent.StateChangeEvent,
|
|
};
|
|
const traceId =
|
|
'traceId' in event && event.traceId ? event.traceId : undefined;
|
|
|
|
switch (event.type) {
|
|
case GeminiEventType.Content:
|
|
logger.info('[Task] Sending agent message content...');
|
|
this._sendTextContent(event.value, traceId);
|
|
break;
|
|
case GeminiEventType.ToolCallRequest:
|
|
// This is now handled by the agent loop, which collects all requests
|
|
// and calls scheduleToolCalls once.
|
|
logger.warn(
|
|
'[Task] A single tool call request was passed to acceptAgentMessage. This should be handled in a batch by the agent. Ignoring.',
|
|
);
|
|
break;
|
|
case GeminiEventType.ToolCallResponse:
|
|
// This event type from ServerGeminiStreamEvent might be for when LLM *generates* a tool response part.
|
|
// The actual execution result comes via user message.
|
|
logger.info(
|
|
'[Task] Received tool call response from LLM (part of generation):',
|
|
event.value,
|
|
);
|
|
break;
|
|
case GeminiEventType.ToolCallConfirmation:
|
|
// This is when LLM requests confirmation, not when user provides it.
|
|
logger.info(
|
|
'[Task] Received tool call confirmation request from LLM:',
|
|
event.value.request.callId,
|
|
);
|
|
this.pendingToolConfirmationDetails.set(
|
|
event.value.request.callId,
|
|
event.value.details,
|
|
);
|
|
// This will be handled by the scheduler and _schedulerToolCallsUpdate will set InputRequired if needed.
|
|
// No direct state change here, scheduler drives it.
|
|
break;
|
|
case GeminiEventType.UserCancelled:
|
|
logger.info('[Task] Received user cancelled event from LLM stream.');
|
|
this.cancelPendingTools('User cancelled via LLM stream event');
|
|
this.setTaskStateAndPublishUpdate(
|
|
'input-required',
|
|
stateChange,
|
|
'Task cancelled by user',
|
|
undefined,
|
|
true,
|
|
undefined,
|
|
traceId,
|
|
);
|
|
break;
|
|
case GeminiEventType.Thought:
|
|
logger.info('[Task] Sending agent thought...');
|
|
this._sendThought(event.value, traceId);
|
|
break;
|
|
case GeminiEventType.Citation:
|
|
logger.info('[Task] Received citation from LLM stream.');
|
|
this._sendCitation(event.value);
|
|
break;
|
|
case GeminiEventType.ChatCompressed:
|
|
break;
|
|
case GeminiEventType.Finished:
|
|
logger.info(`[Task ${this.id}] Agent finished its turn.`);
|
|
break;
|
|
case GeminiEventType.ModelInfo:
|
|
this.modelInfo = event.value;
|
|
break;
|
|
case GeminiEventType.Retry:
|
|
case GeminiEventType.InvalidStream:
|
|
// An invalid stream should trigger a retry, which requires no action from the user.
|
|
break;
|
|
case GeminiEventType.Error:
|
|
default: {
|
|
// Use type guard instead of unsafe type assertion
|
|
let errorEvent: ServerGeminiErrorEvent | undefined;
|
|
if (
|
|
event.type === GeminiEventType.Error &&
|
|
event.value &&
|
|
typeof event.value === 'object' &&
|
|
'error' in event.value
|
|
) {
|
|
errorEvent = event;
|
|
}
|
|
const errorMessage = errorEvent?.value?.error
|
|
? getErrorMessage(errorEvent.value.error)
|
|
: 'Unknown error from LLM stream';
|
|
logger.error(
|
|
'[Task] Received error event from LLM stream:',
|
|
errorMessage,
|
|
);
|
|
|
|
let errMessage = `Unknown error from LLM stream: ${JSON.stringify(event)}`;
|
|
if (errorEvent?.value?.error) {
|
|
errMessage = parseAndFormatApiError(errorEvent.value.error);
|
|
}
|
|
this.cancelPendingTools(`LLM stream error: ${errorMessage}`);
|
|
this.setTaskStateAndPublishUpdate(
|
|
this.taskState,
|
|
stateChange,
|
|
`Agent Error, unknown agent message: ${errorMessage}`,
|
|
undefined,
|
|
false,
|
|
errMessage,
|
|
traceId,
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
private async _handleToolConfirmationPart(part: Part): Promise<boolean> {
|
|
if (
|
|
part.kind !== 'data' ||
|
|
!part.data ||
|
|
typeof part.data['callId'] !== 'string' ||
|
|
typeof part.data['outcome'] !== 'string'
|
|
) {
|
|
return false;
|
|
}
|
|
|
|
const callId = part.data['callId'];
|
|
const outcomeString = part.data['outcome'];
|
|
let confirmationOutcome: ToolConfirmationOutcome | undefined;
|
|
|
|
if (outcomeString === 'proceed_once') {
|
|
confirmationOutcome = ToolConfirmationOutcome.ProceedOnce;
|
|
} else if (outcomeString === 'cancel') {
|
|
confirmationOutcome = ToolConfirmationOutcome.Cancel;
|
|
} else if (outcomeString === 'proceed_always') {
|
|
confirmationOutcome = ToolConfirmationOutcome.ProceedAlways;
|
|
} else if (outcomeString === 'proceed_always_server') {
|
|
confirmationOutcome = ToolConfirmationOutcome.ProceedAlwaysServer;
|
|
} else if (outcomeString === 'proceed_always_tool') {
|
|
confirmationOutcome = ToolConfirmationOutcome.ProceedAlwaysTool;
|
|
} else if (outcomeString === 'modify_with_editor') {
|
|
confirmationOutcome = ToolConfirmationOutcome.ModifyWithEditor;
|
|
} else {
|
|
logger.warn(
|
|
`[Task] Unknown tool confirmation outcome: "${outcomeString}" for callId: ${callId}`,
|
|
);
|
|
return false;
|
|
}
|
|
|
|
const confirmationDetails = this.pendingToolConfirmationDetails.get(callId);
|
|
|
|
if (!confirmationDetails) {
|
|
logger.warn(
|
|
`[Task] Received tool confirmation for unknown or already processed callId: ${callId}`,
|
|
);
|
|
return false;
|
|
}
|
|
|
|
logger.info(
|
|
`[Task] Handling tool confirmation for callId: ${callId} with outcome: ${outcomeString}`,
|
|
);
|
|
try {
|
|
// Temporarily unset GCP environment variables so they do not leak into
|
|
// tool calls.
|
|
const gcpProject = process.env['GOOGLE_CLOUD_PROJECT'];
|
|
const gcpCreds = process.env['GOOGLE_APPLICATION_CREDENTIALS'];
|
|
try {
|
|
delete process.env['GOOGLE_CLOUD_PROJECT'];
|
|
delete process.env['GOOGLE_APPLICATION_CREDENTIALS'];
|
|
|
|
// This will trigger the scheduler to continue or cancel the specific tool.
|
|
// The scheduler's onToolCallsUpdate will then reflect the new state (e.g., executing or cancelled).
|
|
|
|
// If `edit` tool call, pass updated payload if presesent
|
|
if (confirmationDetails.type === 'edit') {
|
|
const newContent = part.data['newContent'];
|
|
const payload =
|
|
typeof newContent === 'string'
|
|
? ({ newContent } as ToolConfirmationPayload)
|
|
: undefined;
|
|
this.skipFinalTrueAfterInlineEdit = !!payload;
|
|
try {
|
|
await confirmationDetails.onConfirm(confirmationOutcome, payload);
|
|
} finally {
|
|
// Once confirmationDetails.onConfirm finishes (or fails) with a payload,
|
|
// reset skipFinalTrueAfterInlineEdit so that external callers receive
|
|
// their call has been completed.
|
|
this.skipFinalTrueAfterInlineEdit = false;
|
|
}
|
|
} else {
|
|
await confirmationDetails.onConfirm(confirmationOutcome);
|
|
}
|
|
} finally {
|
|
if (gcpProject) {
|
|
process.env['GOOGLE_CLOUD_PROJECT'] = gcpProject;
|
|
}
|
|
if (gcpCreds) {
|
|
process.env['GOOGLE_APPLICATION_CREDENTIALS'] = gcpCreds;
|
|
}
|
|
}
|
|
|
|
// Do not delete if modifying, a subsequent tool confirmation for the same
|
|
// callId will be passed with ProceedOnce/Cancel/etc
|
|
// Note !== ToolConfirmationOutcome.ModifyWithEditor does not work!
|
|
if (confirmationOutcome !== 'modify_with_editor') {
|
|
this.pendingToolConfirmationDetails.delete(callId);
|
|
}
|
|
|
|
// If outcome is Cancel, scheduler should update status to 'cancelled', which then resolves the tool.
|
|
// If ProceedOnce, scheduler updates to 'executing', then eventually 'success'/'error', which resolves.
|
|
return true;
|
|
} catch (error) {
|
|
logger.error(
|
|
`[Task] Error during tool confirmation for callId ${callId}:`,
|
|
error,
|
|
);
|
|
// If confirming fails, we should probably mark this tool as failed
|
|
this._resolveToolCall(callId); // Resolve it as it won't proceed.
|
|
const errorMessageText =
|
|
error instanceof Error
|
|
? error.message
|
|
: `Error processing tool confirmation for ${callId}`;
|
|
const message = this._createTextMessage(errorMessageText);
|
|
const toolCallUpdate: ToolCallUpdate = {
|
|
kind: CoderAgentEvent.ToolCallUpdateEvent,
|
|
};
|
|
const event = this._createStatusUpdateEvent(
|
|
this.taskState,
|
|
toolCallUpdate,
|
|
message,
|
|
false,
|
|
);
|
|
this.eventBus?.publish(event);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
getAndClearCompletedTools(): CompletedToolCall[] {
|
|
const tools = [...this.completedToolCalls];
|
|
this.completedToolCalls = [];
|
|
return tools;
|
|
}
|
|
|
|
addToolResponsesToHistory(completedTools: CompletedToolCall[]): void {
|
|
logger.info(
|
|
`[Task] Adding ${completedTools.length} tool responses to history without generating a new response.`,
|
|
);
|
|
const responsesToAdd = completedTools.flatMap(
|
|
(toolCall) => toolCall.response.responseParts,
|
|
);
|
|
|
|
for (const response of responsesToAdd) {
|
|
let parts: genAiPart[];
|
|
if (Array.isArray(response)) {
|
|
parts = response;
|
|
} else if (typeof response === 'string') {
|
|
parts = [{ text: response }];
|
|
} else {
|
|
parts = [response];
|
|
}
|
|
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
|
this.geminiClient.addHistory({
|
|
role: 'user',
|
|
parts,
|
|
});
|
|
}
|
|
}
|
|
|
|
async *sendCompletedToolsToLlm(
|
|
completedToolCalls: CompletedToolCall[],
|
|
aborted: AbortSignal,
|
|
): AsyncGenerator<ServerGeminiStreamEvent> {
|
|
if (completedToolCalls.length === 0) {
|
|
yield* (async function* () {})(); // Yield nothing
|
|
return;
|
|
}
|
|
|
|
const llmParts: PartUnion[] = [];
|
|
logger.info(
|
|
`[Task] Feeding ${completedToolCalls.length} tool responses to LLM.`,
|
|
);
|
|
for (const completedToolCall of completedToolCalls) {
|
|
logger.info(
|
|
`[Task] Adding tool response for "${completedToolCall.request.name}" (callId: ${completedToolCall.request.callId}) to LLM input.`,
|
|
);
|
|
const responseParts = completedToolCall.response.responseParts;
|
|
if (Array.isArray(responseParts)) {
|
|
llmParts.push(...responseParts);
|
|
} else {
|
|
llmParts.push(responseParts);
|
|
}
|
|
}
|
|
|
|
logger.info('[Task] Sending new parts to agent.');
|
|
const stateChange: StateChange = {
|
|
kind: CoderAgentEvent.StateChangeEvent,
|
|
};
|
|
// Set task state to working as we are about to call LLM
|
|
this.setTaskStateAndPublishUpdate('working', stateChange);
|
|
yield* this.geminiClient.sendMessageStream(
|
|
llmParts,
|
|
aborted,
|
|
completedToolCalls[0]?.request.prompt_id ?? '',
|
|
);
|
|
}
|
|
|
|
async *acceptUserMessage(
|
|
requestContext: RequestContext,
|
|
aborted: AbortSignal,
|
|
): AsyncGenerator<ServerGeminiStreamEvent> {
|
|
const userMessage = requestContext.userMessage;
|
|
const llmParts: PartUnion[] = [];
|
|
let anyConfirmationHandled = false;
|
|
let hasContentForLlm = false;
|
|
|
|
for (const part of userMessage.parts) {
|
|
const confirmationHandled = await this._handleToolConfirmationPart(part);
|
|
if (confirmationHandled) {
|
|
anyConfirmationHandled = true;
|
|
// If a confirmation was handled, the scheduler will now run the tool (or cancel it).
|
|
// We don't send anything to the LLM for this part.
|
|
// The subsequent tool execution will eventually lead to resolveToolCall.
|
|
continue;
|
|
}
|
|
|
|
if (part.kind === 'text') {
|
|
llmParts.push({ text: part.text });
|
|
hasContentForLlm = true;
|
|
}
|
|
}
|
|
|
|
if (hasContentForLlm) {
|
|
this.currentPromptId =
|
|
this.config.getSessionId() + '########' + this.promptCount++;
|
|
logger.info('[Task] Sending new parts to LLM.');
|
|
const stateChange: StateChange = {
|
|
kind: CoderAgentEvent.StateChangeEvent,
|
|
};
|
|
// Set task state to working as we are about to call LLM
|
|
this.setTaskStateAndPublishUpdate('working', stateChange);
|
|
yield* this.geminiClient.sendMessageStream(
|
|
llmParts,
|
|
aborted,
|
|
this.currentPromptId,
|
|
);
|
|
} else if (anyConfirmationHandled) {
|
|
logger.info(
|
|
'[Task] User message only contained tool confirmations. Scheduler is active. No new input for LLM this turn.',
|
|
);
|
|
// Ensure task state reflects that scheduler might be working due to confirmation.
|
|
// If scheduler is active, it will emit its own status updates.
|
|
// If all pending tools were just confirmed, waitForPendingTools will handle the wait.
|
|
// If some tools are still pending approval, scheduler would have set InputRequired.
|
|
// If not, and no new text, we are just waiting.
|
|
if (
|
|
this.pendingToolCalls.size > 0 &&
|
|
this.taskState !== 'input-required'
|
|
) {
|
|
const stateChange: StateChange = {
|
|
kind: CoderAgentEvent.StateChangeEvent,
|
|
};
|
|
this.setTaskStateAndPublishUpdate('working', stateChange); // Reflect potential background activity
|
|
}
|
|
yield* (async function* () {})(); // Yield nothing
|
|
} else {
|
|
logger.info(
|
|
'[Task] No relevant parts in user message for LLM interaction or tool confirmation.',
|
|
);
|
|
// If there's no new text and no confirmations, and no pending tools,
|
|
// it implies we might need to signal input required if nothing else is happening.
|
|
// However, the agent.ts will make this determination after waitForPendingTools.
|
|
yield* (async function* () {})(); // Yield nothing
|
|
}
|
|
}
|
|
|
|
_sendTextContent(content: string, traceId?: string): void {
|
|
if (content === '') {
|
|
return;
|
|
}
|
|
logger.info('[Task] Sending text content to event bus.');
|
|
const message = this._createTextMessage(content);
|
|
const textContent: TextContent = {
|
|
kind: CoderAgentEvent.TextContentEvent,
|
|
};
|
|
this.eventBus?.publish(
|
|
this._createStatusUpdateEvent(
|
|
this.taskState,
|
|
textContent,
|
|
message,
|
|
false,
|
|
undefined,
|
|
undefined,
|
|
traceId,
|
|
),
|
|
);
|
|
}
|
|
|
|
_sendThought(content: ThoughtSummary, traceId?: string): void {
|
|
if (!content.subject && !content.description) {
|
|
return;
|
|
}
|
|
logger.info('[Task] Sending thought to event bus.');
|
|
const message: Message = {
|
|
kind: 'message',
|
|
role: 'agent',
|
|
parts: [
|
|
{
|
|
kind: 'data',
|
|
data: content,
|
|
} as Part,
|
|
],
|
|
messageId: uuidv4(),
|
|
taskId: this.id,
|
|
contextId: this.contextId,
|
|
};
|
|
const thought: Thought = {
|
|
kind: CoderAgentEvent.ThoughtEvent,
|
|
};
|
|
this.eventBus?.publish(
|
|
this._createStatusUpdateEvent(
|
|
this.taskState,
|
|
thought,
|
|
message,
|
|
false,
|
|
undefined,
|
|
undefined,
|
|
traceId,
|
|
),
|
|
);
|
|
}
|
|
|
|
_sendCitation(citation: string) {
|
|
if (!citation || citation.trim() === '') {
|
|
return;
|
|
}
|
|
logger.info('[Task] Sending citation to event bus.');
|
|
const message = this._createTextMessage(citation);
|
|
const citationEvent: Citation = {
|
|
kind: CoderAgentEvent.CitationEvent,
|
|
};
|
|
this.eventBus?.publish(
|
|
this._createStatusUpdateEvent(this.taskState, citationEvent, message),
|
|
);
|
|
}
|
|
}
|