Support backgrounding remote agent executions via Ctrl+B

This commit is contained in:
Adam Weidman
2026-03-08 17:37:38 -04:00
parent c77fd3fc7a
commit b3850edb8b
2 changed files with 248 additions and 74 deletions

View File

@@ -22,6 +22,7 @@ import type { RemoteAgentDefinition } from './types.js';
import { createMockMessageBus } from '../test-utils/mock-message-bus.js';
import { A2AAuthProviderFactory } from './auth-provider/factory.js';
import type { A2AAuthProvider } from './auth-provider/types.js';
import { ShellExecutionService } from '../services/shellExecutionService.js';
// Mock A2AClientManager
vi.mock('./a2a-client-manager.js', () => ({
@@ -583,6 +584,88 @@ describe('RemoteAgentInvocation', () => {
'Generating...\n\nArtifact (Result):\nPart 1 Part 2',
);
});
it('should support Ctrl+B backgrounding through ShellExecutionService', async () => {
mockClientManager.getClient.mockReturnValue({});
let releaseSecondChunk: (() => void) | undefined;
const secondChunkGate = new Promise<void>((resolve) => {
releaseSecondChunk = resolve;
});
mockClientManager.sendMessageStream.mockImplementation(
async function* () {
yield {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Chunk 1' }],
};
await secondChunkGate;
yield {
kind: 'message',
messageId: 'msg-2',
role: 'agent',
parts: [{ kind: 'text', text: 'Chunk 2' }],
};
},
);
let pid: number | undefined;
const onExit = vi.fn();
let unsubscribeOnExit: (() => void) | undefined;
const streamedOutputChunks: string[] = [];
let unsubscribeStream: (() => void) | undefined;
const updateOutput = vi.fn((output: unknown) => {
if (output === 'Chunk 1' && pid) {
ShellExecutionService.background(pid);
unsubscribeStream = ShellExecutionService.subscribe(pid, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
streamedOutputChunks.push(event.chunk);
}
});
}
});
const invocation = new RemoteAgentInvocation(
mockDefinition,
{ query: 'long task' },
mockMessageBus,
);
const resultPromise = invocation.execute(
new AbortController().signal,
updateOutput,
undefined,
(newPid) => {
pid = newPid;
unsubscribeOnExit = ShellExecutionService.onExit(newPid, onExit);
},
);
const result = await resultPromise;
expect(pid).toBeDefined();
expect(result.returnDisplay).toContain(
'Remote agent moved to background',
);
expect(result.data).toMatchObject({
pid,
initialOutput: 'Chunk 1',
});
releaseSecondChunk?.();
await vi.waitFor(() => {
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
await vi.waitFor(() => {
expect(streamedOutputChunks.join('')).toContain('Chunk 2');
});
unsubscribeStream?.();
unsubscribeOnExit?.();
});
});
describe('Confirmations', () => {

View File

@@ -27,6 +27,7 @@ import type { AuthenticationHandler } from '@a2a-js/sdk/client';
import { debugLogger } from '../utils/debugLogger.js';
import type { AnsiOutput } from '../utils/terminalSerializer.js';
import { A2AAuthProviderFactory } from './auth-provider/factory.js';
import { ExecutionLifecycleService } from '../services/executionLifecycleService.js';
/**
* Authentication handler implementation using Google Application Default Credentials (ADC).
@@ -145,102 +146,192 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
};
}
private publishBackgroundDelta(
pid: number,
previousOutput: string,
nextOutput: string,
): string {
if (nextOutput.length === 0 || nextOutput === previousOutput) {
return previousOutput;
}
if (nextOutput.startsWith(previousOutput)) {
ExecutionLifecycleService.appendOutput(
pid,
nextOutput.slice(previousOutput.length),
);
return nextOutput;
}
// If the reassembled output changes non-monotonically, resync by appending
// the full latest snapshot with a clear separator.
ExecutionLifecycleService.appendOutput(
pid,
`\n\n[Output updated]\n${nextOutput}`,
);
return nextOutput;
}
async execute(
_signal: AbortSignal,
updateOutput?: (output: string | AnsiOutput) => void,
_shellExecutionConfig?: unknown,
setPidCallback?: (pid: number) => void,
): Promise<ToolResult> {
// 1. Ensure the agent is loaded (cached by manager)
// We assume the user has provided an access token via some mechanism (TODO),
// or we rely on ADC.
const reassembler = new A2AResultReassembler();
try {
const priorState = RemoteAgentInvocation.sessionState.get(
this.definition.name,
);
if (priorState) {
this.contextId = priorState.contextId;
this.taskId = priorState.taskId;
}
const executionController = new AbortController();
const onAbort = () => executionController.abort();
_signal.addEventListener('abort', onAbort, { once: true });
const authHandler = await this.getAuthHandler();
const { pid, result } = ExecutionLifecycleService.createExecution(
'',
() => executionController.abort(),
);
if (pid === undefined) {
_signal.removeEventListener('abort', onAbort);
return {
llmContent: [
{ text: 'Error calling remote agent: missing execution pid.' },
],
returnDisplay: 'Error calling remote agent: missing execution pid.',
error: {
message: 'Error calling remote agent: missing execution pid.',
},
};
}
const backgroundPid = pid;
setPidCallback?.(backgroundPid);
if (!this.clientManager.getClient(this.definition.name)) {
await this.clientManager.loadAgent(
const run = async () => {
let lastOutput = '';
try {
const priorState = RemoteAgentInvocation.sessionState.get(
this.definition.name,
this.definition.agentCardUrl,
authHandler,
);
}
if (priorState) {
this.contextId = priorState.contextId;
this.taskId = priorState.taskId;
}
const message = this.params.query;
const authHandler = await this.getAuthHandler();
const stream = this.clientManager.sendMessageStream(
this.definition.name,
message,
{
if (!this.clientManager.getClient(this.definition.name)) {
await this.clientManager.loadAgent(
this.definition.name,
this.definition.agentCardUrl,
authHandler,
);
}
const stream = this.clientManager.sendMessageStream(
this.definition.name,
this.params.query,
{
contextId: this.contextId,
taskId: this.taskId,
signal: executionController.signal,
},
);
let finalResponse: SendMessageResult | undefined;
for await (const chunk of stream) {
if (executionController.signal.aborted) {
throw new Error('Operation aborted');
}
finalResponse = chunk;
reassembler.update(chunk);
const currentOutput = reassembler.toString();
lastOutput = this.publishBackgroundDelta(
backgroundPid,
lastOutput,
currentOutput,
);
if (updateOutput) {
updateOutput(currentOutput);
}
const {
contextId: newContextId,
taskId: newTaskId,
clearTaskId,
} = extractIdsFromResponse(chunk);
if (newContextId) {
this.contextId = newContextId;
}
this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId);
}
if (!finalResponse) {
throw new Error('No response from remote agent.');
}
debugLogger.debug(
`[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`,
);
ExecutionLifecycleService.completeExecution(backgroundPid, {
exitCode: 0,
});
} catch (error: unknown) {
const partialOutput = reassembler.toString();
lastOutput = this.publishBackgroundDelta(
backgroundPid,
lastOutput,
partialOutput,
);
const errorMessage = `Error calling remote agent: ${
error instanceof Error ? error.message : String(error)
}`;
ExecutionLifecycleService.completeExecution(backgroundPid, {
error: new Error(errorMessage),
aborted: executionController.signal.aborted,
exitCode: executionController.signal.aborted ? 130 : 1,
});
} finally {
_signal.removeEventListener('abort', onAbort);
// Persist state even on partial failures or aborts to maintain conversational continuity.
RemoteAgentInvocation.sessionState.set(this.definition.name, {
contextId: this.contextId,
taskId: this.taskId,
signal: _signal,
},
);
let finalResponse: SendMessageResult | undefined;
for await (const chunk of stream) {
if (_signal.aborted) {
throw new Error('Operation aborted');
}
finalResponse = chunk;
reassembler.update(chunk);
if (updateOutput) {
updateOutput(reassembler.toString());
}
const {
contextId: newContextId,
taskId: newTaskId,
clearTaskId,
} = extractIdsFromResponse(chunk);
if (newContextId) {
this.contextId = newContextId;
}
this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId);
});
}
};
if (!finalResponse) {
throw new Error('No response from remote agent.');
}
const finalOutput = reassembler.toString();
debugLogger.debug(
`[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`,
);
void run();
const executionResult = await result;
if (executionResult.backgrounded) {
const command = `${this.getDescription()}: ${this.params.query}`;
const backgroundMessage = `Remote agent moved to background (PID: ${backgroundPid}). Output hidden. Press Ctrl+B to view.`;
return {
llmContent: [{ text: finalOutput }],
returnDisplay: finalOutput,
llmContent: [{ text: backgroundMessage }],
returnDisplay: backgroundMessage,
data: {
pid: backgroundPid,
command,
initialOutput: executionResult.output,
},
};
} catch (error: unknown) {
const partialOutput = reassembler.toString();
const errorMessage = `Error calling remote agent: ${error instanceof Error ? error.message : String(error)}`;
const fullDisplay = partialOutput
? `${partialOutput}\n\n${errorMessage}`
: errorMessage;
}
if (executionResult.error) {
const fullDisplay = executionResult.output
? `${executionResult.output}\n\n${executionResult.error.message}`
: executionResult.error.message;
return {
llmContent: [{ text: fullDisplay }],
returnDisplay: fullDisplay,
error: { message: errorMessage },
error: { message: executionResult.error.message },
};
} finally {
// Persist state even on partial failures or aborts to maintain conversational continuity.
RemoteAgentInvocation.sessionState.set(this.definition.name, {
contextId: this.contextId,
taskId: this.taskId,
});
}
return {
llmContent: [{ text: executionResult.output }],
returnDisplay: executionResult.output,
};
}
}