refactor: improve background execution readability

This commit is contained in:
Adam Weidman
2026-03-08 21:21:56 -04:00
parent db32a6185b
commit 8a90ecaf30
2 changed files with 115 additions and 85 deletions

View File

@@ -585,7 +585,7 @@ describe('RemoteAgentInvocation', () => {
);
});
it('should support Ctrl+B backgrounding through ShellExecutionService', async () => {
it('should support Ctrl+B backgrounding through execution lifecycle IDs', async () => {
mockClientManager.getClient.mockReturnValue({});
let releaseSecondChunk: (() => void) | undefined;
@@ -618,7 +618,7 @@ describe('RemoteAgentInvocation', () => {
let unsubscribeStream: (() => void) | undefined;
const updateOutput = vi.fn((output: unknown) => {
if (output === 'Chunk 1' && executionId) {
if (output === 'Chunk 1' && executionId !== undefined) {
ShellExecutionService.background(executionId);
unsubscribeStream = ShellExecutionService.subscribe(
executionId,

View File

@@ -28,7 +28,15 @@ import type { AuthenticationHandler } from '@a2a-js/sdk/client';
import { debugLogger } from '../utils/debugLogger.js';
import type { AnsiOutput } from '../utils/terminalSerializer.js';
import { A2AAuthProviderFactory } from './auth-provider/factory.js';
import { ExecutionLifecycleService } from '../services/executionLifecycleService.js';
import {
ExecutionLifecycleService,
type ExecutionResult,
} from '../services/executionLifecycleService.js';
const OUTPUT_RESYNC_MARKER = '\n\n[Output updated]\n';
const REMOTE_AGENT_ERROR_PREFIX = 'Error calling remote agent: ';
const MISSING_EXECUTION_ID_MESSAGE =
'Error calling remote agent: missing execution ID.';
/**
* Authentication handler implementation using Google Application Default Credentials (ADC).
@@ -147,6 +155,87 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
};
}
private restoreSessionState(): void {
const priorState = RemoteAgentInvocation.sessionState.get(
this.definition.name,
);
if (!priorState) {
return;
}
this.contextId = priorState.contextId;
this.taskId = priorState.taskId;
}
private persistSessionState(): void {
// Persist state even on partial failures or aborts to maintain conversational continuity.
RemoteAgentInvocation.sessionState.set(this.definition.name, {
contextId: this.contextId,
taskId: this.taskId,
});
}
private updateSessionStateFromResponseChunk(chunk: SendMessageResult): void {
const { contextId: newContextId, taskId: newTaskId, clearTaskId } =
extractIdsFromResponse(chunk);
if (newContextId) {
this.contextId = newContextId;
}
this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId);
}
private buildBackgroundResult(
executionId: number,
output: string,
): ToolResult {
const command = `${this.getDescription()}: ${this.params.query}`;
const backgroundMessage = `Remote agent moved to background (Execution ID: ${executionId}). Output hidden. Press Ctrl+B to view.`;
const data: BackgroundExecutionData = {
executionId,
pid: executionId,
command,
initialOutput: output,
};
return {
llmContent: [{ text: backgroundMessage }],
returnDisplay: backgroundMessage,
data,
};
}
private buildCompletionResult(
executionResult: ExecutionResult,
executionId: number,
): ToolResult {
if (executionResult.backgrounded) {
return this.buildBackgroundResult(executionId, executionResult.output);
}
if (executionResult.error) {
const fullDisplay = executionResult.output
? `${executionResult.output}\n\n${executionResult.error.message}`
: executionResult.error.message;
return {
llmContent: [{ text: fullDisplay }],
returnDisplay: fullDisplay,
error: { message: executionResult.error.message },
};
}
return {
llmContent: [{ text: executionResult.output }],
returnDisplay: executionResult.output,
};
}
private formatRemoteAgentError(error: unknown): string {
return `${REMOTE_AGENT_ERROR_PREFIX}${
error instanceof Error ? error.message : String(error)
}`;
}
private publishBackgroundDelta(
executionId: number,
previousOutput: string,
@@ -168,13 +257,13 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
// the full latest snapshot with a clear separator.
ExecutionLifecycleService.appendOutput(
executionId,
`\n\n[Output updated]\n${nextOutput}`,
`${OUTPUT_RESYNC_MARKER}${nextOutput}`,
);
return nextOutput;
}
async execute(
_signal: AbortSignal,
signal: AbortSignal,
updateOutput?: (output: string | AnsiOutput) => void,
_shellExecutionConfig?: unknown,
setExecutionIdCallback?: (executionId: number) => void,
@@ -182,37 +271,28 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
const reassembler = new A2AResultReassembler();
const executionController = new AbortController();
const onAbort = () => executionController.abort();
_signal.addEventListener('abort', onAbort, { once: true });
signal.addEventListener('abort', onAbort, { once: true });
const { pid, result } = ExecutionLifecycleService.createExecution(
const { pid: executionId, result } = ExecutionLifecycleService.createVirtualExecution(
'',
() => executionController.abort(),
);
if (pid === undefined) {
_signal.removeEventListener('abort', onAbort);
if (executionId === undefined) {
signal.removeEventListener('abort', onAbort);
return {
llmContent: [
{ text: 'Error calling remote agent: missing execution pid.' },
],
returnDisplay: 'Error calling remote agent: missing execution pid.',
llmContent: [{ text: MISSING_EXECUTION_ID_MESSAGE }],
returnDisplay: MISSING_EXECUTION_ID_MESSAGE,
error: {
message: 'Error calling remote agent: missing execution pid.',
message: MISSING_EXECUTION_ID_MESSAGE,
},
};
}
const backgroundExecutionId = pid;
setExecutionIdCallback?.(backgroundExecutionId);
setExecutionIdCallback?.(executionId);
const run = async () => {
const streamRemoteAgentExecution = async () => {
let lastOutput = '';
try {
const priorState = RemoteAgentInvocation.sessionState.get(
this.definition.name,
);
if (priorState) {
this.contextId = priorState.contextId;
this.taskId = priorState.taskId;
}
this.restoreSessionState();
const authHandler = await this.getAuthHandler();
@@ -245,25 +325,13 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
const currentOutput = reassembler.toString();
lastOutput = this.publishBackgroundDelta(
backgroundExecutionId,
executionId,
lastOutput,
currentOutput,
);
if (updateOutput) {
updateOutput(currentOutput);
}
updateOutput?.(currentOutput);
const {
contextId: newContextId,
taskId: newTaskId,
clearTaskId,
} = extractIdsFromResponse(chunk);
if (newContextId) {
this.contextId = newContextId;
}
this.taskId = clearTaskId ? undefined : (newTaskId ?? this.taskId);
this.updateSessionStateFromResponseChunk(chunk);
}
if (!finalResponse) {
@@ -274,67 +342,29 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
`[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`,
);
ExecutionLifecycleService.completeExecution(backgroundExecutionId, {
ExecutionLifecycleService.completeExecution(executionId, {
exitCode: 0,
});
} catch (error: unknown) {
const partialOutput = reassembler.toString();
lastOutput = this.publishBackgroundDelta(
backgroundExecutionId,
executionId,
lastOutput,
partialOutput,
);
const errorMessage = `Error calling remote agent: ${
error instanceof Error ? error.message : String(error)
}`;
ExecutionLifecycleService.completeExecution(backgroundExecutionId, {
error: new Error(errorMessage),
ExecutionLifecycleService.completeExecution(executionId, {
error: new Error(this.formatRemoteAgentError(error)),
aborted: executionController.signal.aborted,
exitCode: executionController.signal.aborted ? 130 : 1,
});
} finally {
_signal.removeEventListener('abort', onAbort);
// Persist state even on partial failures or aborts to maintain conversational continuity.
RemoteAgentInvocation.sessionState.set(this.definition.name, {
contextId: this.contextId,
taskId: this.taskId,
});
signal.removeEventListener('abort', onAbort);
this.persistSessionState();
}
};
void run();
void streamRemoteAgentExecution();
const executionResult = await result;
if (executionResult.backgrounded) {
const command = `${this.getDescription()}: ${this.params.query}`;
const backgroundMessage = `Remote agent moved to background (PID: ${backgroundExecutionId}). Output hidden. Press Ctrl+B to view.`;
const data: BackgroundExecutionData = {
executionId: backgroundExecutionId,
pid: backgroundExecutionId,
command,
initialOutput: executionResult.output,
};
return {
llmContent: [{ text: backgroundMessage }],
returnDisplay: backgroundMessage,
data,
};
}
if (executionResult.error) {
const fullDisplay = executionResult.output
? `${executionResult.output}\n\n${executionResult.error.message}`
: executionResult.error.message;
return {
llmContent: [{ text: fullDisplay }],
returnDisplay: fullDisplay,
error: { message: executionResult.error.message },
};
}
return {
llmContent: [{ text: executionResult.output }],
returnDisplay: executionResult.output,
};
return this.buildCompletionResult(executionResult, executionId);
}
}