From 6f7658cb17f3bd020279ca62716300c101005b97 Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Thu, 12 Mar 2026 00:29:19 -0400 Subject: [PATCH] feat(core): integrate remote agents with ExecutionLifecycleService for backgrounding Wire RemoteAgentInvocation to use ExecutionLifecycleService.createExecution() so remote agents can be backgrounded (Ctrl+B), subscribed to, and killed through the same lifecycle system as shell commands. - Create virtual execution on execute(), report ID via setExecutionIdCallback - Stream output deltas via appendOutput() so lifecycle subscribers see live data - Separate streaming into detached processStream() that settles the lifecycle - Support backgrounding: handle.result resolves with backgrounded=true, stream continues running, returns BackgroundExecutionData - Support kill: lifecycle onKill aborts the stream via AbortController --- .../core/src/agents/remote-invocation.test.ts | 260 ++++++++++++++++++ packages/core/src/agents/remote-invocation.ts | 155 +++++++++-- 2 files changed, 389 insertions(+), 26 deletions(-) diff --git a/packages/core/src/agents/remote-invocation.test.ts b/packages/core/src/agents/remote-invocation.test.ts index e186cc7aa9..beeeea7a63 100644 --- a/packages/core/src/agents/remote-invocation.test.ts +++ b/packages/core/src/agents/remote-invocation.test.ts @@ -22,6 +22,7 @@ import type { RemoteAgentDefinition } from './types.js'; import { createMockMessageBus } from '../test-utils/mock-message-bus.js'; import { A2AAuthProviderFactory } from './auth-provider/factory.js'; import type { A2AAuthProvider } from './auth-provider/types.js'; +import { ExecutionLifecycleService } from '../services/executionLifecycleService.js'; // Mock A2AClientManager vi.mock('./a2a-client-manager.js', () => ({ @@ -58,6 +59,7 @@ describe('RemoteAgentInvocation', () => { beforeEach(() => { vi.clearAllMocks(); + ExecutionLifecycleService.resetForTest(); (A2AClientManager.getInstance as Mock).mockReturnValue(mockClientManager); ( RemoteAgentInvocation as unknown as { @@ -587,6 +589,264 @@ describe('RemoteAgentInvocation', () => { }); }); + describe('Lifecycle Integration', () => { + it('should call setExecutionIdCallback with lifecycle execution ID', async () => { + mockClientManager.getClient.mockReturnValue({}); + mockClientManager.sendMessageStream.mockImplementation( + async function* () { + yield { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Response' }], + }; + }, + ); + + const callback = vi.fn(); + const invocation = new RemoteAgentInvocation( + mockDefinition, + { query: 'hi' }, + mockMessageBus, + ); + await invocation.execute( + new AbortController().signal, + undefined, + undefined, + callback, + ); + + expect(callback).toHaveBeenCalledExactlyOnceWith(expect.any(Number)); + }); + + it('should feed output deltas to lifecycle service subscribers', async () => { + mockClientManager.getClient.mockReturnValue({}); + mockClientManager.sendMessageStream.mockImplementation( + async function* () { + yield { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Hello' }], + }; + yield { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Hello World' }], + }; + }, + ); + + const receivedChunks: string[] = []; + const invocation = new RemoteAgentInvocation( + mockDefinition, + { query: 'hi' }, + mockMessageBus, + ); + + await invocation.execute( + new AbortController().signal, + undefined, + undefined, + (id) => { + // Subscribe immediately when we get the execution ID + ExecutionLifecycleService.subscribe(id, (event) => { + if (event.type === 'data' && typeof event.chunk === 'string') { + receivedChunks.push(event.chunk); + } + }); + }, + ); + + // Lifecycle subscribers should have received output deltas + expect(receivedChunks.length).toBeGreaterThan(0); + expect(receivedChunks.join('')).toContain('Hello'); + expect(receivedChunks.join('')).toContain('World'); + }); + + it('should support backgrounding via lifecycle service (Ctrl+B)', async () => { + mockClientManager.getClient.mockReturnValue({}); + + // Create a controllable stream that blocks between chunks + let resolveStream!: () => void; + const streamBlocked = new Promise((r) => { + resolveStream = r; + }); + + mockClientManager.sendMessageStream.mockImplementation( + async function* () { + yield { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Working...' }], + }; + await streamBlocked; + yield { + kind: 'message', + messageId: 'msg-2', + role: 'agent', + parts: [{ kind: 'text', text: 'Done' }], + }; + }, + ); + + let capturedId: number | undefined; + const invocation = new RemoteAgentInvocation( + mockDefinition, + { query: 'hi' }, + mockMessageBus, + ); + + // Start execution (don't await — we need to background mid-stream) + const resultPromise = invocation.execute( + new AbortController().signal, + undefined, + undefined, + (id) => { + capturedId = id; + }, + ); + + // setExecutionIdCallback is called synchronously before first await + expect(capturedId).toBeDefined(); + + // Flush microtasks so processStream processes the first chunk + await new Promise((resolve) => setTimeout(resolve, 0)); + + // Background the execution (simulates Ctrl+B) + ExecutionLifecycleService.background(capturedId!); + + const result = await resultPromise; + + // Should return backgrounded result with execution data + expect(result.data).toBeDefined(); + expect((result.data as Record)['pid']).toBe(capturedId); + expect(result.returnDisplay).toContain('background'); + expect((result.llmContent as Array<{ text: string }>)[0].text).toContain( + 'background', + ); + + // Let the stream finish cleanly in the background + resolveStream(); + await new Promise((resolve) => setTimeout(resolve, 0)); + }); + + it('should abort stream when killed via lifecycle service', async () => { + mockClientManager.getClient.mockReturnValue({}); + + let resolveStream!: () => void; + const streamBlocked = new Promise((r) => { + resolveStream = r; + }); + + mockClientManager.sendMessageStream.mockImplementation( + async function* () { + yield { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Working...' }], + }; + await streamBlocked; + yield { + kind: 'message', + messageId: 'msg-2', + role: 'agent', + parts: [{ kind: 'text', text: 'Should not reach' }], + }; + }, + ); + + let capturedId: number | undefined; + const invocation = new RemoteAgentInvocation( + mockDefinition, + { query: 'hi' }, + mockMessageBus, + ); + const resultPromise = invocation.execute( + new AbortController().signal, + undefined, + undefined, + (id) => { + capturedId = id; + }, + ); + + // Flush microtasks so processStream processes first chunk + await new Promise((resolve) => setTimeout(resolve, 0)); + + // Kill via lifecycle service + ExecutionLifecycleService.kill(capturedId!); + + // Unblock stream so processStream can finish cleanup + resolveStream(); + + const result = await resultPromise; + + // Kill produces an error result + expect(result.error).toBeDefined(); + expect(result.error?.message).toContain('cancelled'); + + // Give processStream time to finish cleanup + await new Promise((resolve) => setTimeout(resolve, 0)); + }); + + it('should report execution as active while stream is running', async () => { + mockClientManager.getClient.mockReturnValue({}); + + let resolveStream!: () => void; + const streamBlocked = new Promise((r) => { + resolveStream = r; + }); + + mockClientManager.sendMessageStream.mockImplementation( + async function* () { + yield { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Running' }], + }; + await streamBlocked; + }, + ); + + let capturedId: number | undefined; + const invocation = new RemoteAgentInvocation( + mockDefinition, + { query: 'hi' }, + mockMessageBus, + ); + const resultPromise = invocation.execute( + new AbortController().signal, + undefined, + undefined, + (id) => { + capturedId = id; + }, + ); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + // While stream is running, execution should be active + expect(ExecutionLifecycleService.isActive(capturedId!)).toBe(true); + + // Complete the stream + resolveStream(); + // Wait for processStream to complete — it calls completeExecution + // which emits 'exit' and cleans up. Need to let the for-await-of + // loop process the generator's {done: true} and the catch/finally + // blocks to run. Multiple microtask ticks may be needed. + await new Promise((resolve) => setTimeout(resolve, 0)); + await resultPromise; + + // After completion, execution should no longer be active + expect(ExecutionLifecycleService.isActive(capturedId!)).toBe(false); + }); + }); + describe('Confirmations', () => { it('should return info confirmation details', async () => { const invocation = new RemoteAgentInvocation( diff --git a/packages/core/src/agents/remote-invocation.ts b/packages/core/src/agents/remote-invocation.ts index 489f0f91cc..f38ff54809 100644 --- a/packages/core/src/agents/remote-invocation.ts +++ b/packages/core/src/agents/remote-invocation.ts @@ -9,6 +9,7 @@ import { type ToolConfirmationOutcome, type ToolResult, type ToolCallConfirmationDetails, + type BackgroundExecutionData, } from '../tools/tools.js'; import { DEFAULT_QUERY_STRING, @@ -28,6 +29,8 @@ import { safeJsonToMarkdown } from '../utils/markdownUtils.js'; import type { AnsiOutput } from '../utils/terminalSerializer.js'; import { A2AAuthProviderFactory } from './auth-provider/factory.js'; import { A2AAgentError } from './a2a-errors.js'; +import { ExecutionLifecycleService } from '../services/executionLifecycleService.js'; +import type { ShellExecutionConfig } from '../services/shellExecutionService.js'; /** * A tool invocation that proxies to a remote A2A agent. @@ -116,13 +119,115 @@ export class RemoteAgentInvocation extends BaseToolInvocation< } async execute( - _signal: AbortSignal, + signal: AbortSignal, updateOutput?: (output: string | AnsiOutput) => void, + _shellExecutionConfig?: ShellExecutionConfig, + setExecutionIdCallback?: (executionId: number) => void, ): Promise { - // 1. Ensure the agent is loaded (cached by manager) - // We assume the user has provided an access token via some mechanism (TODO), - // or we rely on ADC. + // Create an AbortController for lifecycle kill support. + // Parent abort and lifecycle kill both funnel through this controller. + const executionAbortController = new AbortController(); + if (signal.aborted) { + executionAbortController.abort(); + } else { + signal.addEventListener('abort', () => executionAbortController.abort(), { + once: true, + }); + } + + // Register with lifecycle service as a virtual execution so this + // invocation can be backgrounded, subscribed to, and killed. + const handle = ExecutionLifecycleService.createExecution( + '', + () => executionAbortController.abort(), + 'remote_agent', + ); + // createExecution always produces a valid numeric ID + const executionId = handle.pid!; + + if (setExecutionIdCallback) { + setExecutionIdCallback(executionId); + } + + // Guard: stop calling updateOutput after backgrounding since the + // tool call has already returned from the scheduler's perspective. + let backgrounded = false; + + // Fire-and-forget: stream processing runs concurrently and settles the + // lifecycle execution on completion or error. + const streamingPromise = this.processStream( + executionId, + executionAbortController.signal, + (output) => { + if (!backgrounded && updateOutput) { + updateOutput(output); + } + }, + ); + // Errors are handled internally via completeExecution; prevent + // unhandled-rejection noise. + streamingPromise.catch(() => {}); + + // Resolves when either: (a) processStream completes/errors, or + // (b) the execution is backgrounded externally. + const result = await handle.result; + + if (result.backgrounded) { + backgrounded = true; + const agentLabel = this.definition.displayName ?? this.definition.name; + const data: BackgroundExecutionData = { + pid: executionId, + command: `Remote agent: ${agentLabel}`, + initialOutput: result.output, + }; + return { + llmContent: [ + { + text: `Remote agent '${agentLabel}' moved to background (ID: ${executionId}). Use subscribe to view output.`, + }, + ], + returnDisplay: `Remote agent moved to background (ID: ${executionId}).`, + data, + }; + } + + // Error path — the lifecycle result carries the original Error instance. + if (result.error) { + const errorMessage = this.formatExecutionError(result.error); + const fullDisplay = result.output + ? `${result.output}\n\n${errorMessage}` + : errorMessage; + return { + llmContent: [{ text: fullDisplay }], + returnDisplay: fullDisplay, + error: { message: errorMessage }, + }; + } + + // Normal completion. + const finalOutput = result.output; + debugLogger.debug( + `[RemoteAgent] Final output from ${this.definition.name}: ${finalOutput.substring(0, 200)}`, + ); + return { + llmContent: [{ text: finalOutput }], + returnDisplay: safeJsonToMarkdown(finalOutput), + }; + } + + /** + * Runs the A2A stream, feeding output deltas into the lifecycle service. + * On completion (or error) it settles the lifecycle execution so + * {@link execute}'s `handle.result` resolves. + */ + private async processStream( + executionId: number, + signal: AbortSignal, + updateOutput?: (output: string | AnsiOutput) => void, + ): Promise { const reassembler = new A2AResultReassembler(); + let previousOutputLength = 0; + try { const priorState = RemoteAgentInvocation.sessionState.get( this.definition.name, @@ -150,21 +255,30 @@ export class RemoteAgentInvocation extends BaseToolInvocation< { contextId: this.contextId, taskId: this.taskId, - signal: _signal, + signal, }, ); let finalResponse: SendMessageResult | undefined; for await (const chunk of stream) { - if (_signal.aborted) { + if (signal.aborted) { throw new Error('Operation aborted'); } finalResponse = chunk; reassembler.update(chunk); + // Compute delta so lifecycle subscribers see incremental chunks. + const currentOutput = reassembler.toString(); + const delta = currentOutput.substring(previousOutputLength); + previousOutputLength = currentOutput.length; + + if (delta) { + ExecutionLifecycleService.appendOutput(executionId, delta); + } + if (updateOutput) { - updateOutput(reassembler.toString()); + updateOutput(currentOutput); } const { @@ -184,33 +298,22 @@ export class RemoteAgentInvocation extends BaseToolInvocation< throw new Error('No response from remote agent.'); } - const finalOutput = reassembler.toString(); - debugLogger.debug( `[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`, ); - return { - llmContent: [{ text: finalOutput }], - returnDisplay: safeJsonToMarkdown(finalOutput), - }; + ExecutionLifecycleService.completeExecution(executionId); } catch (error: unknown) { - const partialOutput = reassembler.toString(); - // Surface structured, user-friendly error messages. - const errorMessage = this.formatExecutionError(error); - const fullDisplay = partialOutput - ? `${partialOutput}\n\n${errorMessage}` - : errorMessage; - return { - llmContent: [{ text: fullDisplay }], - returnDisplay: fullDisplay, - error: { message: errorMessage }, - }; + ExecutionLifecycleService.completeExecution(executionId, { + error: error instanceof Error ? error : new Error(String(error)), + }); } finally { - // Persist state even on partial failures or aborts to maintain conversational continuity. + // Persist conversational state. On abort/kill the task was interrupted + // so clear taskId (next invocation starts a fresh task), but keep + // contextId to maintain the conversation with the remote agent. RemoteAgentInvocation.sessionState.set(this.definition.name, { contextId: this.contextId, - taskId: this.taskId, + taskId: signal.aborted ? undefined : this.taskId, }); } }