refactor(core): extract ExecutionLifecycleService for tool backgrounding (#21717)

This commit is contained in:
Adam Weidman
2026-03-12 00:03:54 -04:00
committed by GitHub
parent 949e85ca55
commit 10ab958378
13 changed files with 1580 additions and 802 deletions
@@ -0,0 +1,298 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { beforeEach, describe, expect, it, vi } from 'vitest';
import {
ExecutionLifecycleService,
type ExecutionHandle,
type ExecutionResult,
} from './executionLifecycleService.js';
function createResult(
overrides: Partial<ExecutionResult> = {},
): ExecutionResult {
return {
rawOutput: Buffer.from(''),
output: '',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 123,
executionMethod: 'child_process',
...overrides,
};
}
describe('ExecutionLifecycleService', () => {
beforeEach(() => {
ExecutionLifecycleService.resetForTest();
});
it('completes managed executions in the foreground and notifies exit subscribers', async () => {
const handle = ExecutionLifecycleService.createExecution();
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
const onExit = vi.fn();
const unsubscribe = ExecutionLifecycleService.onExit(handle.pid, onExit);
ExecutionLifecycleService.appendOutput(handle.pid, 'Hello');
ExecutionLifecycleService.appendOutput(handle.pid, ' World');
ExecutionLifecycleService.completeExecution(handle.pid, {
exitCode: 0,
});
const result = await handle.result;
expect(result.output).toBe('Hello World');
expect(result.executionMethod).toBe('none');
expect(result.backgrounded).toBeUndefined();
await vi.waitFor(() => {
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribe();
});
it('supports explicit execution methods for managed executions', async () => {
const handle = ExecutionLifecycleService.createExecution(
'',
undefined,
'remote_agent',
);
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
ExecutionLifecycleService.completeExecution(handle.pid, {
exitCode: 0,
});
const result = await handle.result;
expect(result.executionMethod).toBe('remote_agent');
});
it('supports backgrounding managed executions and continues streaming updates', async () => {
const handle = ExecutionLifecycleService.createExecution();
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
const chunks: string[] = [];
const onExit = vi.fn();
const unsubscribeStream = ExecutionLifecycleService.subscribe(
handle.pid,
(event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
},
);
const unsubscribeExit = ExecutionLifecycleService.onExit(
handle.pid,
onExit,
);
ExecutionLifecycleService.appendOutput(handle.pid, 'Chunk 1');
ExecutionLifecycleService.background(handle.pid);
const backgroundResult = await handle.result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('Chunk 1');
ExecutionLifecycleService.appendOutput(handle.pid, '\nChunk 2');
ExecutionLifecycleService.completeExecution(handle.pid, {
exitCode: 0,
});
await vi.waitFor(() => {
expect(chunks.join('')).toContain('Chunk 2');
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribeStream();
unsubscribeExit();
});
it('kills managed executions and resolves with aborted result', async () => {
const onKill = vi.fn();
const handle = ExecutionLifecycleService.createExecution('', onKill);
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
ExecutionLifecycleService.appendOutput(handle.pid, 'work');
ExecutionLifecycleService.kill(handle.pid);
const result = await handle.result;
expect(onKill).toHaveBeenCalledTimes(1);
expect(result.aborted).toBe(true);
expect(result.exitCode).toBe(130);
expect(result.error?.message).toContain('Operation cancelled by user');
});
it('does not probe OS process state for completed non-process execution IDs', async () => {
const handle = ExecutionLifecycleService.createExecution();
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
ExecutionLifecycleService.completeExecution(handle.pid, { exitCode: 0 });
await handle.result;
const processKillSpy = vi.spyOn(process, 'kill');
expect(ExecutionLifecycleService.isActive(handle.pid)).toBe(false);
expect(processKillSpy).not.toHaveBeenCalled();
processKillSpy.mockRestore();
});
it('manages external executions through registration hooks', async () => {
const writeInput = vi.fn();
const isActive = vi.fn().mockReturnValue(true);
const exitListener = vi.fn();
const chunks: string[] = [];
let output = 'seed';
const handle: ExecutionHandle = ExecutionLifecycleService.attachExecution(
4321,
{
executionMethod: 'child_process',
getBackgroundOutput: () => output,
getSubscriptionSnapshot: () => output,
writeInput,
isActive,
},
);
const unsubscribe = ExecutionLifecycleService.subscribe(4321, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
});
ExecutionLifecycleService.onExit(4321, exitListener);
ExecutionLifecycleService.writeInput(4321, 'stdin');
expect(writeInput).toHaveBeenCalledWith('stdin');
expect(ExecutionLifecycleService.isActive(4321)).toBe(true);
const firstChunk = { type: 'data', chunk: ' +delta' } as const;
ExecutionLifecycleService.emitEvent(4321, firstChunk);
output += firstChunk.chunk;
ExecutionLifecycleService.background(4321);
const backgroundResult = await handle.result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('seed +delta');
expect(backgroundResult.executionMethod).toBe('child_process');
ExecutionLifecycleService.completeWithResult(
4321,
createResult({
pid: 4321,
output: 'seed +delta done',
rawOutput: Buffer.from('seed +delta done'),
executionMethod: 'child_process',
}),
);
await vi.waitFor(() => {
expect(exitListener).toHaveBeenCalledWith(0, undefined);
});
const lateExit = vi.fn();
ExecutionLifecycleService.onExit(4321, lateExit);
expect(lateExit).toHaveBeenCalledWith(0, undefined);
unsubscribe();
});
it('supports late subscription catch-up after backgrounding an external execution', async () => {
let output = 'seed';
const onExit = vi.fn();
const handle = ExecutionLifecycleService.attachExecution(4322, {
executionMethod: 'child_process',
getBackgroundOutput: () => output,
getSubscriptionSnapshot: () => output,
});
ExecutionLifecycleService.onExit(4322, onExit);
ExecutionLifecycleService.background(4322);
const backgroundResult = await handle.result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('seed');
output += ' +late';
ExecutionLifecycleService.emitEvent(4322, {
type: 'data',
chunk: ' +late',
});
const chunks: string[] = [];
const unsubscribe = ExecutionLifecycleService.subscribe(4322, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
});
expect(chunks[0]).toBe('seed +late');
output += ' +live';
ExecutionLifecycleService.emitEvent(4322, {
type: 'data',
chunk: ' +live',
});
expect(chunks[chunks.length - 1]).toBe(' +live');
ExecutionLifecycleService.completeWithResult(
4322,
createResult({
pid: 4322,
output,
rawOutput: Buffer.from(output),
executionMethod: 'child_process',
}),
);
await vi.waitFor(() => {
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribe();
});
it('kills external executions and settles pending promises', async () => {
const terminate = vi.fn();
const onExit = vi.fn();
const handle = ExecutionLifecycleService.attachExecution(4323, {
executionMethod: 'child_process',
initialOutput: 'running',
kill: terminate,
});
ExecutionLifecycleService.onExit(4323, onExit);
ExecutionLifecycleService.kill(4323);
const result = await handle.result;
expect(terminate).toHaveBeenCalledTimes(1);
expect(result.aborted).toBe(true);
expect(result.exitCode).toBe(130);
expect(result.output).toBe('running');
expect(result.error?.message).toContain('Operation cancelled by user');
expect(onExit).toHaveBeenCalledWith(130, undefined);
});
it('rejects duplicate execution registration for active execution IDs', () => {
ExecutionLifecycleService.attachExecution(4324, {
executionMethod: 'child_process',
});
expect(() => {
ExecutionLifecycleService.attachExecution(4324, {
executionMethod: 'child_process',
});
}).toThrow('Execution 4324 is already attached.');
});
});
@@ -0,0 +1,454 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { AnsiOutput } from '../utils/terminalSerializer.js';
export type ExecutionMethod =
| 'lydell-node-pty'
| 'node-pty'
| 'child_process'
| 'remote_agent'
| 'none';
export interface ExecutionResult {
rawOutput: Buffer;
output: string;
exitCode: number | null;
signal: number | null;
error: Error | null;
aborted: boolean;
pid: number | undefined;
executionMethod: ExecutionMethod;
backgrounded?: boolean;
}
export interface ExecutionHandle {
pid: number | undefined;
result: Promise<ExecutionResult>;
}
export type ExecutionOutputEvent =
| {
type: 'data';
chunk: string | AnsiOutput;
}
| {
type: 'binary_detected';
}
| {
type: 'binary_progress';
bytesReceived: number;
}
| {
type: 'exit';
exitCode: number | null;
signal: number | null;
};
export interface ExecutionCompletionOptions {
exitCode?: number | null;
signal?: number | null;
error?: Error | null;
aborted?: boolean;
}
export interface ExternalExecutionRegistration {
executionMethod: ExecutionMethod;
initialOutput?: string;
getBackgroundOutput?: () => string;
getSubscriptionSnapshot?: () => string | AnsiOutput | undefined;
writeInput?: (input: string) => void;
kill?: () => void;
isActive?: () => boolean;
}
interface ManagedExecutionBase {
executionMethod: ExecutionMethod;
output: string;
getBackgroundOutput?: () => string;
getSubscriptionSnapshot?: () => string | AnsiOutput | undefined;
}
interface VirtualExecutionState extends ManagedExecutionBase {
kind: 'virtual';
onKill?: () => void;
}
interface ExternalExecutionState extends ManagedExecutionBase {
kind: 'external';
writeInput?: (input: string) => void;
kill?: () => void;
isActive?: () => boolean;
}
type ManagedExecutionState = VirtualExecutionState | ExternalExecutionState;
const NON_PROCESS_EXECUTION_ID_START = 2_000_000_000;
/**
* Central owner for execution backgrounding lifecycle across shell and tools.
*/
export class ExecutionLifecycleService {
private static readonly EXIT_INFO_TTL_MS = 5 * 60 * 1000;
private static nextExecutionId = NON_PROCESS_EXECUTION_ID_START;
private static activeExecutions = new Map<number, ManagedExecutionState>();
private static activeResolvers = new Map<
number,
(result: ExecutionResult) => void
>();
private static activeListeners = new Map<
number,
Set<(event: ExecutionOutputEvent) => void>
>();
private static exitedExecutionInfo = new Map<
number,
{ exitCode: number; signal?: number }
>();
private static storeExitInfo(
executionId: number,
exitCode: number,
signal?: number,
): void {
this.exitedExecutionInfo.set(executionId, {
exitCode,
signal,
});
setTimeout(() => {
this.exitedExecutionInfo.delete(executionId);
}, this.EXIT_INFO_TTL_MS).unref();
}
private static allocateExecutionId(): number {
let executionId = ++this.nextExecutionId;
while (this.activeExecutions.has(executionId)) {
executionId = ++this.nextExecutionId;
}
return executionId;
}
private static createPendingResult(
executionId: number,
): Promise<ExecutionResult> {
return new Promise<ExecutionResult>((resolve) => {
this.activeResolvers.set(executionId, resolve);
});
}
private static createAbortedResult(
executionId: number,
execution: ManagedExecutionState,
): ExecutionResult {
const output = execution.getBackgroundOutput?.() ?? execution.output;
return {
rawOutput: Buffer.from(output, 'utf8'),
output,
exitCode: 130,
signal: null,
error: new Error('Operation cancelled by user.'),
aborted: true,
pid: executionId,
executionMethod: execution.executionMethod,
};
}
/**
* Resets lifecycle state for isolated unit tests.
*/
static resetForTest(): void {
this.activeExecutions.clear();
this.activeResolvers.clear();
this.activeListeners.clear();
this.exitedExecutionInfo.clear();
this.nextExecutionId = NON_PROCESS_EXECUTION_ID_START;
}
static attachExecution(
executionId: number,
registration: ExternalExecutionRegistration,
): ExecutionHandle {
if (
this.activeExecutions.has(executionId) ||
this.activeResolvers.has(executionId)
) {
throw new Error(`Execution ${executionId} is already attached.`);
}
this.exitedExecutionInfo.delete(executionId);
this.activeExecutions.set(executionId, {
executionMethod: registration.executionMethod,
output: registration.initialOutput ?? '',
kind: 'external',
getBackgroundOutput: registration.getBackgroundOutput,
getSubscriptionSnapshot: registration.getSubscriptionSnapshot,
writeInput: registration.writeInput,
kill: registration.kill,
isActive: registration.isActive,
});
return {
pid: executionId,
result: this.createPendingResult(executionId),
};
}
static createExecution(
initialOutput = '',
onKill?: () => void,
executionMethod: ExecutionMethod = 'none',
): ExecutionHandle {
const executionId = this.allocateExecutionId();
this.activeExecutions.set(executionId, {
executionMethod,
output: initialOutput,
kind: 'virtual',
onKill,
getBackgroundOutput: () => {
const state = this.activeExecutions.get(executionId);
return state?.output ?? initialOutput;
},
getSubscriptionSnapshot: () => {
const state = this.activeExecutions.get(executionId);
return state?.output ?? initialOutput;
},
});
return {
pid: executionId,
result: this.createPendingResult(executionId),
};
}
static appendOutput(executionId: number, chunk: string): void {
const execution = this.activeExecutions.get(executionId);
if (!execution || chunk.length === 0) {
return;
}
execution.output += chunk;
this.emitEvent(executionId, { type: 'data', chunk });
}
static emitEvent(executionId: number, event: ExecutionOutputEvent): void {
const listeners = this.activeListeners.get(executionId);
if (listeners) {
listeners.forEach((listener) => listener(event));
}
}
private static resolvePending(
executionId: number,
result: ExecutionResult,
): void {
const resolve = this.activeResolvers.get(executionId);
if (!resolve) {
return;
}
resolve(result);
this.activeResolvers.delete(executionId);
}
private static settleExecution(
executionId: number,
result: ExecutionResult,
): void {
if (!this.activeExecutions.has(executionId)) {
return;
}
this.resolvePending(executionId, result);
this.emitEvent(executionId, {
type: 'exit',
exitCode: result.exitCode,
signal: result.signal,
});
this.activeListeners.delete(executionId);
this.activeExecutions.delete(executionId);
this.storeExitInfo(
executionId,
result.exitCode ?? 0,
result.signal ?? undefined,
);
}
static completeExecution(
executionId: number,
options?: ExecutionCompletionOptions,
): void {
const execution = this.activeExecutions.get(executionId);
if (!execution) {
return;
}
const {
error = null,
aborted = false,
exitCode = error ? 1 : 0,
signal = null,
} = options ?? {};
const output = execution.getBackgroundOutput?.() ?? execution.output;
this.settleExecution(executionId, {
rawOutput: Buffer.from(output, 'utf8'),
output,
exitCode,
signal,
error,
aborted,
pid: executionId,
executionMethod: execution.executionMethod,
});
}
static completeWithResult(
executionId: number,
result: ExecutionResult,
): void {
this.settleExecution(executionId, result);
}
static background(executionId: number): void {
const resolve = this.activeResolvers.get(executionId);
if (!resolve) {
return;
}
const execution = this.activeExecutions.get(executionId);
if (!execution) {
return;
}
const output = execution.getBackgroundOutput?.() ?? execution.output;
resolve({
rawOutput: Buffer.from(''),
output,
exitCode: null,
signal: null,
error: null,
aborted: false,
pid: executionId,
executionMethod: execution.executionMethod,
backgrounded: true,
});
this.activeResolvers.delete(executionId);
}
static subscribe(
executionId: number,
listener: (event: ExecutionOutputEvent) => void,
): () => void {
if (!this.activeListeners.has(executionId)) {
this.activeListeners.set(executionId, new Set());
}
this.activeListeners.get(executionId)?.add(listener);
const execution = this.activeExecutions.get(executionId);
if (execution) {
const snapshot =
execution.getSubscriptionSnapshot?.() ??
(execution.output.length > 0 ? execution.output : undefined);
if (snapshot && (typeof snapshot !== 'string' || snapshot.length > 0)) {
listener({ type: 'data', chunk: snapshot });
}
}
return () => {
this.activeListeners.get(executionId)?.delete(listener);
if (this.activeListeners.get(executionId)?.size === 0) {
this.activeListeners.delete(executionId);
}
};
}
static onExit(
executionId: number,
callback: (exitCode: number, signal?: number) => void,
): () => void {
if (this.activeExecutions.has(executionId)) {
const listener = (event: ExecutionOutputEvent) => {
if (event.type === 'exit') {
callback(event.exitCode ?? 0, event.signal ?? undefined);
unsubscribe();
}
};
const unsubscribe = this.subscribe(executionId, listener);
return unsubscribe;
}
const exitedInfo = this.exitedExecutionInfo.get(executionId);
if (exitedInfo) {
callback(exitedInfo.exitCode, exitedInfo.signal);
}
return () => {};
}
static kill(executionId: number): void {
const execution = this.activeExecutions.get(executionId);
if (!execution) {
return;
}
if (execution.kind === 'virtual') {
execution.onKill?.();
}
if (execution.kind === 'external') {
execution.kill?.();
}
this.completeWithResult(
executionId,
this.createAbortedResult(executionId, execution),
);
}
static isActive(executionId: number): boolean {
const execution = this.activeExecutions.get(executionId);
if (!execution) {
if (executionId >= NON_PROCESS_EXECUTION_ID_START) {
return false;
}
try {
return process.kill(executionId, 0);
} catch {
return false;
}
}
if (execution.kind === 'virtual') {
return true;
}
if (execution.kind === 'external' && execution.isActive) {
try {
return execution.isActive();
} catch {
return false;
}
}
try {
return process.kill(executionId, 0);
} catch {
return false;
}
}
static writeInput(executionId: number, input: string): void {
const execution = this.activeExecutions.get(executionId);
if (execution?.kind === 'external') {
execution.writeInput?.(input);
}
}
}
@@ -22,6 +22,7 @@ import {
type ShellOutputEvent,
type ShellExecutionConfig,
} from './shellExecutionService.js';
import { ExecutionLifecycleService } from './executionLifecycleService.js';
import type { AnsiOutput, AnsiToken } from '../utils/terminalSerializer.js';
// Hoisted Mocks
@@ -201,6 +202,7 @@ describe('ShellExecutionService', () => {
beforeEach(() => {
vi.clearAllMocks();
ExecutionLifecycleService.resetForTest();
mockSerializeTerminalToObject.mockReturnValue([]);
mockIsBinary.mockReturnValue(false);
mockPlatform.mockReturnValue('linux');
@@ -469,9 +471,10 @@ describe('ShellExecutionService', () => {
});
describe('pty interaction', () => {
let ptySpy: { mockRestore(): void };
let activePtysGetSpy: { mockRestore: () => void };
beforeEach(() => {
ptySpy = vi
activePtysGetSpy = vi
.spyOn(ShellExecutionService['activePtys'], 'get')
.mockReturnValue({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -482,7 +485,7 @@ describe('ShellExecutionService', () => {
});
afterEach(() => {
ptySpy.mockRestore();
activePtysGetSpy.mockRestore();
});
it('should write to the pty and trigger a render', async () => {
@@ -1102,11 +1105,10 @@ describe('ShellExecutionService', () => {
});
it('should destroy the PTY when an exception occurs after spawn in executeWithPty', async () => {
// Simulate: spawn succeeds, Promise executor runs fine (pid accesses 1-2),
// but the return statement `{ pid: ptyProcess.pid }` (access 3) throws.
// The catch block should call spawnedPty.destroy() to release the fd.
// Simulate: spawn succeeds, but accessing ptyProcess.pid throws.
// spawnedPty is set before the pid access, so the catch block should
// call spawnedPty.destroy() to release the fd.
const destroySpy = vi.fn();
let pidAccessCount = 0;
const faultyPty = {
onData: vi.fn(),
onExit: vi.fn(),
@@ -1114,15 +1116,8 @@ describe('ShellExecutionService', () => {
kill: vi.fn(),
resize: vi.fn(),
destroy: destroySpy,
get pid() {
pidAccessCount++;
// Accesses 1-2 are inside the Promise executor (setup).
// Access 3 is at `return { pid: ptyProcess.pid, result }`,
// outside the Promise — caught by the outer try/catch.
if (pidAccessCount > 2) {
throw new Error('Simulated post-spawn failure on pid access');
}
return 77777;
get pid(): number {
throw new Error('Simulated post-spawn failure on pid access');
},
};
mockPtySpawn.mockReturnValueOnce(faultyPty);
File diff suppressed because it is too large Load Diff