feat(core): integrate remote agents with ExecutionLifecycleService for backgrounding

Wire RemoteAgentInvocation to use ExecutionLifecycleService.createExecution()
so remote agents can be backgrounded (Ctrl+B), subscribed to, and killed
through the same lifecycle system as shell commands.

- Create virtual execution on execute(), report ID via setExecutionIdCallback
- Stream output deltas via appendOutput() so lifecycle subscribers see live data
- Separate streaming into detached processStream() that settles the lifecycle
- Support backgrounding: handle.result resolves with backgrounded=true,
  stream continues running, returns BackgroundExecutionData
- Support kill: lifecycle onKill aborts the stream via AbortController
This commit is contained in:
Adam Weidman
2026-03-12 00:29:19 -04:00
parent 76c96cc4be
commit 6f7658cb17
2 changed files with 389 additions and 26 deletions
@@ -22,6 +22,7 @@ import type { RemoteAgentDefinition } from './types.js';
import { createMockMessageBus } from '../test-utils/mock-message-bus.js';
import { A2AAuthProviderFactory } from './auth-provider/factory.js';
import type { A2AAuthProvider } from './auth-provider/types.js';
import { ExecutionLifecycleService } from '../services/executionLifecycleService.js';
// Mock A2AClientManager
vi.mock('./a2a-client-manager.js', () => ({
@@ -58,6 +59,7 @@ describe('RemoteAgentInvocation', () => {
beforeEach(() => {
vi.clearAllMocks();
ExecutionLifecycleService.resetForTest();
(A2AClientManager.getInstance as Mock).mockReturnValue(mockClientManager);
(
RemoteAgentInvocation as unknown as {
@@ -587,6 +589,264 @@ describe('RemoteAgentInvocation', () => {
});
});
describe('Lifecycle Integration', () => {
it('should call setExecutionIdCallback with lifecycle execution ID', async () => {
mockClientManager.getClient.mockReturnValue({});
mockClientManager.sendMessageStream.mockImplementation(
async function* () {
yield {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Response' }],
};
},
);
const callback = vi.fn();
const invocation = new RemoteAgentInvocation(
mockDefinition,
{ query: 'hi' },
mockMessageBus,
);
await invocation.execute(
new AbortController().signal,
undefined,
undefined,
callback,
);
expect(callback).toHaveBeenCalledExactlyOnceWith(expect.any(Number));
});
it('should feed output deltas to lifecycle service subscribers', async () => {
mockClientManager.getClient.mockReturnValue({});
mockClientManager.sendMessageStream.mockImplementation(
async function* () {
yield {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Hello' }],
};
yield {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Hello World' }],
};
},
);
const receivedChunks: string[] = [];
const invocation = new RemoteAgentInvocation(
mockDefinition,
{ query: 'hi' },
mockMessageBus,
);
await invocation.execute(
new AbortController().signal,
undefined,
undefined,
(id) => {
// Subscribe immediately when we get the execution ID
ExecutionLifecycleService.subscribe(id, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
receivedChunks.push(event.chunk);
}
});
},
);
// Lifecycle subscribers should have received output deltas
expect(receivedChunks.length).toBeGreaterThan(0);
expect(receivedChunks.join('')).toContain('Hello');
expect(receivedChunks.join('')).toContain('World');
});
it('should support backgrounding via lifecycle service (Ctrl+B)', async () => {
mockClientManager.getClient.mockReturnValue({});
// Create a controllable stream that blocks between chunks
let resolveStream!: () => void;
const streamBlocked = new Promise<void>((r) => {
resolveStream = r;
});
mockClientManager.sendMessageStream.mockImplementation(
async function* () {
yield {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Working...' }],
};
await streamBlocked;
yield {
kind: 'message',
messageId: 'msg-2',
role: 'agent',
parts: [{ kind: 'text', text: 'Done' }],
};
},
);
let capturedId: number | undefined;
const invocation = new RemoteAgentInvocation(
mockDefinition,
{ query: 'hi' },
mockMessageBus,
);
// Start execution (don't await — we need to background mid-stream)
const resultPromise = invocation.execute(
new AbortController().signal,
undefined,
undefined,
(id) => {
capturedId = id;
},
);
// setExecutionIdCallback is called synchronously before first await
expect(capturedId).toBeDefined();
// Flush microtasks so processStream processes the first chunk
await new Promise((resolve) => setTimeout(resolve, 0));
// Background the execution (simulates Ctrl+B)
ExecutionLifecycleService.background(capturedId!);
const result = await resultPromise;
// Should return backgrounded result with execution data
expect(result.data).toBeDefined();
expect((result.data as Record<string, unknown>)['pid']).toBe(capturedId);
expect(result.returnDisplay).toContain('background');
expect((result.llmContent as Array<{ text: string }>)[0].text).toContain(
'background',
);
// Let the stream finish cleanly in the background
resolveStream();
await new Promise((resolve) => setTimeout(resolve, 0));
});
it('should abort stream when killed via lifecycle service', async () => {
mockClientManager.getClient.mockReturnValue({});
let resolveStream!: () => void;
const streamBlocked = new Promise<void>((r) => {
resolveStream = r;
});
mockClientManager.sendMessageStream.mockImplementation(
async function* () {
yield {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Working...' }],
};
await streamBlocked;
yield {
kind: 'message',
messageId: 'msg-2',
role: 'agent',
parts: [{ kind: 'text', text: 'Should not reach' }],
};
},
);
let capturedId: number | undefined;
const invocation = new RemoteAgentInvocation(
mockDefinition,
{ query: 'hi' },
mockMessageBus,
);
const resultPromise = invocation.execute(
new AbortController().signal,
undefined,
undefined,
(id) => {
capturedId = id;
},
);
// Flush microtasks so processStream processes first chunk
await new Promise((resolve) => setTimeout(resolve, 0));
// Kill via lifecycle service
ExecutionLifecycleService.kill(capturedId!);
// Unblock stream so processStream can finish cleanup
resolveStream();
const result = await resultPromise;
// Kill produces an error result
expect(result.error).toBeDefined();
expect(result.error?.message).toContain('cancelled');
// Give processStream time to finish cleanup
await new Promise((resolve) => setTimeout(resolve, 0));
});
it('should report execution as active while stream is running', async () => {
mockClientManager.getClient.mockReturnValue({});
let resolveStream!: () => void;
const streamBlocked = new Promise<void>((r) => {
resolveStream = r;
});
mockClientManager.sendMessageStream.mockImplementation(
async function* () {
yield {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Running' }],
};
await streamBlocked;
},
);
let capturedId: number | undefined;
const invocation = new RemoteAgentInvocation(
mockDefinition,
{ query: 'hi' },
mockMessageBus,
);
const resultPromise = invocation.execute(
new AbortController().signal,
undefined,
undefined,
(id) => {
capturedId = id;
},
);
await new Promise((resolve) => setTimeout(resolve, 0));
// While stream is running, execution should be active
expect(ExecutionLifecycleService.isActive(capturedId!)).toBe(true);
// Complete the stream
resolveStream();
// Wait for processStream to complete — it calls completeExecution
// which emits 'exit' and cleans up. Need to let the for-await-of
// loop process the generator's {done: true} and the catch/finally
// blocks to run. Multiple microtask ticks may be needed.
await new Promise((resolve) => setTimeout(resolve, 0));
await resultPromise;
// After completion, execution should no longer be active
expect(ExecutionLifecycleService.isActive(capturedId!)).toBe(false);
});
});
describe('Confirmations', () => {
it('should return info confirmation details', async () => {
const invocation = new RemoteAgentInvocation(
+129 -26
View File
@@ -9,6 +9,7 @@ import {
type ToolConfirmationOutcome,
type ToolResult,
type ToolCallConfirmationDetails,
type BackgroundExecutionData,
} from '../tools/tools.js';
import {
DEFAULT_QUERY_STRING,
@@ -28,6 +29,8 @@ import { safeJsonToMarkdown } from '../utils/markdownUtils.js';
import type { AnsiOutput } from '../utils/terminalSerializer.js';
import { A2AAuthProviderFactory } from './auth-provider/factory.js';
import { A2AAgentError } from './a2a-errors.js';
import { ExecutionLifecycleService } from '../services/executionLifecycleService.js';
import type { ShellExecutionConfig } from '../services/shellExecutionService.js';
/**
* A tool invocation that proxies to a remote A2A agent.
@@ -116,13 +119,115 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
}
async execute(
_signal: AbortSignal,
signal: AbortSignal,
updateOutput?: (output: string | AnsiOutput) => void,
_shellExecutionConfig?: ShellExecutionConfig,
setExecutionIdCallback?: (executionId: number) => void,
): Promise<ToolResult> {
// 1. Ensure the agent is loaded (cached by manager)
// We assume the user has provided an access token via some mechanism (TODO),
// or we rely on ADC.
// Create an AbortController for lifecycle kill support.
// Parent abort and lifecycle kill both funnel through this controller.
const executionAbortController = new AbortController();
if (signal.aborted) {
executionAbortController.abort();
} else {
signal.addEventListener('abort', () => executionAbortController.abort(), {
once: true,
});
}
// Register with lifecycle service as a virtual execution so this
// invocation can be backgrounded, subscribed to, and killed.
const handle = ExecutionLifecycleService.createExecution(
'',
() => executionAbortController.abort(),
'remote_agent',
);
// createExecution always produces a valid numeric ID
const executionId = handle.pid!;
if (setExecutionIdCallback) {
setExecutionIdCallback(executionId);
}
// Guard: stop calling updateOutput after backgrounding since the
// tool call has already returned from the scheduler's perspective.
let backgrounded = false;
// Fire-and-forget: stream processing runs concurrently and settles the
// lifecycle execution on completion or error.
const streamingPromise = this.processStream(
executionId,
executionAbortController.signal,
(output) => {
if (!backgrounded && updateOutput) {
updateOutput(output);
}
},
);
// Errors are handled internally via completeExecution; prevent
// unhandled-rejection noise.
streamingPromise.catch(() => {});
// Resolves when either: (a) processStream completes/errors, or
// (b) the execution is backgrounded externally.
const result = await handle.result;
if (result.backgrounded) {
backgrounded = true;
const agentLabel = this.definition.displayName ?? this.definition.name;
const data: BackgroundExecutionData = {
pid: executionId,
command: `Remote agent: ${agentLabel}`,
initialOutput: result.output,
};
return {
llmContent: [
{
text: `Remote agent '${agentLabel}' moved to background (ID: ${executionId}). Use subscribe to view output.`,
},
],
returnDisplay: `Remote agent moved to background (ID: ${executionId}).`,
data,
};
}
// Error path — the lifecycle result carries the original Error instance.
if (result.error) {
const errorMessage = this.formatExecutionError(result.error);
const fullDisplay = result.output
? `${result.output}\n\n${errorMessage}`
: errorMessage;
return {
llmContent: [{ text: fullDisplay }],
returnDisplay: fullDisplay,
error: { message: errorMessage },
};
}
// Normal completion.
const finalOutput = result.output;
debugLogger.debug(
`[RemoteAgent] Final output from ${this.definition.name}: ${finalOutput.substring(0, 200)}`,
);
return {
llmContent: [{ text: finalOutput }],
returnDisplay: safeJsonToMarkdown(finalOutput),
};
}
/**
* Runs the A2A stream, feeding output deltas into the lifecycle service.
* On completion (or error) it settles the lifecycle execution so
* {@link execute}'s `handle.result` resolves.
*/
private async processStream(
executionId: number,
signal: AbortSignal,
updateOutput?: (output: string | AnsiOutput) => void,
): Promise<void> {
const reassembler = new A2AResultReassembler();
let previousOutputLength = 0;
try {
const priorState = RemoteAgentInvocation.sessionState.get(
this.definition.name,
@@ -150,21 +255,30 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
{
contextId: this.contextId,
taskId: this.taskId,
signal: _signal,
signal,
},
);
let finalResponse: SendMessageResult | undefined;
for await (const chunk of stream) {
if (_signal.aborted) {
if (signal.aborted) {
throw new Error('Operation aborted');
}
finalResponse = chunk;
reassembler.update(chunk);
// Compute delta so lifecycle subscribers see incremental chunks.
const currentOutput = reassembler.toString();
const delta = currentOutput.substring(previousOutputLength);
previousOutputLength = currentOutput.length;
if (delta) {
ExecutionLifecycleService.appendOutput(executionId, delta);
}
if (updateOutput) {
updateOutput(reassembler.toString());
updateOutput(currentOutput);
}
const {
@@ -184,33 +298,22 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
throw new Error('No response from remote agent.');
}
const finalOutput = reassembler.toString();
debugLogger.debug(
`[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`,
);
return {
llmContent: [{ text: finalOutput }],
returnDisplay: safeJsonToMarkdown(finalOutput),
};
ExecutionLifecycleService.completeExecution(executionId);
} catch (error: unknown) {
const partialOutput = reassembler.toString();
// Surface structured, user-friendly error messages.
const errorMessage = this.formatExecutionError(error);
const fullDisplay = partialOutput
? `${partialOutput}\n\n${errorMessage}`
: errorMessage;
return {
llmContent: [{ text: fullDisplay }],
returnDisplay: fullDisplay,
error: { message: errorMessage },
};
ExecutionLifecycleService.completeExecution(executionId, {
error: error instanceof Error ? error : new Error(String(error)),
});
} finally {
// Persist state even on partial failures or aborts to maintain conversational continuity.
// Persist conversational state. On abort/kill the task was interrupted
// so clear taskId (next invocation starts a fresh task), but keep
// contextId to maintain the conversation with the remote agent.
RemoteAgentInvocation.sessionState.set(this.definition.name, {
contextId: this.contextId,
taskId: this.taskId,
taskId: signal.aborted ? undefined : this.taskId,
});
}
}