feat(core): extract execution lifecycle facade

Extracts ExecutionLifecycleService to decouple background task tracking from ShellExecutionService, enabling Ctrl+B backgrounding for any tool (including remote agents).
This commit is contained in:
Adam Weidman
2026-03-09 12:20:04 -04:00
committed by Adam Weidman
parent d012929a28
commit a367a1724c
13 changed files with 1556 additions and 793 deletions
@@ -80,7 +80,7 @@ export const useShellCommandProcessor = (
setShellInputFocused: (value: boolean) => void,
terminalWidth?: number,
terminalHeight?: number,
activeToolPtyId?: number,
activeBackgroundExecutionId?: number,
isWaitingForConfirmation?: boolean,
) => {
const [state, dispatch] = useReducer(shellReducer, initialState);
@@ -103,7 +103,8 @@ export const useShellCommandProcessor = (
}
const m = manager.current;
const activePtyId = state.activeShellPtyId || activeToolPtyId;
const activePtyId =
state.activeShellPtyId ?? activeBackgroundExecutionId ?? undefined;
useEffect(() => {
const isForegroundActive = !!activePtyId || !!isWaitingForConfirmation;
@@ -191,7 +192,8 @@ export const useShellCommandProcessor = (
]);
const backgroundCurrentShell = useCallback(() => {
const pidToBackground = state.activeShellPtyId || activeToolPtyId;
const pidToBackground =
state.activeShellPtyId ?? activeBackgroundExecutionId;
if (pidToBackground) {
ShellExecutionService.background(pidToBackground);
m.backgroundedPids.add(pidToBackground);
@@ -202,7 +204,7 @@ export const useShellCommandProcessor = (
m.restoreTimeout = null;
}
}
}, [state.activeShellPtyId, activeToolPtyId, m]);
}, [state.activeShellPtyId, activeBackgroundExecutionId, m]);
const dismissBackgroundShell = useCallback(
(pid: number) => {
@@ -96,6 +96,25 @@ const MockedUserPromptEvent = vi.hoisted(() =>
vi.fn().mockImplementation(() => {}),
);
const mockParseAndFormatApiError = vi.hoisted(() => vi.fn());
const mockIsBackgroundExecutionData = vi.hoisted(
() =>
(data: unknown): data is { pid?: number } => {
if (typeof data !== 'object' || data === null) {
return false;
}
const value = data as {
pid?: unknown;
command?: unknown;
initialOutput?: unknown;
};
return (
(value.pid === undefined || typeof value.pid === 'number') &&
(value.command === undefined || typeof value.command === 'string') &&
(value.initialOutput === undefined ||
typeof value.initialOutput === 'string')
);
},
);
const MockValidationRequiredError = vi.hoisted(
() =>
@@ -121,6 +140,7 @@ vi.mock('@google/gemini-cli-core', async (importOriginal) => {
const actualCoreModule = (await importOriginal()) as any;
return {
...actualCoreModule,
isBackgroundExecutionData: mockIsBackgroundExecutionData,
GitService: vi.fn(),
GeminiClient: MockedGeminiClientClass,
UserPromptEvent: MockedUserPromptEvent,
@@ -599,6 +619,35 @@ describe('useGeminiStream', () => {
expect(mockSendMessageStream).not.toHaveBeenCalled(); // submitQuery uses this
});
it('should expose activePtyId for non-shell executing tools that report an execution ID', () => {
const remoteExecutingTool: TrackedExecutingToolCall = {
request: {
callId: 'remote-call-1',
name: 'remote_agent_call',
args: {},
isClientInitiated: false,
prompt_id: 'prompt-id-remote',
},
status: CoreToolCallStatus.Executing,
responseSubmittedToGemini: false,
tool: {
name: 'remote_agent_call',
displayName: 'Remote Agent',
description: 'Remote agent execution',
build: vi.fn(),
} as any,
invocation: {
getDescription: () => 'Calling remote agent',
} as unknown as AnyToolInvocation,
startTime: Date.now(),
liveOutput: 'working...',
pid: 4242,
};
const { result } = renderTestHook([remoteExecutingTool]);
expect(result.current.activePtyId).toBe(4242);
});
it('should submit tool responses when all tool calls are completed and ready', async () => {
const toolCall1ResponseParts: Part[] = [{ text: 'tool 1 final response' }];
const toolCall2ResponseParts: Part[] = [{ text: 'tool 2 final response' }];
+45 -38
View File
@@ -37,6 +37,7 @@ import {
buildUserSteeringHintPrompt,
GeminiCliOperation,
getPlanModeExitMessage,
isBackgroundExecutionData,
} from '@google/gemini-cli-core';
import type {
Config,
@@ -94,10 +95,10 @@ type ToolResponseWithParts = ToolCallResponseInfo & {
llmContent?: PartListUnion;
};
interface ShellToolData {
pid?: number;
command?: string;
initialOutput?: string;
interface BackgroundedToolInfo {
pid: number;
command: string;
initialOutput: string;
}
enum StreamProcessingStatus {
@@ -111,15 +112,32 @@ const SUPPRESSED_TOOL_ERRORS_NOTE =
const LOW_VERBOSITY_FAILURE_NOTE =
'This request failed. Press F12 for diagnostics, or run /settings and change "Error Verbosity" to full for full details.';
function isShellToolData(data: unknown): data is ShellToolData {
if (typeof data !== 'object' || data === null) {
return false;
function getBackgroundedToolInfo(
toolCall: TrackedCompletedToolCall | TrackedCancelledToolCall,
): BackgroundedToolInfo | undefined {
const response = toolCall.response as ToolResponseWithParts;
const rawData: unknown = response?.data;
if (!isBackgroundExecutionData(rawData)) {
return undefined;
}
const d = data as Partial<ShellToolData>;
if (rawData.pid === undefined) {
return undefined;
}
return {
pid: rawData.pid,
command: rawData.command ?? toolCall.request.name,
initialOutput: rawData.initialOutput ?? '',
};
}
function isBackgroundableExecutingToolCall(
toolCall: TrackedToolCall,
): toolCall is TrackedExecutingToolCall {
return (
(d.pid === undefined || typeof d.pid === 'number') &&
(d.command === undefined || typeof d.command === 'string') &&
(d.initialOutput === undefined || typeof d.initialOutput === 'string')
toolCall.status === CoreToolCallStatus.Executing &&
typeof toolCall.pid === 'number'
);
}
@@ -311,13 +329,11 @@ export const useGeminiStream = (
getPreferredEditor,
);
const activeToolPtyId = useMemo(() => {
const executingShellTool = toolCalls.find(
(tc) =>
tc.status === 'executing' && tc.request.name === 'run_shell_command',
const activeBackgroundExecutionId = useMemo(() => {
const executingBackgroundableTool = toolCalls.find(
isBackgroundableExecutingToolCall,
);
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
return (executingShellTool as TrackedExecutingToolCall | undefined)?.pid;
return executingBackgroundableTool?.pid;
}, [toolCalls]);
const onExec = useCallback(async (done: Promise<void>) => {
@@ -347,7 +363,7 @@ export const useGeminiStream = (
setShellInputFocused,
terminalWidth,
terminalHeight,
activeToolPtyId,
activeBackgroundExecutionId,
);
const streamingState = useMemo(
@@ -525,7 +541,8 @@ export const useGeminiStream = (
onComplete: (result: { userSelection: 'disable' | 'keep' }) => void;
} | null>(null);
const activePtyId = activeShellPtyId || activeToolPtyId;
const activePtyId =
activeShellPtyId ?? activeBackgroundExecutionId ?? undefined;
const prevActiveShellPtyIdRef = useRef<number | null>(null);
useEffect(() => {
@@ -1651,26 +1668,16 @@ export const useGeminiStream = (
!processedMemoryToolsRef.current.has(t.request.callId),
);
// Handle backgrounded shell tools
completedAndReadyToSubmitTools.forEach((t) => {
const isShell = t.request.name === 'run_shell_command';
// Access result from the tracked tool call response
const response = t.response as ToolResponseWithParts;
const rawData = response?.data;
const data = isShellToolData(rawData) ? rawData : undefined;
// Use data.pid for shell commands moved to the background.
const pid = data?.pid;
if (isShell && pid) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const command = (data?.['command'] as string) ?? 'shell';
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const initialOutput = (data?.['initialOutput'] as string) ?? '';
registerBackgroundShell(pid, command, initialOutput);
for (const toolCall of completedAndReadyToSubmitTools) {
const backgroundedTool = getBackgroundedToolInfo(toolCall);
if (backgroundedTool) {
registerBackgroundShell(
backgroundedTool.pid,
backgroundedTool.command,
backgroundedTool.initialOutput,
);
}
}
});
if (newSuccessfulMemorySaves.length > 0) {
// Perform the refresh only if there are new ones.
@@ -11,6 +11,7 @@ import {
BaseToolInvocation,
type ToolResult,
type AnyDeclarativeTool,
type ToolLiveOutput,
} from '../tools/tools.js';
import type { MessageBus } from '../confirmation-bus/message-bus.js';
import type { HookSystem } from '../hooks/hookSystem.js';
@@ -37,6 +38,30 @@ class MockInvocation extends BaseToolInvocation<{ key?: string }, ToolResult> {
}
}
class MockBackgroundableInvocation extends BaseToolInvocation<
{ key?: string },
ToolResult
> {
constructor(params: { key?: string }, messageBus: MessageBus) {
super(params, messageBus);
}
getDescription() {
return 'mock-pid';
}
async execute(
_signal: AbortSignal,
_updateOutput?: (output: ToolLiveOutput) => void,
_shellExecutionConfig?: unknown,
setExecutionIdCallback?: (executionId: number) => void,
) {
setExecutionIdCallback?.(4242);
return {
llmContent: 'pid',
returnDisplay: 'pid',
};
}
}
describe('executeToolWithHooks', () => {
let messageBus: MessageBus;
let mockTool: AnyDeclarativeTool;
@@ -258,4 +283,26 @@ describe('executeToolWithHooks', () => {
expect(invocation.params.key).toBe('original');
expect(mockTool.build).not.toHaveBeenCalled();
});
it('should pass execution ID callback through for non-shell invocations', async () => {
const invocation = new MockBackgroundableInvocation({}, messageBus);
const abortSignal = new AbortController().signal;
const setExecutionIdCallback = vi.fn();
vi.mocked(mockHookSystem.fireBeforeToolEvent).mockResolvedValue(undefined);
vi.mocked(mockHookSystem.fireAfterToolEvent).mockResolvedValue(undefined);
await executeToolWithHooks(
invocation,
'test_tool',
abortSignal,
mockTool,
undefined,
undefined,
setExecutionIdCallback,
mockConfig,
);
expect(setExecutionIdCallback).toHaveBeenCalledWith(4242);
});
});
+8 -17
View File
@@ -15,7 +15,6 @@ import type {
import { ToolErrorType } from '../tools/tool-error.js';
import { debugLogger } from '../utils/debugLogger.js';
import type { ShellExecutionConfig } from '../index.js';
import { ShellToolInvocation } from '../tools/shell.js';
import { DiscoveredMCPToolInvocation } from '../tools/mcp-tool.js';
/**
@@ -26,7 +25,7 @@ import { DiscoveredMCPToolInvocation } from '../tools/mcp-tool.js';
* @returns MCP context if this is an MCP tool, undefined otherwise
*/
function extractMcpContext(
invocation: ShellToolInvocation | AnyToolInvocation,
invocation: AnyToolInvocation,
config: Config,
): McpToolContext | undefined {
if (!(invocation instanceof DiscoveredMCPToolInvocation)) {
@@ -63,18 +62,18 @@ function extractMcpContext(
* @param signal Abort signal for cancellation
* @param liveOutputCallback Optional callback for live output updates
* @param shellExecutionConfig Optional shell execution config
* @param setPidCallback Optional callback to set the PID for shell invocations
* @param setExecutionIdCallback Optional callback to set an execution ID for backgroundable invocations
* @param config Config to look up MCP server details for hook context
* @returns The tool result
*/
export async function executeToolWithHooks(
invocation: ShellToolInvocation | AnyToolInvocation,
invocation: AnyToolInvocation,
toolName: string,
signal: AbortSignal,
tool: AnyDeclarativeTool,
liveOutputCallback?: (outputChunk: ToolLiveOutput) => void,
shellExecutionConfig?: ShellExecutionConfig,
setPidCallback?: (pid: number) => void,
setExecutionIdCallback?: (executionId: number) => void,
config?: Config,
originalRequestName?: string,
): Promise<ToolResult> {
@@ -154,22 +153,14 @@ export async function executeToolWithHooks(
}
}
// Execute the actual tool
let toolResult: ToolResult;
if (setPidCallback && invocation instanceof ShellToolInvocation) {
toolResult = await invocation.execute(
// Execute the actual tool. Tools that support backgrounding can optionally
// surface an execution ID via the callback.
const toolResult: ToolResult = await invocation.execute(
signal,
liveOutputCallback,
shellExecutionConfig,
setPidCallback,
setExecutionIdCallback,
);
} else {
toolResult = await invocation.execute(
signal,
liveOutputCallback,
shellExecutionConfig,
);
}
// Append notification if parameters were modified
if (inputWasModified) {
@@ -469,7 +469,7 @@ describe('ToolExecutor', () => {
expect(result.status).toBe(CoreToolCallStatus.Success);
});
it('should report PID updates for shell tools', async () => {
it('should report execution ID updates for backgroundable tools', async () => {
// 1. Setup ShellToolInvocation
const messageBus = createMockMessageBus();
const shellInvocation = new ShellToolInvocation(
@@ -480,7 +480,7 @@ describe('ToolExecutor', () => {
// We need a dummy tool that matches the invocation just for structure
const mockTool = new MockTool({ name: SHELL_TOOL_NAME });
// 2. Mock executeToolWithHooks to trigger the PID callback
// 2. Mock executeToolWithHooks to trigger the execution ID callback
const testPid = 12345;
vi.mocked(coreToolHookTriggers.executeToolWithHooks).mockImplementation(
async (
@@ -490,13 +490,13 @@ describe('ToolExecutor', () => {
_tool,
_liveCb,
_shellCfg,
setPidCallback,
setExecutionIdCallback,
_config,
_originalRequestName,
) => {
// Simulate the shell tool reporting a PID
if (setPidCallback) {
setPidCallback(testPid);
// Simulate the tool reporting an execution ID
if (setExecutionIdCallback) {
setExecutionIdCallback(testPid);
}
return { llmContent: 'done', returnDisplay: 'done' };
},
@@ -525,7 +525,7 @@ describe('ToolExecutor', () => {
onUpdateToolCall,
});
// 4. Verify PID was reported
// 4. Verify execution ID was reported
expect(onUpdateToolCall).toHaveBeenCalledWith(
expect.objectContaining({
status: CoreToolCallStatus.Executing,
@@ -534,6 +534,59 @@ describe('ToolExecutor', () => {
);
});
it('should report execution ID updates for non-shell backgroundable tools', async () => {
const mockTool = new MockTool({
name: 'remote_agent_call',
description: 'Remote agent call',
});
const invocation = mockTool.build({});
const testExecutionId = 67890;
vi.mocked(coreToolHookTriggers.executeToolWithHooks).mockImplementation(
async (
_inv,
_name,
_sig,
_tool,
_liveCb,
_shellCfg,
setExecutionIdCallback,
) => {
setExecutionIdCallback?.(testExecutionId);
return { llmContent: 'done', returnDisplay: 'done' };
},
);
const scheduledCall: ScheduledToolCall = {
status: CoreToolCallStatus.Scheduled,
request: {
callId: 'call-remote-pid',
name: 'remote_agent_call',
args: {},
isClientInitiated: false,
prompt_id: 'prompt-remote-pid',
},
tool: mockTool,
invocation: invocation as unknown as AnyToolInvocation,
startTime: Date.now(),
};
const onUpdateToolCall = vi.fn();
await executor.execute({
call: scheduledCall,
signal: new AbortController().signal,
onUpdateToolCall,
});
expect(onUpdateToolCall).toHaveBeenCalledWith(
expect.objectContaining({
status: CoreToolCallStatus.Executing,
pid: testExecutionId,
}),
);
});
it('should return cancelled result with partial output when signal is aborted', async () => {
const mockTool = new MockTool({
name: 'slowTool',
+5 -20
View File
@@ -16,7 +16,6 @@ import {
type ToolLiveOutput,
} from '../index.js';
import { SHELL_TOOL_NAME } from '../tools/tool-names.js';
import { ShellToolInvocation } from '../tools/shell.js';
import { DiscoveredMCPTool } from '../tools/mcp-tool.js';
import { executeToolWithHooks } from '../core/coreToolHookTriggers.js';
import {
@@ -89,43 +88,29 @@ export class ToolExecutor {
let completedToolCall: CompletedToolCall;
try {
let promise: Promise<ToolResult>;
if (invocation instanceof ShellToolInvocation) {
const setPidCallback = (pid: number) => {
const setExecutionIdCallback = (executionId: number) => {
const executingCall: ExecutingToolCall = {
...call,
status: CoreToolCallStatus.Executing,
tool,
invocation,
pid,
pid: executionId,
startTime: 'startTime' in call ? call.startTime : undefined,
};
onUpdateToolCall(executingCall);
};
promise = executeToolWithHooks(
const promise = executeToolWithHooks(
invocation,
toolName,
signal,
tool,
liveOutputCallback,
shellExecutionConfig,
setPidCallback,
setExecutionIdCallback,
this.config,
request.originalRequestName,
);
} else {
promise = executeToolWithHooks(
invocation,
toolName,
signal,
tool,
liveOutputCallback,
shellExecutionConfig,
undefined,
this.config,
request.originalRequestName,
);
}
const toolResult: ToolResult = await promise;
@@ -0,0 +1,298 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { beforeEach, describe, expect, it, vi } from 'vitest';
import {
ExecutionLifecycleService,
type ExecutionHandle,
type ExecutionResult,
} from './executionLifecycleService.js';
function createResult(
overrides: Partial<ExecutionResult> = {},
): ExecutionResult {
return {
rawOutput: Buffer.from(''),
output: '',
exitCode: 0,
signal: null,
error: null,
aborted: false,
pid: 123,
executionMethod: 'child_process',
...overrides,
};
}
describe('ExecutionLifecycleService', () => {
beforeEach(() => {
ExecutionLifecycleService.resetForTest();
});
it('completes managed executions in the foreground and notifies exit subscribers', async () => {
const handle = ExecutionLifecycleService.createExecution();
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
const onExit = vi.fn();
const unsubscribe = ExecutionLifecycleService.onExit(handle.pid, onExit);
ExecutionLifecycleService.appendOutput(handle.pid, 'Hello');
ExecutionLifecycleService.appendOutput(handle.pid, ' World');
ExecutionLifecycleService.completeExecution(handle.pid, {
exitCode: 0,
});
const result = await handle.result;
expect(result.output).toBe('Hello World');
expect(result.executionMethod).toBe('none');
expect(result.backgrounded).toBeUndefined();
await vi.waitFor(() => {
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribe();
});
it('supports explicit execution methods for managed executions', async () => {
const handle = ExecutionLifecycleService.createExecution(
'',
undefined,
'remote_agent',
);
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
ExecutionLifecycleService.completeExecution(handle.pid, {
exitCode: 0,
});
const result = await handle.result;
expect(result.executionMethod).toBe('remote_agent');
});
it('supports backgrounding managed executions and continues streaming updates', async () => {
const handle = ExecutionLifecycleService.createExecution();
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
const chunks: string[] = [];
const onExit = vi.fn();
const unsubscribeStream = ExecutionLifecycleService.subscribe(
handle.pid,
(event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
},
);
const unsubscribeExit = ExecutionLifecycleService.onExit(
handle.pid,
onExit,
);
ExecutionLifecycleService.appendOutput(handle.pid, 'Chunk 1');
ExecutionLifecycleService.background(handle.pid);
const backgroundResult = await handle.result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('Chunk 1');
ExecutionLifecycleService.appendOutput(handle.pid, '\nChunk 2');
ExecutionLifecycleService.completeExecution(handle.pid, {
exitCode: 0,
});
await vi.waitFor(() => {
expect(chunks.join('')).toContain('Chunk 2');
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribeStream();
unsubscribeExit();
});
it('kills managed executions and resolves with aborted result', async () => {
const onKill = vi.fn();
const handle = ExecutionLifecycleService.createExecution('', onKill);
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
ExecutionLifecycleService.appendOutput(handle.pid, 'work');
ExecutionLifecycleService.kill(handle.pid);
const result = await handle.result;
expect(onKill).toHaveBeenCalledTimes(1);
expect(result.aborted).toBe(true);
expect(result.exitCode).toBe(130);
expect(result.error?.message).toContain('Operation cancelled by user');
});
it('does not probe OS process state for completed non-process execution IDs', async () => {
const handle = ExecutionLifecycleService.createExecution();
if (handle.pid === undefined) {
throw new Error('Expected execution ID.');
}
ExecutionLifecycleService.completeExecution(handle.pid, { exitCode: 0 });
await handle.result;
const processKillSpy = vi.spyOn(process, 'kill');
expect(ExecutionLifecycleService.isActive(handle.pid)).toBe(false);
expect(processKillSpy).not.toHaveBeenCalled();
processKillSpy.mockRestore();
});
it('manages external executions through registration hooks', async () => {
const writeInput = vi.fn();
const isActive = vi.fn().mockReturnValue(true);
const exitListener = vi.fn();
const chunks: string[] = [];
let output = 'seed';
const handle: ExecutionHandle = ExecutionLifecycleService.attachExecution(
4321,
{
executionMethod: 'child_process',
getBackgroundOutput: () => output,
getSubscriptionSnapshot: () => output,
writeInput,
isActive,
},
);
const unsubscribe = ExecutionLifecycleService.subscribe(4321, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
});
ExecutionLifecycleService.onExit(4321, exitListener);
ExecutionLifecycleService.writeInput(4321, 'stdin');
expect(writeInput).toHaveBeenCalledWith('stdin');
expect(ExecutionLifecycleService.isActive(4321)).toBe(true);
const firstChunk = { type: 'data', chunk: ' +delta' } as const;
ExecutionLifecycleService.emitEvent(4321, firstChunk);
output += firstChunk.chunk;
ExecutionLifecycleService.background(4321);
const backgroundResult = await handle.result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('seed +delta');
expect(backgroundResult.executionMethod).toBe('child_process');
ExecutionLifecycleService.completeWithResult(
4321,
createResult({
pid: 4321,
output: 'seed +delta done',
rawOutput: Buffer.from('seed +delta done'),
executionMethod: 'child_process',
}),
);
await vi.waitFor(() => {
expect(exitListener).toHaveBeenCalledWith(0, undefined);
});
const lateExit = vi.fn();
ExecutionLifecycleService.onExit(4321, lateExit);
expect(lateExit).toHaveBeenCalledWith(0, undefined);
unsubscribe();
});
it('supports late subscription catch-up after backgrounding an external execution', async () => {
let output = 'seed';
const onExit = vi.fn();
const handle = ExecutionLifecycleService.attachExecution(4322, {
executionMethod: 'child_process',
getBackgroundOutput: () => output,
getSubscriptionSnapshot: () => output,
});
ExecutionLifecycleService.onExit(4322, onExit);
ExecutionLifecycleService.background(4322);
const backgroundResult = await handle.result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('seed');
output += ' +late';
ExecutionLifecycleService.emitEvent(4322, {
type: 'data',
chunk: ' +late',
});
const chunks: string[] = [];
const unsubscribe = ExecutionLifecycleService.subscribe(4322, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
});
expect(chunks[0]).toBe('seed +late');
output += ' +live';
ExecutionLifecycleService.emitEvent(4322, {
type: 'data',
chunk: ' +live',
});
expect(chunks[chunks.length - 1]).toBe(' +live');
ExecutionLifecycleService.completeWithResult(
4322,
createResult({
pid: 4322,
output,
rawOutput: Buffer.from(output),
executionMethod: 'child_process',
}),
);
await vi.waitFor(() => {
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribe();
});
it('kills external executions and settles pending promises', async () => {
const terminate = vi.fn();
const onExit = vi.fn();
const handle = ExecutionLifecycleService.attachExecution(4323, {
executionMethod: 'child_process',
initialOutput: 'running',
kill: terminate,
});
ExecutionLifecycleService.onExit(4323, onExit);
ExecutionLifecycleService.kill(4323);
const result = await handle.result;
expect(terminate).toHaveBeenCalledTimes(1);
expect(result.aborted).toBe(true);
expect(result.exitCode).toBe(130);
expect(result.output).toBe('running');
expect(result.error?.message).toContain('Operation cancelled by user');
expect(onExit).toHaveBeenCalledWith(130, undefined);
});
it('rejects duplicate execution registration for active execution IDs', () => {
ExecutionLifecycleService.attachExecution(4324, {
executionMethod: 'child_process',
});
expect(() => {
ExecutionLifecycleService.attachExecution(4324, {
executionMethod: 'child_process',
});
}).toThrow('Execution 4324 is already attached.');
});
});
@@ -0,0 +1,454 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { AnsiOutput } from '../utils/terminalSerializer.js';
export type ExecutionMethod =
| 'lydell-node-pty'
| 'node-pty'
| 'child_process'
| 'remote_agent'
| 'none';
export interface ExecutionResult {
rawOutput: Buffer;
output: string;
exitCode: number | null;
signal: number | null;
error: Error | null;
aborted: boolean;
pid: number | undefined;
executionMethod: ExecutionMethod;
backgrounded?: boolean;
}
export interface ExecutionHandle {
pid: number | undefined;
result: Promise<ExecutionResult>;
}
export type ExecutionOutputEvent =
| {
type: 'data';
chunk: string | AnsiOutput;
}
| {
type: 'binary_detected';
}
| {
type: 'binary_progress';
bytesReceived: number;
}
| {
type: 'exit';
exitCode: number | null;
signal: number | null;
};
export interface ExecutionCompletionOptions {
exitCode?: number | null;
signal?: number | null;
error?: Error | null;
aborted?: boolean;
}
export interface ExternalExecutionRegistration {
executionMethod: ExecutionMethod;
initialOutput?: string;
getBackgroundOutput?: () => string;
getSubscriptionSnapshot?: () => string | AnsiOutput | undefined;
writeInput?: (input: string) => void;
kill?: () => void;
isActive?: () => boolean;
}
interface ManagedExecutionBase {
executionMethod: ExecutionMethod;
output: string;
getBackgroundOutput?: () => string;
getSubscriptionSnapshot?: () => string | AnsiOutput | undefined;
}
interface VirtualExecutionState extends ManagedExecutionBase {
kind: 'virtual';
onKill?: () => void;
}
interface ExternalExecutionState extends ManagedExecutionBase {
kind: 'external';
writeInput?: (input: string) => void;
kill?: () => void;
isActive?: () => boolean;
}
type ManagedExecutionState = VirtualExecutionState | ExternalExecutionState;
const NON_PROCESS_EXECUTION_ID_START = 2_000_000_000;
/**
* Central owner for execution backgrounding lifecycle across shell and tools.
*/
export class ExecutionLifecycleService {
private static readonly EXIT_INFO_TTL_MS = 5 * 60 * 1000;
private static nextExecutionId = NON_PROCESS_EXECUTION_ID_START;
private static activeExecutions = new Map<number, ManagedExecutionState>();
private static activeResolvers = new Map<
number,
(result: ExecutionResult) => void
>();
private static activeListeners = new Map<
number,
Set<(event: ExecutionOutputEvent) => void>
>();
private static exitedExecutionInfo = new Map<
number,
{ exitCode: number; signal?: number }
>();
private static storeExitInfo(
executionId: number,
exitCode: number,
signal?: number,
): void {
this.exitedExecutionInfo.set(executionId, {
exitCode,
signal,
});
setTimeout(() => {
this.exitedExecutionInfo.delete(executionId);
}, this.EXIT_INFO_TTL_MS).unref();
}
private static allocateExecutionId(): number {
let executionId = ++this.nextExecutionId;
while (this.activeExecutions.has(executionId)) {
executionId = ++this.nextExecutionId;
}
return executionId;
}
private static createPendingResult(
executionId: number,
): Promise<ExecutionResult> {
return new Promise<ExecutionResult>((resolve) => {
this.activeResolvers.set(executionId, resolve);
});
}
private static createAbortedResult(
executionId: number,
execution: ManagedExecutionState,
): ExecutionResult {
const output = execution.getBackgroundOutput?.() ?? execution.output;
return {
rawOutput: Buffer.from(output, 'utf8'),
output,
exitCode: 130,
signal: null,
error: new Error('Operation cancelled by user.'),
aborted: true,
pid: executionId,
executionMethod: execution.executionMethod,
};
}
/**
* Resets lifecycle state for isolated unit tests.
*/
static resetForTest(): void {
this.activeExecutions.clear();
this.activeResolvers.clear();
this.activeListeners.clear();
this.exitedExecutionInfo.clear();
this.nextExecutionId = NON_PROCESS_EXECUTION_ID_START;
}
static attachExecution(
executionId: number,
registration: ExternalExecutionRegistration,
): ExecutionHandle {
if (
this.activeExecutions.has(executionId) ||
this.activeResolvers.has(executionId)
) {
throw new Error(`Execution ${executionId} is already attached.`);
}
this.exitedExecutionInfo.delete(executionId);
this.activeExecutions.set(executionId, {
executionMethod: registration.executionMethod,
output: registration.initialOutput ?? '',
kind: 'external',
getBackgroundOutput: registration.getBackgroundOutput,
getSubscriptionSnapshot: registration.getSubscriptionSnapshot,
writeInput: registration.writeInput,
kill: registration.kill,
isActive: registration.isActive,
});
return {
pid: executionId,
result: this.createPendingResult(executionId),
};
}
static createExecution(
initialOutput = '',
onKill?: () => void,
executionMethod: ExecutionMethod = 'none',
): ExecutionHandle {
const executionId = this.allocateExecutionId();
this.activeExecutions.set(executionId, {
executionMethod,
output: initialOutput,
kind: 'virtual',
onKill,
getBackgroundOutput: () => {
const state = this.activeExecutions.get(executionId);
return state?.output ?? initialOutput;
},
getSubscriptionSnapshot: () => {
const state = this.activeExecutions.get(executionId);
return state?.output ?? initialOutput;
},
});
return {
pid: executionId,
result: this.createPendingResult(executionId),
};
}
static appendOutput(executionId: number, chunk: string): void {
const execution = this.activeExecutions.get(executionId);
if (!execution || chunk.length === 0) {
return;
}
execution.output += chunk;
this.emitEvent(executionId, { type: 'data', chunk });
}
static emitEvent(executionId: number, event: ExecutionOutputEvent): void {
const listeners = this.activeListeners.get(executionId);
if (listeners) {
listeners.forEach((listener) => listener(event));
}
}
private static resolvePending(
executionId: number,
result: ExecutionResult,
): void {
const resolve = this.activeResolvers.get(executionId);
if (!resolve) {
return;
}
resolve(result);
this.activeResolvers.delete(executionId);
}
private static settleExecution(
executionId: number,
result: ExecutionResult,
): void {
if (!this.activeExecutions.has(executionId)) {
return;
}
this.resolvePending(executionId, result);
this.emitEvent(executionId, {
type: 'exit',
exitCode: result.exitCode,
signal: result.signal,
});
this.activeListeners.delete(executionId);
this.activeExecutions.delete(executionId);
this.storeExitInfo(
executionId,
result.exitCode ?? 0,
result.signal ?? undefined,
);
}
static completeExecution(
executionId: number,
options?: ExecutionCompletionOptions,
): void {
const execution = this.activeExecutions.get(executionId);
if (!execution) {
return;
}
const {
error = null,
aborted = false,
exitCode = error ? 1 : 0,
signal = null,
} = options ?? {};
const output = execution.getBackgroundOutput?.() ?? execution.output;
this.settleExecution(executionId, {
rawOutput: Buffer.from(output, 'utf8'),
output,
exitCode,
signal,
error,
aborted,
pid: executionId,
executionMethod: execution.executionMethod,
});
}
static completeWithResult(
executionId: number,
result: ExecutionResult,
): void {
this.settleExecution(executionId, result);
}
static background(executionId: number): void {
const resolve = this.activeResolvers.get(executionId);
if (!resolve) {
return;
}
const execution = this.activeExecutions.get(executionId);
if (!execution) {
return;
}
const output = execution.getBackgroundOutput?.() ?? execution.output;
resolve({
rawOutput: Buffer.from(''),
output,
exitCode: null,
signal: null,
error: null,
aborted: false,
pid: executionId,
executionMethod: execution.executionMethod,
backgrounded: true,
});
this.activeResolvers.delete(executionId);
}
static subscribe(
executionId: number,
listener: (event: ExecutionOutputEvent) => void,
): () => void {
if (!this.activeListeners.has(executionId)) {
this.activeListeners.set(executionId, new Set());
}
this.activeListeners.get(executionId)?.add(listener);
const execution = this.activeExecutions.get(executionId);
if (execution) {
const snapshot =
execution.getSubscriptionSnapshot?.() ??
(execution.output.length > 0 ? execution.output : undefined);
if (snapshot && (typeof snapshot !== 'string' || snapshot.length > 0)) {
listener({ type: 'data', chunk: snapshot });
}
}
return () => {
this.activeListeners.get(executionId)?.delete(listener);
if (this.activeListeners.get(executionId)?.size === 0) {
this.activeListeners.delete(executionId);
}
};
}
static onExit(
executionId: number,
callback: (exitCode: number, signal?: number) => void,
): () => void {
if (this.activeExecutions.has(executionId)) {
const listener = (event: ExecutionOutputEvent) => {
if (event.type === 'exit') {
callback(event.exitCode ?? 0, event.signal ?? undefined);
unsubscribe();
}
};
const unsubscribe = this.subscribe(executionId, listener);
return unsubscribe;
}
const exitedInfo = this.exitedExecutionInfo.get(executionId);
if (exitedInfo) {
callback(exitedInfo.exitCode, exitedInfo.signal);
}
return () => {};
}
static kill(executionId: number): void {
const execution = this.activeExecutions.get(executionId);
if (!execution) {
return;
}
if (execution.kind === 'virtual') {
execution.onKill?.();
}
if (execution.kind === 'external') {
execution.kill?.();
}
this.completeWithResult(
executionId,
this.createAbortedResult(executionId, execution),
);
}
static isActive(executionId: number): boolean {
const execution = this.activeExecutions.get(executionId);
if (!execution) {
if (executionId >= NON_PROCESS_EXECUTION_ID_START) {
return false;
}
try {
return process.kill(executionId, 0);
} catch {
return false;
}
}
if (execution.kind === 'virtual') {
return true;
}
if (execution.kind === 'external' && execution.isActive) {
try {
return execution.isActive();
} catch {
return false;
}
}
try {
return process.kill(executionId, 0);
} catch {
return false;
}
}
static writeInput(executionId: number, input: string): void {
const execution = this.activeExecutions.get(executionId);
if (execution?.kind === 'external') {
execution.writeInput?.(input);
}
}
}
@@ -21,6 +21,7 @@ import {
type ShellOutputEvent,
type ShellExecutionConfig,
} from './shellExecutionService.js';
import { ExecutionLifecycleService } from './executionLifecycleService.js';
import type { AnsiOutput, AnsiToken } from '../utils/terminalSerializer.js';
// Hoisted Mocks
@@ -166,6 +167,7 @@ describe('ShellExecutionService', () => {
beforeEach(() => {
vi.clearAllMocks();
ExecutionLifecycleService.resetForTest();
mockSerializeTerminalToObject.mockReturnValue([]);
mockIsBinary.mockReturnValue(false);
mockPlatform.mockReturnValue('linux');
@@ -432,8 +434,12 @@ describe('ShellExecutionService', () => {
});
describe('pty interaction', () => {
let activePtysGetSpy: { mockRestore: () => void };
beforeEach(() => {
vi.spyOn(ShellExecutionService['activePtys'], 'get').mockReturnValue({
activePtysGetSpy = vi
.spyOn(ShellExecutionService['activePtys'], 'get')
.mockReturnValue({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ptyProcess: mockPtyProcess as any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -441,6 +447,10 @@ describe('ShellExecutionService', () => {
});
});
afterEach(() => {
activePtysGetSpy.mockRestore();
});
it('should write to the pty and trigger a render', async () => {
vi.useFakeTimers();
await simulateExecution('interactive-app', (pty) => {
@@ -6,8 +6,9 @@
import stripAnsi from 'strip-ansi';
import { getPty, type PtyImplementation } from '../utils/getPty.js';
import { spawn as cpSpawn, type ChildProcess } from 'node:child_process';
import { spawn as cpSpawn } from 'node:child_process';
import { TextDecoder } from 'node:util';
import type { Writable } from 'node:stream';
import os from 'node:os';
import type { IPty } from '@lydell/node-pty';
import { getCachedEncodingForBuffer } from '../utils/systemEncoding.js';
@@ -27,6 +28,12 @@ import {
type EnvironmentSanitizationConfig,
} from './environmentSanitization.js';
import { killProcessGroup } from '../utils/process-utils.js';
import {
ExecutionLifecycleService,
type ExecutionHandle,
type ExecutionOutputEvent,
type ExecutionResult,
} from './executionLifecycleService.js';
const { Terminal } = pkg;
const MAX_CHILD_PROCESS_BUFFER_SIZE = 16 * 1024 * 1024; // 16MB
@@ -65,34 +72,10 @@ function ensurePromptvarsDisabled(command: string, shell: ShellType): string {
}
/** A structured result from a shell command execution. */
export interface ShellExecutionResult {
/** The raw, unprocessed output buffer. */
rawOutput: Buffer;
/** The combined, decoded output as a string. */
output: string;
/** The process exit code, or null if terminated by a signal. */
exitCode: number | null;
/** The signal that terminated the process, if any. */
signal: number | null;
/** An error object if the process failed to spawn. */
error: Error | null;
/** A boolean indicating if the command was aborted by the user. */
aborted: boolean;
/** The process ID of the spawned shell. */
pid: number | undefined;
/** The method used to execute the shell command. */
executionMethod: 'lydell-node-pty' | 'node-pty' | 'child_process' | 'none';
/** Whether the command was moved to the background. */
backgrounded?: boolean;
}
export type ShellExecutionResult = ExecutionResult;
/** A handle for an ongoing shell execution. */
export interface ShellExecutionHandle {
/** The process ID of the spawned shell. */
pid: number | undefined;
/** A promise that resolves with the complete execution result. */
result: Promise<ShellExecutionResult>;
}
export type ShellExecutionHandle = ExecutionHandle;
export interface ShellExecutionConfig {
terminalWidth?: number;
@@ -111,31 +94,7 @@ export interface ShellExecutionConfig {
/**
* Describes a structured event emitted during shell command execution.
*/
export type ShellOutputEvent =
| {
/** The event contains a chunk of output data. */
type: 'data';
/** The decoded string chunk. */
chunk: string | AnsiOutput;
}
| {
/** Signals that the output stream has been identified as binary. */
type: 'binary_detected';
}
| {
/** Provides progress updates for a binary stream. */
type: 'binary_progress';
/** The total number of bytes received so far. */
bytesReceived: number;
}
| {
/** Signals that the process has exited. */
type: 'exit';
/** The exit code of the process, if any. */
exitCode: number | null;
/** The signal that terminated the process, if any. */
signal: number | null;
};
export type ShellOutputEvent = ExecutionOutputEvent;
interface ActivePty {
ptyProcess: IPty;
@@ -143,15 +102,6 @@ interface ActivePty {
maxSerializedLines?: number;
}
interface ActiveChildProcess {
process: ChildProcess;
state: {
output: string;
truncated: boolean;
outputChunks: Buffer[];
};
}
const getFullBufferText = (terminal: pkg.Terminal): string => {
const buffer = terminal.buffer.active;
const lines: string[] = [];
@@ -197,19 +147,6 @@ const getFullBufferText = (terminal: pkg.Terminal): string => {
export class ShellExecutionService {
private static activePtys = new Map<number, ActivePty>();
private static activeChildProcesses = new Map<number, ActiveChildProcess>();
private static exitedPtyInfo = new Map<
number,
{ exitCode: number; signal?: number }
>();
private static activeResolvers = new Map<
number,
(res: ShellExecutionResult) => void
>();
private static activeListeners = new Map<
number,
Set<(event: ShellOutputEvent) => void>
>();
/**
* Executes a shell command using `node-pty`, capturing all output and lifecycle events.
*
@@ -285,13 +222,6 @@ export class ShellExecutionService {
return { newBuffer: truncatedBuffer + chunk, truncated: true };
}
private static emitEvent(pid: number, event: ShellOutputEvent): void {
const listeners = this.activeListeners.get(pid);
if (listeners) {
listeners.forEach((listener) => listener(event));
}
}
private static childProcessFallback(
commandToExecute: string,
cwd: string,
@@ -327,17 +257,43 @@ export class ShellExecutionService {
outputChunks: [] as Buffer[],
};
if (child.pid) {
this.activeChildProcesses.set(child.pid, {
process: child,
state,
});
const lifecycleHandle = child.pid
? ExecutionLifecycleService.attachExecution(child.pid, {
executionMethod: 'child_process',
getBackgroundOutput: () => state.output,
getSubscriptionSnapshot: () => state.output || undefined,
writeInput: (input) => {
const stdin = child.stdin as Writable | null;
if (stdin) {
stdin.write(input);
}
},
kill: () => {
if (child.pid) {
killProcessGroup({ pid: child.pid }).catch(() => {});
}
},
isActive: () => {
if (!child.pid) {
return false;
}
try {
return process.kill(child.pid, 0);
} catch {
return false;
}
},
})
: undefined;
const result = new Promise<ShellExecutionResult>((resolve) => {
if (child.pid) {
this.activeResolvers.set(child.pid, resolve);
}
let resolveWithoutPid:
| ((result: ShellExecutionResult) => void)
| undefined;
const result =
lifecycleHandle?.result ??
new Promise<ShellExecutionResult>((resolve) => {
resolveWithoutPid = resolve;
});
let stdoutDecoder: TextDecoder | null = null;
let stderrDecoder: TextDecoder | null = null;
@@ -370,7 +326,9 @@ export class ShellExecutionService {
isStreamingRawContent = false;
const event: ShellOutputEvent = { type: 'binary_detected' };
onOutputEvent(event);
if (child.pid) ShellExecutionService.emitEvent(child.pid, event);
if (child.pid) {
ExecutionLifecycleService.emitEvent(child.pid, event);
}
}
}
@@ -394,7 +352,9 @@ export class ShellExecutionService {
chunk: decodedChunk,
};
onOutputEvent(event);
if (child.pid) ShellExecutionService.emitEvent(child.pid, event);
if (child.pid) {
ExecutionLifecycleService.emitEvent(child.pid, event);
}
}
} else {
const totalBytes = state.outputChunks.reduce(
@@ -406,7 +366,9 @@ export class ShellExecutionService {
bytesReceived: totalBytes,
};
onOutputEvent(event);
if (child.pid) ShellExecutionService.emitEvent(child.pid, event);
if (child.pid) {
ExecutionLifecycleService.emitEvent(child.pid, event);
}
}
};
@@ -417,7 +379,6 @@ export class ShellExecutionService {
const { finalBuffer } = cleanup();
let combinedOutput = state.output;
if (state.truncated) {
const truncationMessage = `\n[GEMINI_CLI_WARNING: Output truncated. The buffer is limited to ${
MAX_CHILD_PROCESS_BUFFER_SIZE / (1024 * 1024)
@@ -429,21 +390,7 @@ export class ShellExecutionService {
const exitCode = code;
const exitSignal = signal ? os.constants.signals[signal] : null;
if (child.pid) {
const event: ShellOutputEvent = {
type: 'exit',
exitCode,
signal: exitSignal,
};
onOutputEvent(event);
ShellExecutionService.emitEvent(child.pid, event);
this.activeChildProcesses.delete(child.pid);
this.activeResolvers.delete(child.pid);
this.activeListeners.delete(child.pid);
}
resolve({
const resultPayload: ShellExecutionResult = {
rawOutput: finalBuffer,
output: finalStrippedOutput,
exitCode,
@@ -452,7 +399,22 @@ export class ShellExecutionService {
aborted: abortSignal.aborted,
pid: child.pid,
executionMethod: 'child_process',
});
};
if (child.pid) {
const event: ShellOutputEvent = {
type: 'exit',
exitCode,
signal: exitSignal,
};
onOutputEvent(event);
ExecutionLifecycleService.completeWithResult(
child.pid,
resultPayload,
);
} else {
resolveWithoutPid?.(resultPayload);
}
};
child.stdout.on('data', (data) => handleOutput(data, 'stdout'));
@@ -485,16 +447,15 @@ export class ShellExecutionService {
const remaining = stdoutDecoder.decode();
if (remaining) {
state.output += remaining;
// If there's remaining output, we should technically emit it too,
// but it's rare to have partial utf8 chars at the very end of stream.
if (isStreamingRawContent && remaining) {
if (isStreamingRawContent) {
const event: ShellOutputEvent = {
type: 'data',
chunk: remaining,
};
onOutputEvent(event);
if (child.pid)
ShellExecutionService.emitEvent(child.pid, event);
if (child.pid) {
ExecutionLifecycleService.emitEvent(child.pid, event);
}
}
}
}
@@ -502,23 +463,22 @@ export class ShellExecutionService {
const remaining = stderrDecoder.decode();
if (remaining) {
state.output += remaining;
if (isStreamingRawContent && remaining) {
if (isStreamingRawContent) {
const event: ShellOutputEvent = {
type: 'data',
chunk: remaining,
};
onOutputEvent(event);
if (child.pid)
ShellExecutionService.emitEvent(child.pid, event);
if (child.pid) {
ExecutionLifecycleService.emitEvent(child.pid, event);
}
}
}
}
const finalBuffer = Buffer.concat(state.outputChunks);
return { finalBuffer };
}
});
return { pid: child.pid, result };
} catch (e) {
@@ -585,9 +545,7 @@ export class ShellExecutionService {
},
handleFlowControl: true,
});
const result = new Promise<ShellExecutionResult>((resolve) => {
this.activeResolvers.set(ptyProcess.pid, resolve);
const ptyPid = Number(ptyProcess.pid);
const headlessTerminal = new Terminal({
allowProposedApi: true,
@@ -597,13 +555,52 @@ export class ShellExecutionService {
});
headlessTerminal.scrollToTop();
this.activePtys.set(ptyProcess.pid, {
this.activePtys.set(ptyPid, {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
ptyProcess,
headlessTerminal,
maxSerializedLines: shellExecutionConfig.maxSerializedLines,
});
const result = ExecutionLifecycleService.attachExecution(ptyPid, {
executionMethod: ptyInfo?.name ?? 'node-pty',
writeInput: (input) => {
if (!ExecutionLifecycleService.isActive(ptyPid)) {
return;
}
ptyProcess.write(input);
},
kill: () => {
killProcessGroup({
pid: ptyPid,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
pty: ptyProcess,
}).catch(() => {});
this.activePtys.delete(ptyPid);
},
isActive: () => {
try {
return process.kill(ptyPid, 0);
} catch {
return false;
}
},
getBackgroundOutput: () => getFullBufferText(headlessTerminal),
getSubscriptionSnapshot: () => {
const endLine = headlessTerminal.buffer.active.length;
const startLine = Math.max(
0,
endLine - (shellExecutionConfig.maxSerializedLines ?? 2000),
);
const bufferData = serializeTerminalToObject(
headlessTerminal,
startLine,
endLine,
);
return bufferData.length > 0 ? bufferData : undefined;
},
}).result;
let processingChain = Promise.resolve();
let decoder: TextDecoder | null = null;
let output: string | AnsiOutput | null = null;
@@ -684,7 +681,6 @@ export class ShellExecutionService {
}
const trimmedOutput = newOutput.slice(0, lastNonEmptyLine + 1);
const finalOutput = shellExecutionConfig.disableDynamicLineTrimming
? newOutput
: trimmedOutput;
@@ -696,7 +692,7 @@ export class ShellExecutionService {
chunk: finalOutput,
};
onOutputEvent(event);
ShellExecutionService.emitEvent(ptyProcess.pid, event);
ExecutionLifecycleService.emitEvent(ptyPid, event);
}
};
@@ -728,7 +724,7 @@ export class ShellExecutionService {
const handleOutput = (data: Buffer) => {
processingChain = processingChain.then(
() =>
new Promise<void>((resolve) => {
new Promise<void>((resolveChunk) => {
if (!decoder) {
const encoding = getCachedEncodingForBuffer(data);
try {
@@ -748,21 +744,21 @@ export class ShellExecutionService {
isStreamingRawContent = false;
const event: ShellOutputEvent = { type: 'binary_detected' };
onOutputEvent(event);
ShellExecutionService.emitEvent(ptyProcess.pid, event);
ExecutionLifecycleService.emitEvent(ptyPid, event);
}
}
if (isStreamingRawContent) {
const decodedChunk = decoder.decode(data, { stream: true });
if (decodedChunk.length === 0) {
resolve();
resolveChunk();
return;
}
isWriting = true;
headlessTerminal.write(decodedChunk, () => {
render();
isWriting = false;
resolve();
resolveChunk();
});
} else {
const totalBytes = outputChunks.reduce(
@@ -774,8 +770,8 @@ export class ShellExecutionService {
bytesReceived: totalBytes,
};
onOutputEvent(event);
ShellExecutionService.emitEvent(ptyProcess.pid, event);
resolve();
ExecutionLifecycleService.emitEvent(ptyPid, event);
resolveChunk();
}
}),
);
@@ -790,8 +786,7 @@ export class ShellExecutionService {
({ exitCode, signal }: { exitCode: number; signal?: number }) => {
exited = true;
abortSignal.removeEventListener('abort', abortHandler);
this.activePtys.delete(ptyProcess.pid);
// Attempt to destroy the PTY to ensure FD is closed
this.activePtys.delete(ptyPid);
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
(ptyProcess as IPty & { destroy?: () => void }).destroy?.();
@@ -801,18 +796,7 @@ export class ShellExecutionService {
const finalize = () => {
render(true);
// Store exit info for late subscribers (e.g. backgrounding race condition)
this.exitedPtyInfo.set(ptyProcess.pid, { exitCode, signal });
setTimeout(
() => {
this.exitedPtyInfo.delete(ptyProcess.pid);
},
5 * 60 * 1000,
).unref();
this.activePtys.delete(ptyProcess.pid);
this.activeResolvers.delete(ptyProcess.pid);
this.activePtys.delete(ptyPid);
const event: ShellOutputEvent = {
type: 'exit',
@@ -820,20 +804,15 @@ export class ShellExecutionService {
signal: signal ?? null,
};
onOutputEvent(event);
ShellExecutionService.emitEvent(ptyProcess.pid, event);
this.activeListeners.delete(ptyProcess.pid);
const finalBuffer = Buffer.concat(outputChunks);
resolve({
rawOutput: finalBuffer,
ExecutionLifecycleService.completeWithResult(ptyPid, {
rawOutput: Buffer.concat(outputChunks),
output: getFullBufferText(headlessTerminal),
exitCode,
signal: signal ?? null,
error,
aborted: abortSignal.aborted,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
pid: ptyProcess.pid,
pid: ptyPid,
executionMethod: ptyInfo?.name ?? 'node-pty',
});
};
@@ -864,8 +843,7 @@ export class ShellExecutionService {
const abortHandler = async () => {
if (ptyProcess.pid && !exited) {
await killProcessGroup({
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
pid: ptyProcess.pid,
pid: ptyPid,
escalate: true,
isExited: () => exited,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
@@ -875,10 +853,8 @@ export class ShellExecutionService {
};
abortSignal.addEventListener('abort', abortHandler, { once: true });
});
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
return { pid: ptyProcess.pid, result };
return { pid: ptyPid, result };
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const error = e as Error;
@@ -914,40 +890,11 @@ export class ShellExecutionService {
* @param input The string to write to the terminal.
*/
static writeToPty(pid: number, input: string): void {
if (this.activeChildProcesses.has(pid)) {
const activeChild = this.activeChildProcesses.get(pid);
if (activeChild) {
activeChild.process.stdin?.write(input);
}
return;
}
if (!this.isPtyActive(pid)) {
return;
}
const activePty = this.activePtys.get(pid);
if (activePty) {
activePty.ptyProcess.write(input);
}
ExecutionLifecycleService.writeInput(pid, input);
}
static isPtyActive(pid: number): boolean {
if (this.activeChildProcesses.has(pid)) {
try {
return process.kill(pid, 0);
} catch {
return false;
}
}
try {
// process.kill with signal 0 is a way to check for the existence of a process.
// It doesn't actually send a signal.
return process.kill(pid, 0);
} catch (_) {
return false;
}
return ExecutionLifecycleService.isActive(pid);
}
/**
@@ -962,36 +909,7 @@ export class ShellExecutionService {
pid: number,
callback: (exitCode: number, signal?: number) => void,
): () => void {
const activePty = this.activePtys.get(pid);
if (activePty) {
const disposable = activePty.ptyProcess.onExit(
({ exitCode, signal }: { exitCode: number; signal?: number }) => {
callback(exitCode, signal);
disposable.dispose();
},
);
return () => disposable.dispose();
} else if (this.activeChildProcesses.has(pid)) {
const activeChild = this.activeChildProcesses.get(pid);
const listener = (code: number | null, signal: NodeJS.Signals | null) => {
let signalNumber: number | undefined;
if (signal) {
signalNumber = os.constants.signals[signal];
}
callback(code ?? 0, signalNumber);
};
activeChild?.process.on('exit', listener);
return () => {
activeChild?.process.removeListener('exit', listener);
};
} else {
// Check if it already exited recently
const exitedInfo = this.exitedPtyInfo.get(pid);
if (exitedInfo) {
callback(exitedInfo.exitCode, exitedInfo.signal);
}
return () => {};
}
return ExecutionLifecycleService.onExit(pid, callback);
}
/**
@@ -1000,19 +918,8 @@ export class ShellExecutionService {
* @param pid The process ID to kill.
*/
static kill(pid: number): void {
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
if (activeChild) {
killProcessGroup({ pid }).catch(() => {});
this.activeChildProcesses.delete(pid);
} else if (activePty) {
killProcessGroup({ pid, pty: activePty.ptyProcess }).catch(() => {});
this.activePtys.delete(pid);
}
this.activeResolvers.delete(pid);
this.activeListeners.delete(pid);
ExecutionLifecycleService.kill(pid);
}
/**
@@ -1022,88 +929,14 @@ export class ShellExecutionService {
* @param pid The process ID of the target PTY.
*/
static background(pid: number): void {
const resolve = this.activeResolvers.get(pid);
if (resolve) {
let output = '';
const rawOutput = Buffer.from('');
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
if (activePty) {
output = getFullBufferText(activePty.headlessTerminal);
resolve({
rawOutput,
output,
exitCode: null,
signal: null,
error: null,
aborted: false,
pid,
executionMethod: 'node-pty',
backgrounded: true,
});
} else if (activeChild) {
output = activeChild.state.output;
resolve({
rawOutput,
output,
exitCode: null,
signal: null,
error: null,
aborted: false,
pid,
executionMethod: 'child_process',
backgrounded: true,
});
}
this.activeResolvers.delete(pid);
}
ExecutionLifecycleService.background(pid);
}
static subscribe(
pid: number,
listener: (event: ShellOutputEvent) => void,
): () => void {
if (!this.activeListeners.has(pid)) {
this.activeListeners.set(pid, new Set());
}
this.activeListeners.get(pid)?.add(listener);
// Send current buffer content immediately
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
if (activePty) {
// Use serializeTerminalToObject to preserve colors and structure
const endLine = activePty.headlessTerminal.buffer.active.length;
const startLine = Math.max(
0,
endLine - (activePty.maxSerializedLines ?? 2000),
);
const bufferData = serializeTerminalToObject(
activePty.headlessTerminal,
startLine,
endLine,
);
if (bufferData && bufferData.length > 0) {
listener({ type: 'data', chunk: bufferData });
}
} else if (activeChild) {
const output = activeChild.state.output;
if (output) {
listener({ type: 'data', chunk: output });
}
}
return () => {
this.activeListeners.get(pid)?.delete(listener);
if (this.activeListeners.get(pid)?.size === 0) {
this.activeListeners.delete(pid);
}
};
return ExecutionLifecycleService.subscribe(pid, listener);
}
/**
@@ -1156,10 +989,7 @@ export class ShellExecutionService {
endLine,
);
const event: ShellOutputEvent = { type: 'data', chunk: bufferData };
const listeners = ShellExecutionService.activeListeners.get(pid);
if (listeners) {
listeners.forEach((listener) => listener(event));
}
ExecutionLifecycleService.emitEvent(pid, event);
}
}
+5 -4
View File
@@ -18,6 +18,7 @@ import {
Kind,
type ToolInvocation,
type ToolResult,
type BackgroundExecutionData,
type ToolCallConfirmationDetails,
type ToolExecuteConfirmationDetails,
type PolicyUpdateOptions,
@@ -150,7 +151,7 @@ export class ShellToolInvocation extends BaseToolInvocation<
signal: AbortSignal,
updateOutput?: (output: ToolLiveOutput) => void,
shellExecutionConfig?: ShellExecutionConfig,
setPidCallback?: (pid: number) => void,
setExecutionIdCallback?: (executionId: number) => void,
): Promise<ToolResult> {
const strippedCommand = stripShellWrapper(this.params.command);
@@ -281,8 +282,8 @@ export class ShellToolInvocation extends BaseToolInvocation<
);
if (pid) {
if (setPidCallback) {
setPidCallback(pid);
if (setExecutionIdCallback) {
setExecutionIdCallback(pid);
}
// If the model requested to run in the background, do so after a short delay.
@@ -324,7 +325,7 @@ export class ShellToolInvocation extends BaseToolInvocation<
}
}
let data: Record<string, unknown> | undefined;
let data: BackgroundExecutionData | undefined;
let llmContent = '';
let timeoutMessage = '';
+36
View File
@@ -61,15 +61,51 @@ export interface ToolInvocation<
* Executes the tool with the validated parameters.
* @param signal AbortSignal for tool cancellation.
* @param updateOutput Optional callback to stream output.
* @param setExecutionIdCallback Optional callback for tools that expose a background execution handle.
* @returns Result of the tool execution.
*/
execute(
signal: AbortSignal,
updateOutput?: (output: ToolLiveOutput) => void,
shellExecutionConfig?: ShellExecutionConfig,
setExecutionIdCallback?: (executionId: number) => void,
): Promise<TResult>;
}
/**
* Structured payload used by tools to surface background execution metadata to
* the CLI UI.
*
* NOTE: `pid` is used as the canonical identifier for now to stay consistent
* with existing types (ExecutingToolCall.pid, ExecutionHandle.pid, etc.).
* A future rename to `executionId` is planned once the codebase is fully
* migrated not done in this PR to keep the diff focused on the abstraction.
*/
export interface BackgroundExecutionData extends Record<string, unknown> {
pid?: number;
command?: string;
initialOutput?: string;
}
export function isBackgroundExecutionData(
data: unknown,
): data is BackgroundExecutionData {
if (typeof data !== 'object' || data === null) {
return false;
}
const pid = 'pid' in data ? data.pid : undefined;
const command = 'command' in data ? data.command : undefined;
const initialOutput =
'initialOutput' in data ? data.initialOutput : undefined;
return (
(pid === undefined || typeof pid === 'number') &&
(command === undefined || typeof command === 'string') &&
(initialOutput === undefined || typeof initialOutput === 'string')
);
}
/**
* Options for policy updates that can be customized by tool invocations.
*/