diff --git a/packages/core/src/agents/remote-invocation.test.ts b/packages/core/src/agents/remote-invocation.test.ts index 5d712b5d71..ef37a1800a 100644 --- a/packages/core/src/agents/remote-invocation.test.ts +++ b/packages/core/src/agents/remote-invocation.test.ts @@ -585,7 +585,7 @@ describe('RemoteAgentInvocation', () => { ); }); - it('should support Ctrl+B backgrounding through ShellExecutionService', async () => { + it('should support Ctrl+B backgrounding through execution lifecycle IDs', async () => { mockClientManager.getClient.mockReturnValue({}); let releaseSecondChunk: (() => void) | undefined; @@ -618,7 +618,7 @@ describe('RemoteAgentInvocation', () => { let unsubscribeStream: (() => void) | undefined; const updateOutput = vi.fn((output: unknown) => { - if (output === 'Chunk 1' && executionId) { + if (output === 'Chunk 1' && executionId !== undefined) { ShellExecutionService.background(executionId); unsubscribeStream = ShellExecutionService.subscribe( executionId, diff --git a/packages/core/src/agents/remote-invocation.ts b/packages/core/src/agents/remote-invocation.ts index da1ff3128e..2b8b6268c7 100644 --- a/packages/core/src/agents/remote-invocation.ts +++ b/packages/core/src/agents/remote-invocation.ts @@ -28,7 +28,15 @@ import type { AuthenticationHandler } from '@a2a-js/sdk/client'; import { debugLogger } from '../utils/debugLogger.js'; import type { AnsiOutput } from '../utils/terminalSerializer.js'; import { A2AAuthProviderFactory } from './auth-provider/factory.js'; -import { ExecutionLifecycleService } from '../services/executionLifecycleService.js'; +import { + ExecutionLifecycleService, + type ExecutionResult, +} from '../services/executionLifecycleService.js'; + +const OUTPUT_RESYNC_MARKER = '\n\n[Output updated]\n'; +const REMOTE_AGENT_ERROR_PREFIX = 'Error calling remote agent: '; +const MISSING_EXECUTION_ID_MESSAGE = + 'Error calling remote agent: missing execution ID.'; /** * Authentication handler implementation using Google Application Default Credentials (ADC). @@ -147,6 +155,87 @@ export class RemoteAgentInvocation extends BaseToolInvocation< }; } + private restoreSessionState(): void { + const priorState = RemoteAgentInvocation.sessionState.get( + this.definition.name, + ); + if (!priorState) { + return; + } + this.contextId = priorState.contextId; + this.taskId = priorState.taskId; + } + + private persistSessionState(): void { + // Persist state even on partial failures or aborts to maintain conversational continuity. + RemoteAgentInvocation.sessionState.set(this.definition.name, { + contextId: this.contextId, + taskId: this.taskId, + }); + } + + private updateSessionStateFromResponseChunk(chunk: SendMessageResult): void { + const { contextId: newContextId, taskId: newTaskId, clearTaskId } = + extractIdsFromResponse(chunk); + + if (newContextId) { + this.contextId = newContextId; + } + + this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId); + } + + private buildBackgroundResult( + executionId: number, + output: string, + ): ToolResult { + const command = `${this.getDescription()}: ${this.params.query}`; + const backgroundMessage = `Remote agent moved to background (Execution ID: ${executionId}). Output hidden. Press Ctrl+B to view.`; + const data: BackgroundExecutionData = { + executionId, + pid: executionId, + command, + initialOutput: output, + }; + + return { + llmContent: [{ text: backgroundMessage }], + returnDisplay: backgroundMessage, + data, + }; + } + + private buildCompletionResult( + executionResult: ExecutionResult, + executionId: number, + ): ToolResult { + if (executionResult.backgrounded) { + return this.buildBackgroundResult(executionId, executionResult.output); + } + + if (executionResult.error) { + const fullDisplay = executionResult.output + ? `${executionResult.output}\n\n${executionResult.error.message}` + : executionResult.error.message; + return { + llmContent: [{ text: fullDisplay }], + returnDisplay: fullDisplay, + error: { message: executionResult.error.message }, + }; + } + + return { + llmContent: [{ text: executionResult.output }], + returnDisplay: executionResult.output, + }; + } + + private formatRemoteAgentError(error: unknown): string { + return `${REMOTE_AGENT_ERROR_PREFIX}${ + error instanceof Error ? error.message : String(error) + }`; + } + private publishBackgroundDelta( executionId: number, previousOutput: string, @@ -168,13 +257,13 @@ export class RemoteAgentInvocation extends BaseToolInvocation< // the full latest snapshot with a clear separator. ExecutionLifecycleService.appendOutput( executionId, - `\n\n[Output updated]\n${nextOutput}`, + `${OUTPUT_RESYNC_MARKER}${nextOutput}`, ); return nextOutput; } async execute( - _signal: AbortSignal, + signal: AbortSignal, updateOutput?: (output: string | AnsiOutput) => void, _shellExecutionConfig?: unknown, setExecutionIdCallback?: (executionId: number) => void, @@ -182,37 +271,28 @@ export class RemoteAgentInvocation extends BaseToolInvocation< const reassembler = new A2AResultReassembler(); const executionController = new AbortController(); const onAbort = () => executionController.abort(); - _signal.addEventListener('abort', onAbort, { once: true }); + signal.addEventListener('abort', onAbort, { once: true }); - const { pid, result } = ExecutionLifecycleService.createExecution( + const { pid: executionId, result } = ExecutionLifecycleService.createVirtualExecution( '', () => executionController.abort(), ); - if (pid === undefined) { - _signal.removeEventListener('abort', onAbort); + if (executionId === undefined) { + signal.removeEventListener('abort', onAbort); return { - llmContent: [ - { text: 'Error calling remote agent: missing execution pid.' }, - ], - returnDisplay: 'Error calling remote agent: missing execution pid.', + llmContent: [{ text: MISSING_EXECUTION_ID_MESSAGE }], + returnDisplay: MISSING_EXECUTION_ID_MESSAGE, error: { - message: 'Error calling remote agent: missing execution pid.', + message: MISSING_EXECUTION_ID_MESSAGE, }, }; } - const backgroundExecutionId = pid; - setExecutionIdCallback?.(backgroundExecutionId); + setExecutionIdCallback?.(executionId); - const run = async () => { + const streamRemoteAgentExecution = async () => { let lastOutput = ''; try { - const priorState = RemoteAgentInvocation.sessionState.get( - this.definition.name, - ); - if (priorState) { - this.contextId = priorState.contextId; - this.taskId = priorState.taskId; - } + this.restoreSessionState(); const authHandler = await this.getAuthHandler(); @@ -245,25 +325,13 @@ export class RemoteAgentInvocation extends BaseToolInvocation< const currentOutput = reassembler.toString(); lastOutput = this.publishBackgroundDelta( - backgroundExecutionId, + executionId, lastOutput, currentOutput, ); - if (updateOutput) { - updateOutput(currentOutput); - } + updateOutput?.(currentOutput); - const { - contextId: newContextId, - taskId: newTaskId, - clearTaskId, - } = extractIdsFromResponse(chunk); - - if (newContextId) { - this.contextId = newContextId; - } - - this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId); + this.updateSessionStateFromResponseChunk(chunk); } if (!finalResponse) { @@ -274,67 +342,29 @@ export class RemoteAgentInvocation extends BaseToolInvocation< `[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`, ); - ExecutionLifecycleService.completeExecution(backgroundExecutionId, { + ExecutionLifecycleService.completeExecution(executionId, { exitCode: 0, }); } catch (error: unknown) { const partialOutput = reassembler.toString(); lastOutput = this.publishBackgroundDelta( - backgroundExecutionId, + executionId, lastOutput, partialOutput, ); - const errorMessage = `Error calling remote agent: ${ - error instanceof Error ? error.message : String(error) - }`; - ExecutionLifecycleService.completeExecution(backgroundExecutionId, { - error: new Error(errorMessage), + ExecutionLifecycleService.completeExecution(executionId, { + error: new Error(this.formatRemoteAgentError(error)), aborted: executionController.signal.aborted, exitCode: executionController.signal.aborted ? 130 : 1, }); } finally { - _signal.removeEventListener('abort', onAbort); - // Persist state even on partial failures or aborts to maintain conversational continuity. - RemoteAgentInvocation.sessionState.set(this.definition.name, { - contextId: this.contextId, - taskId: this.taskId, - }); + signal.removeEventListener('abort', onAbort); + this.persistSessionState(); } }; - void run(); + void streamRemoteAgentExecution(); const executionResult = await result; - - if (executionResult.backgrounded) { - const command = `${this.getDescription()}: ${this.params.query}`; - const backgroundMessage = `Remote agent moved to background (PID: ${backgroundExecutionId}). Output hidden. Press Ctrl+B to view.`; - const data: BackgroundExecutionData = { - executionId: backgroundExecutionId, - pid: backgroundExecutionId, - command, - initialOutput: executionResult.output, - }; - return { - llmContent: [{ text: backgroundMessage }], - returnDisplay: backgroundMessage, - data, - }; - } - - if (executionResult.error) { - const fullDisplay = executionResult.output - ? `${executionResult.output}\n\n${executionResult.error.message}` - : executionResult.error.message; - return { - llmContent: [{ text: fullDisplay }], - returnDisplay: fullDisplay, - error: { message: executionResult.error.message }, - }; - } - - return { - llmContent: [{ text: executionResult.output }], - returnDisplay: executionResult.output, - }; + return this.buildCompletionResult(executionResult, executionId); } }