diff --git a/packages/core/src/agents/remote-invocation.test.ts b/packages/core/src/agents/remote-invocation.test.ts index 02c655ec27..8f2d3157de 100644 --- a/packages/core/src/agents/remote-invocation.test.ts +++ b/packages/core/src/agents/remote-invocation.test.ts @@ -22,6 +22,7 @@ import type { RemoteAgentDefinition } from './types.js'; import { createMockMessageBus } from '../test-utils/mock-message-bus.js'; import { A2AAuthProviderFactory } from './auth-provider/factory.js'; import type { A2AAuthProvider } from './auth-provider/types.js'; +import { ShellExecutionService } from '../services/shellExecutionService.js'; // Mock A2AClientManager vi.mock('./a2a-client-manager.js', () => ({ @@ -583,6 +584,88 @@ describe('RemoteAgentInvocation', () => { 'Generating...\n\nArtifact (Result):\nPart 1 Part 2', ); }); + + it('should support Ctrl+B backgrounding through ShellExecutionService', async () => { + mockClientManager.getClient.mockReturnValue({}); + + let releaseSecondChunk: (() => void) | undefined; + const secondChunkGate = new Promise((resolve) => { + releaseSecondChunk = resolve; + }); + + mockClientManager.sendMessageStream.mockImplementation( + async function* () { + yield { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Chunk 1' }], + }; + await secondChunkGate; + yield { + kind: 'message', + messageId: 'msg-2', + role: 'agent', + parts: [{ kind: 'text', text: 'Chunk 2' }], + }; + }, + ); + + let pid: number | undefined; + const onExit = vi.fn(); + let unsubscribeOnExit: (() => void) | undefined; + const streamedOutputChunks: string[] = []; + let unsubscribeStream: (() => void) | undefined; + + const updateOutput = vi.fn((output: unknown) => { + if (output === 'Chunk 1' && pid) { + ShellExecutionService.background(pid); + unsubscribeStream = ShellExecutionService.subscribe(pid, (event) => { + if (event.type === 'data' && typeof event.chunk === 'string') { + streamedOutputChunks.push(event.chunk); + } + }); + } + }); + + const invocation = new RemoteAgentInvocation( + mockDefinition, + { query: 'long task' }, + mockMessageBus, + ); + + const resultPromise = invocation.execute( + new AbortController().signal, + updateOutput, + undefined, + (newPid) => { + pid = newPid; + unsubscribeOnExit = ShellExecutionService.onExit(newPid, onExit); + }, + ); + + const result = await resultPromise; + expect(pid).toBeDefined(); + expect(result.returnDisplay).toContain( + 'Remote agent moved to background', + ); + expect(result.data).toMatchObject({ + pid, + initialOutput: 'Chunk 1', + }); + + releaseSecondChunk?.(); + + await vi.waitFor(() => { + expect(onExit).toHaveBeenCalledWith(0, undefined); + }); + await vi.waitFor(() => { + expect(streamedOutputChunks.join('')).toContain('Chunk 2'); + }); + + unsubscribeStream?.(); + unsubscribeOnExit?.(); + }); }); describe('Confirmations', () => { diff --git a/packages/core/src/agents/remote-invocation.ts b/packages/core/src/agents/remote-invocation.ts index a8c75ec51c..728cdc8fb6 100644 --- a/packages/core/src/agents/remote-invocation.ts +++ b/packages/core/src/agents/remote-invocation.ts @@ -27,6 +27,7 @@ import type { AuthenticationHandler } from '@a2a-js/sdk/client'; import { debugLogger } from '../utils/debugLogger.js'; import type { AnsiOutput } from '../utils/terminalSerializer.js'; import { A2AAuthProviderFactory } from './auth-provider/factory.js'; +import { ExecutionLifecycleService } from '../services/executionLifecycleService.js'; /** * Authentication handler implementation using Google Application Default Credentials (ADC). @@ -145,102 +146,192 @@ export class RemoteAgentInvocation extends BaseToolInvocation< }; } + private publishBackgroundDelta( + pid: number, + previousOutput: string, + nextOutput: string, + ): string { + if (nextOutput.length === 0 || nextOutput === previousOutput) { + return previousOutput; + } + + if (nextOutput.startsWith(previousOutput)) { + ExecutionLifecycleService.appendOutput( + pid, + nextOutput.slice(previousOutput.length), + ); + return nextOutput; + } + + // If the reassembled output changes non-monotonically, resync by appending + // the full latest snapshot with a clear separator. + ExecutionLifecycleService.appendOutput( + pid, + `\n\n[Output updated]\n${nextOutput}`, + ); + return nextOutput; + } + async execute( _signal: AbortSignal, updateOutput?: (output: string | AnsiOutput) => void, + _shellExecutionConfig?: unknown, + setPidCallback?: (pid: number) => void, ): Promise { - // 1. Ensure the agent is loaded (cached by manager) - // We assume the user has provided an access token via some mechanism (TODO), - // or we rely on ADC. const reassembler = new A2AResultReassembler(); - try { - const priorState = RemoteAgentInvocation.sessionState.get( - this.definition.name, - ); - if (priorState) { - this.contextId = priorState.contextId; - this.taskId = priorState.taskId; - } + const executionController = new AbortController(); + const onAbort = () => executionController.abort(); + _signal.addEventListener('abort', onAbort, { once: true }); - const authHandler = await this.getAuthHandler(); + const { pid, result } = ExecutionLifecycleService.createExecution( + '', + () => executionController.abort(), + ); + if (pid === undefined) { + _signal.removeEventListener('abort', onAbort); + return { + llmContent: [ + { text: 'Error calling remote agent: missing execution pid.' }, + ], + returnDisplay: 'Error calling remote agent: missing execution pid.', + error: { + message: 'Error calling remote agent: missing execution pid.', + }, + }; + } + const backgroundPid = pid; + setPidCallback?.(backgroundPid); - if (!this.clientManager.getClient(this.definition.name)) { - await this.clientManager.loadAgent( + const run = async () => { + let lastOutput = ''; + try { + const priorState = RemoteAgentInvocation.sessionState.get( this.definition.name, - this.definition.agentCardUrl, - authHandler, ); - } + if (priorState) { + this.contextId = priorState.contextId; + this.taskId = priorState.taskId; + } - const message = this.params.query; + const authHandler = await this.getAuthHandler(); - const stream = this.clientManager.sendMessageStream( - this.definition.name, - message, - { + if (!this.clientManager.getClient(this.definition.name)) { + await this.clientManager.loadAgent( + this.definition.name, + this.definition.agentCardUrl, + authHandler, + ); + } + + const stream = this.clientManager.sendMessageStream( + this.definition.name, + this.params.query, + { + contextId: this.contextId, + taskId: this.taskId, + signal: executionController.signal, + }, + ); + + let finalResponse: SendMessageResult | undefined; + + for await (const chunk of stream) { + if (executionController.signal.aborted) { + throw new Error('Operation aborted'); + } + finalResponse = chunk; + reassembler.update(chunk); + + const currentOutput = reassembler.toString(); + lastOutput = this.publishBackgroundDelta( + backgroundPid, + lastOutput, + currentOutput, + ); + if (updateOutput) { + updateOutput(currentOutput); + } + + const { + contextId: newContextId, + taskId: newTaskId, + clearTaskId, + } = extractIdsFromResponse(chunk); + + if (newContextId) { + this.contextId = newContextId; + } + + this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId); + } + + if (!finalResponse) { + throw new Error('No response from remote agent.'); + } + + debugLogger.debug( + `[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`, + ); + + ExecutionLifecycleService.completeExecution(backgroundPid, { + exitCode: 0, + }); + } catch (error: unknown) { + const partialOutput = reassembler.toString(); + lastOutput = this.publishBackgroundDelta( + backgroundPid, + lastOutput, + partialOutput, + ); + const errorMessage = `Error calling remote agent: ${ + error instanceof Error ? error.message : String(error) + }`; + ExecutionLifecycleService.completeExecution(backgroundPid, { + error: new Error(errorMessage), + aborted: executionController.signal.aborted, + exitCode: executionController.signal.aborted ? 130 : 1, + }); + } finally { + _signal.removeEventListener('abort', onAbort); + // Persist state even on partial failures or aborts to maintain conversational continuity. + RemoteAgentInvocation.sessionState.set(this.definition.name, { contextId: this.contextId, taskId: this.taskId, - signal: _signal, - }, - ); - - let finalResponse: SendMessageResult | undefined; - - for await (const chunk of stream) { - if (_signal.aborted) { - throw new Error('Operation aborted'); - } - finalResponse = chunk; - reassembler.update(chunk); - - if (updateOutput) { - updateOutput(reassembler.toString()); - } - - const { - contextId: newContextId, - taskId: newTaskId, - clearTaskId, - } = extractIdsFromResponse(chunk); - - if (newContextId) { - this.contextId = newContextId; - } - - this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId); + }); } + }; - if (!finalResponse) { - throw new Error('No response from remote agent.'); - } - - const finalOutput = reassembler.toString(); - - debugLogger.debug( - `[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`, - ); + void run(); + const executionResult = await result; + if (executionResult.backgrounded) { + const command = `${this.getDescription()}: ${this.params.query}`; + const backgroundMessage = `Remote agent moved to background (PID: ${backgroundPid}). Output hidden. Press Ctrl+B to view.`; return { - llmContent: [{ text: finalOutput }], - returnDisplay: finalOutput, + llmContent: [{ text: backgroundMessage }], + returnDisplay: backgroundMessage, + data: { + pid: backgroundPid, + command, + initialOutput: executionResult.output, + }, }; - } catch (error: unknown) { - const partialOutput = reassembler.toString(); - const errorMessage = `Error calling remote agent: ${error instanceof Error ? error.message : String(error)}`; - const fullDisplay = partialOutput - ? `${partialOutput}\n\n${errorMessage}` - : errorMessage; + } + + if (executionResult.error) { + const fullDisplay = executionResult.output + ? `${executionResult.output}\n\n${executionResult.error.message}` + : executionResult.error.message; return { llmContent: [{ text: fullDisplay }], returnDisplay: fullDisplay, - error: { message: errorMessage }, + error: { message: executionResult.error.message }, }; - } finally { - // Persist state even on partial failures or aborts to maintain conversational continuity. - RemoteAgentInvocation.sessionState.set(this.definition.name, { - contextId: this.contextId, - taskId: this.taskId, - }); } + + return { + llmContent: [{ text: executionResult.output }], + returnDisplay: executionResult.output, + }; } }