From a367a1724cf06e92af14c1d4be271be2ea077808 Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Mon, 9 Mar 2026 12:20:04 -0400 Subject: [PATCH] feat(core): extract execution lifecycle facade Extracts ExecutionLifecycleService to decouple background task tracking from ShellExecutionService, enabling Ctrl+B backgrounding for any tool (including remote agents). --- .../cli/src/ui/hooks/shellCommandProcessor.ts | 10 +- .../cli/src/ui/hooks/useGeminiStream.test.tsx | 49 + packages/cli/src/ui/hooks/useGeminiStream.ts | 83 +- .../src/core/coreToolHookTriggers.test.ts | 47 + .../core/src/core/coreToolHookTriggers.ts | 33 +- .../core/src/scheduler/tool-executor.test.ts | 67 +- packages/core/src/scheduler/tool-executor.ts | 59 +- .../executionLifecycleService.test.ts | 298 +++++ .../src/services/executionLifecycleService.ts | 454 +++++++ .../services/shellExecutionService.test.ts | 22 +- .../src/services/shellExecutionService.ts | 1182 +++++++---------- packages/core/src/tools/shell.ts | 9 +- packages/core/src/tools/tools.ts | 36 + 13 files changed, 1556 insertions(+), 793 deletions(-) create mode 100644 packages/core/src/services/executionLifecycleService.test.ts create mode 100644 packages/core/src/services/executionLifecycleService.ts diff --git a/packages/cli/src/ui/hooks/shellCommandProcessor.ts b/packages/cli/src/ui/hooks/shellCommandProcessor.ts index 364b395876..3e4a6d54b1 100644 --- a/packages/cli/src/ui/hooks/shellCommandProcessor.ts +++ b/packages/cli/src/ui/hooks/shellCommandProcessor.ts @@ -80,7 +80,7 @@ export const useShellCommandProcessor = ( setShellInputFocused: (value: boolean) => void, terminalWidth?: number, terminalHeight?: number, - activeToolPtyId?: number, + activeBackgroundExecutionId?: number, isWaitingForConfirmation?: boolean, ) => { const [state, dispatch] = useReducer(shellReducer, initialState); @@ -103,7 +103,8 @@ export const useShellCommandProcessor = ( } const m = manager.current; - const activePtyId = state.activeShellPtyId || activeToolPtyId; + const activePtyId = + state.activeShellPtyId ?? activeBackgroundExecutionId ?? undefined; useEffect(() => { const isForegroundActive = !!activePtyId || !!isWaitingForConfirmation; @@ -191,7 +192,8 @@ export const useShellCommandProcessor = ( ]); const backgroundCurrentShell = useCallback(() => { - const pidToBackground = state.activeShellPtyId || activeToolPtyId; + const pidToBackground = + state.activeShellPtyId ?? activeBackgroundExecutionId; if (pidToBackground) { ShellExecutionService.background(pidToBackground); m.backgroundedPids.add(pidToBackground); @@ -202,7 +204,7 @@ export const useShellCommandProcessor = ( m.restoreTimeout = null; } } - }, [state.activeShellPtyId, activeToolPtyId, m]); + }, [state.activeShellPtyId, activeBackgroundExecutionId, m]); const dismissBackgroundShell = useCallback( (pid: number) => { diff --git a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx index 1f2ef5f90c..6c2374934d 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx +++ b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx @@ -96,6 +96,25 @@ const MockedUserPromptEvent = vi.hoisted(() => vi.fn().mockImplementation(() => {}), ); const mockParseAndFormatApiError = vi.hoisted(() => vi.fn()); +const mockIsBackgroundExecutionData = vi.hoisted( + () => + (data: unknown): data is { pid?: number } => { + if (typeof data !== 'object' || data === null) { + return false; + } + const value = data as { + pid?: unknown; + command?: unknown; + initialOutput?: unknown; + }; + return ( + (value.pid === undefined || typeof value.pid === 'number') && + (value.command === undefined || typeof value.command === 'string') && + (value.initialOutput === undefined || + typeof value.initialOutput === 'string') + ); + }, +); const MockValidationRequiredError = vi.hoisted( () => @@ -121,6 +140,7 @@ vi.mock('@google/gemini-cli-core', async (importOriginal) => { const actualCoreModule = (await importOriginal()) as any; return { ...actualCoreModule, + isBackgroundExecutionData: mockIsBackgroundExecutionData, GitService: vi.fn(), GeminiClient: MockedGeminiClientClass, UserPromptEvent: MockedUserPromptEvent, @@ -599,6 +619,35 @@ describe('useGeminiStream', () => { expect(mockSendMessageStream).not.toHaveBeenCalled(); // submitQuery uses this }); + it('should expose activePtyId for non-shell executing tools that report an execution ID', () => { + const remoteExecutingTool: TrackedExecutingToolCall = { + request: { + callId: 'remote-call-1', + name: 'remote_agent_call', + args: {}, + isClientInitiated: false, + prompt_id: 'prompt-id-remote', + }, + status: CoreToolCallStatus.Executing, + responseSubmittedToGemini: false, + tool: { + name: 'remote_agent_call', + displayName: 'Remote Agent', + description: 'Remote agent execution', + build: vi.fn(), + } as any, + invocation: { + getDescription: () => 'Calling remote agent', + } as unknown as AnyToolInvocation, + startTime: Date.now(), + liveOutput: 'working...', + pid: 4242, + }; + + const { result } = renderTestHook([remoteExecutingTool]); + expect(result.current.activePtyId).toBe(4242); + }); + it('should submit tool responses when all tool calls are completed and ready', async () => { const toolCall1ResponseParts: Part[] = [{ text: 'tool 1 final response' }]; const toolCall2ResponseParts: Part[] = [{ text: 'tool 2 final response' }]; diff --git a/packages/cli/src/ui/hooks/useGeminiStream.ts b/packages/cli/src/ui/hooks/useGeminiStream.ts index d254902a94..3aee149fb9 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.ts +++ b/packages/cli/src/ui/hooks/useGeminiStream.ts @@ -37,6 +37,7 @@ import { buildUserSteeringHintPrompt, GeminiCliOperation, getPlanModeExitMessage, + isBackgroundExecutionData, } from '@google/gemini-cli-core'; import type { Config, @@ -94,10 +95,10 @@ type ToolResponseWithParts = ToolCallResponseInfo & { llmContent?: PartListUnion; }; -interface ShellToolData { - pid?: number; - command?: string; - initialOutput?: string; +interface BackgroundedToolInfo { + pid: number; + command: string; + initialOutput: string; } enum StreamProcessingStatus { @@ -111,15 +112,32 @@ const SUPPRESSED_TOOL_ERRORS_NOTE = const LOW_VERBOSITY_FAILURE_NOTE = 'This request failed. Press F12 for diagnostics, or run /settings and change "Error Verbosity" to full for full details.'; -function isShellToolData(data: unknown): data is ShellToolData { - if (typeof data !== 'object' || data === null) { - return false; +function getBackgroundedToolInfo( + toolCall: TrackedCompletedToolCall | TrackedCancelledToolCall, +): BackgroundedToolInfo | undefined { + const response = toolCall.response as ToolResponseWithParts; + const rawData: unknown = response?.data; + if (!isBackgroundExecutionData(rawData)) { + return undefined; } - const d = data as Partial; + + if (rawData.pid === undefined) { + return undefined; + } + + return { + pid: rawData.pid, + command: rawData.command ?? toolCall.request.name, + initialOutput: rawData.initialOutput ?? '', + }; +} + +function isBackgroundableExecutingToolCall( + toolCall: TrackedToolCall, +): toolCall is TrackedExecutingToolCall { return ( - (d.pid === undefined || typeof d.pid === 'number') && - (d.command === undefined || typeof d.command === 'string') && - (d.initialOutput === undefined || typeof d.initialOutput === 'string') + toolCall.status === CoreToolCallStatus.Executing && + typeof toolCall.pid === 'number' ); } @@ -311,13 +329,11 @@ export const useGeminiStream = ( getPreferredEditor, ); - const activeToolPtyId = useMemo(() => { - const executingShellTool = toolCalls.find( - (tc) => - tc.status === 'executing' && tc.request.name === 'run_shell_command', + const activeBackgroundExecutionId = useMemo(() => { + const executingBackgroundableTool = toolCalls.find( + isBackgroundableExecutingToolCall, ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - return (executingShellTool as TrackedExecutingToolCall | undefined)?.pid; + return executingBackgroundableTool?.pid; }, [toolCalls]); const onExec = useCallback(async (done: Promise) => { @@ -347,7 +363,7 @@ export const useGeminiStream = ( setShellInputFocused, terminalWidth, terminalHeight, - activeToolPtyId, + activeBackgroundExecutionId, ); const streamingState = useMemo( @@ -525,7 +541,8 @@ export const useGeminiStream = ( onComplete: (result: { userSelection: 'disable' | 'keep' }) => void; } | null>(null); - const activePtyId = activeShellPtyId || activeToolPtyId; + const activePtyId = + activeShellPtyId ?? activeBackgroundExecutionId ?? undefined; const prevActiveShellPtyIdRef = useRef(null); useEffect(() => { @@ -1651,26 +1668,16 @@ export const useGeminiStream = ( !processedMemoryToolsRef.current.has(t.request.callId), ); - // Handle backgrounded shell tools - completedAndReadyToSubmitTools.forEach((t) => { - const isShell = t.request.name === 'run_shell_command'; - // Access result from the tracked tool call response - const response = t.response as ToolResponseWithParts; - const rawData = response?.data; - const data = isShellToolData(rawData) ? rawData : undefined; - - // Use data.pid for shell commands moved to the background. - const pid = data?.pid; - - if (isShell && pid) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const command = (data?.['command'] as string) ?? 'shell'; - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const initialOutput = (data?.['initialOutput'] as string) ?? ''; - - registerBackgroundShell(pid, command, initialOutput); + for (const toolCall of completedAndReadyToSubmitTools) { + const backgroundedTool = getBackgroundedToolInfo(toolCall); + if (backgroundedTool) { + registerBackgroundShell( + backgroundedTool.pid, + backgroundedTool.command, + backgroundedTool.initialOutput, + ); } - }); + } if (newSuccessfulMemorySaves.length > 0) { // Perform the refresh only if there are new ones. diff --git a/packages/core/src/core/coreToolHookTriggers.test.ts b/packages/core/src/core/coreToolHookTriggers.test.ts index 2a654042c6..ff9601fc33 100644 --- a/packages/core/src/core/coreToolHookTriggers.test.ts +++ b/packages/core/src/core/coreToolHookTriggers.test.ts @@ -11,6 +11,7 @@ import { BaseToolInvocation, type ToolResult, type AnyDeclarativeTool, + type ToolLiveOutput, } from '../tools/tools.js'; import type { MessageBus } from '../confirmation-bus/message-bus.js'; import type { HookSystem } from '../hooks/hookSystem.js'; @@ -37,6 +38,30 @@ class MockInvocation extends BaseToolInvocation<{ key?: string }, ToolResult> { } } +class MockBackgroundableInvocation extends BaseToolInvocation< + { key?: string }, + ToolResult +> { + constructor(params: { key?: string }, messageBus: MessageBus) { + super(params, messageBus); + } + getDescription() { + return 'mock-pid'; + } + async execute( + _signal: AbortSignal, + _updateOutput?: (output: ToolLiveOutput) => void, + _shellExecutionConfig?: unknown, + setExecutionIdCallback?: (executionId: number) => void, + ) { + setExecutionIdCallback?.(4242); + return { + llmContent: 'pid', + returnDisplay: 'pid', + }; + } +} + describe('executeToolWithHooks', () => { let messageBus: MessageBus; let mockTool: AnyDeclarativeTool; @@ -258,4 +283,26 @@ describe('executeToolWithHooks', () => { expect(invocation.params.key).toBe('original'); expect(mockTool.build).not.toHaveBeenCalled(); }); + + it('should pass execution ID callback through for non-shell invocations', async () => { + const invocation = new MockBackgroundableInvocation({}, messageBus); + const abortSignal = new AbortController().signal; + const setExecutionIdCallback = vi.fn(); + + vi.mocked(mockHookSystem.fireBeforeToolEvent).mockResolvedValue(undefined); + vi.mocked(mockHookSystem.fireAfterToolEvent).mockResolvedValue(undefined); + + await executeToolWithHooks( + invocation, + 'test_tool', + abortSignal, + mockTool, + undefined, + undefined, + setExecutionIdCallback, + mockConfig, + ); + + expect(setExecutionIdCallback).toHaveBeenCalledWith(4242); + }); }); diff --git a/packages/core/src/core/coreToolHookTriggers.ts b/packages/core/src/core/coreToolHookTriggers.ts index cbd90e8039..464cfc5f04 100644 --- a/packages/core/src/core/coreToolHookTriggers.ts +++ b/packages/core/src/core/coreToolHookTriggers.ts @@ -15,7 +15,6 @@ import type { import { ToolErrorType } from '../tools/tool-error.js'; import { debugLogger } from '../utils/debugLogger.js'; import type { ShellExecutionConfig } from '../index.js'; -import { ShellToolInvocation } from '../tools/shell.js'; import { DiscoveredMCPToolInvocation } from '../tools/mcp-tool.js'; /** @@ -26,7 +25,7 @@ import { DiscoveredMCPToolInvocation } from '../tools/mcp-tool.js'; * @returns MCP context if this is an MCP tool, undefined otherwise */ function extractMcpContext( - invocation: ShellToolInvocation | AnyToolInvocation, + invocation: AnyToolInvocation, config: Config, ): McpToolContext | undefined { if (!(invocation instanceof DiscoveredMCPToolInvocation)) { @@ -63,18 +62,18 @@ function extractMcpContext( * @param signal Abort signal for cancellation * @param liveOutputCallback Optional callback for live output updates * @param shellExecutionConfig Optional shell execution config - * @param setPidCallback Optional callback to set the PID for shell invocations + * @param setExecutionIdCallback Optional callback to set an execution ID for backgroundable invocations * @param config Config to look up MCP server details for hook context * @returns The tool result */ export async function executeToolWithHooks( - invocation: ShellToolInvocation | AnyToolInvocation, + invocation: AnyToolInvocation, toolName: string, signal: AbortSignal, tool: AnyDeclarativeTool, liveOutputCallback?: (outputChunk: ToolLiveOutput) => void, shellExecutionConfig?: ShellExecutionConfig, - setPidCallback?: (pid: number) => void, + setExecutionIdCallback?: (executionId: number) => void, config?: Config, originalRequestName?: string, ): Promise { @@ -154,22 +153,14 @@ export async function executeToolWithHooks( } } - // Execute the actual tool - let toolResult: ToolResult; - if (setPidCallback && invocation instanceof ShellToolInvocation) { - toolResult = await invocation.execute( - signal, - liveOutputCallback, - shellExecutionConfig, - setPidCallback, - ); - } else { - toolResult = await invocation.execute( - signal, - liveOutputCallback, - shellExecutionConfig, - ); - } + // Execute the actual tool. Tools that support backgrounding can optionally + // surface an execution ID via the callback. + const toolResult: ToolResult = await invocation.execute( + signal, + liveOutputCallback, + shellExecutionConfig, + setExecutionIdCallback, + ); // Append notification if parameters were modified if (inputWasModified) { diff --git a/packages/core/src/scheduler/tool-executor.test.ts b/packages/core/src/scheduler/tool-executor.test.ts index e744738341..9e26ff4b3e 100644 --- a/packages/core/src/scheduler/tool-executor.test.ts +++ b/packages/core/src/scheduler/tool-executor.test.ts @@ -469,7 +469,7 @@ describe('ToolExecutor', () => { expect(result.status).toBe(CoreToolCallStatus.Success); }); - it('should report PID updates for shell tools', async () => { + it('should report execution ID updates for backgroundable tools', async () => { // 1. Setup ShellToolInvocation const messageBus = createMockMessageBus(); const shellInvocation = new ShellToolInvocation( @@ -480,7 +480,7 @@ describe('ToolExecutor', () => { // We need a dummy tool that matches the invocation just for structure const mockTool = new MockTool({ name: SHELL_TOOL_NAME }); - // 2. Mock executeToolWithHooks to trigger the PID callback + // 2. Mock executeToolWithHooks to trigger the execution ID callback const testPid = 12345; vi.mocked(coreToolHookTriggers.executeToolWithHooks).mockImplementation( async ( @@ -490,13 +490,13 @@ describe('ToolExecutor', () => { _tool, _liveCb, _shellCfg, - setPidCallback, + setExecutionIdCallback, _config, _originalRequestName, ) => { - // Simulate the shell tool reporting a PID - if (setPidCallback) { - setPidCallback(testPid); + // Simulate the tool reporting an execution ID + if (setExecutionIdCallback) { + setExecutionIdCallback(testPid); } return { llmContent: 'done', returnDisplay: 'done' }; }, @@ -525,7 +525,7 @@ describe('ToolExecutor', () => { onUpdateToolCall, }); - // 4. Verify PID was reported + // 4. Verify execution ID was reported expect(onUpdateToolCall).toHaveBeenCalledWith( expect.objectContaining({ status: CoreToolCallStatus.Executing, @@ -534,6 +534,59 @@ describe('ToolExecutor', () => { ); }); + it('should report execution ID updates for non-shell backgroundable tools', async () => { + const mockTool = new MockTool({ + name: 'remote_agent_call', + description: 'Remote agent call', + }); + const invocation = mockTool.build({}); + + const testExecutionId = 67890; + vi.mocked(coreToolHookTriggers.executeToolWithHooks).mockImplementation( + async ( + _inv, + _name, + _sig, + _tool, + _liveCb, + _shellCfg, + setExecutionIdCallback, + ) => { + setExecutionIdCallback?.(testExecutionId); + return { llmContent: 'done', returnDisplay: 'done' }; + }, + ); + + const scheduledCall: ScheduledToolCall = { + status: CoreToolCallStatus.Scheduled, + request: { + callId: 'call-remote-pid', + name: 'remote_agent_call', + args: {}, + isClientInitiated: false, + prompt_id: 'prompt-remote-pid', + }, + tool: mockTool, + invocation: invocation as unknown as AnyToolInvocation, + startTime: Date.now(), + }; + + const onUpdateToolCall = vi.fn(); + + await executor.execute({ + call: scheduledCall, + signal: new AbortController().signal, + onUpdateToolCall, + }); + + expect(onUpdateToolCall).toHaveBeenCalledWith( + expect.objectContaining({ + status: CoreToolCallStatus.Executing, + pid: testExecutionId, + }), + ); + }); + it('should return cancelled result with partial output when signal is aborted', async () => { const mockTool = new MockTool({ name: 'slowTool', diff --git a/packages/core/src/scheduler/tool-executor.ts b/packages/core/src/scheduler/tool-executor.ts index 8269f1fc41..c367d30d72 100644 --- a/packages/core/src/scheduler/tool-executor.ts +++ b/packages/core/src/scheduler/tool-executor.ts @@ -16,7 +16,6 @@ import { type ToolLiveOutput, } from '../index.js'; import { SHELL_TOOL_NAME } from '../tools/tool-names.js'; -import { ShellToolInvocation } from '../tools/shell.js'; import { DiscoveredMCPTool } from '../tools/mcp-tool.js'; import { executeToolWithHooks } from '../core/coreToolHookTriggers.js'; import { @@ -89,43 +88,29 @@ export class ToolExecutor { let completedToolCall: CompletedToolCall; try { - let promise: Promise; - if (invocation instanceof ShellToolInvocation) { - const setPidCallback = (pid: number) => { - const executingCall: ExecutingToolCall = { - ...call, - status: CoreToolCallStatus.Executing, - tool, - invocation, - pid, - startTime: 'startTime' in call ? call.startTime : undefined, - }; - onUpdateToolCall(executingCall); + const setExecutionIdCallback = (executionId: number) => { + const executingCall: ExecutingToolCall = { + ...call, + status: CoreToolCallStatus.Executing, + tool, + invocation, + pid: executionId, + startTime: 'startTime' in call ? call.startTime : undefined, }; - promise = executeToolWithHooks( - invocation, - toolName, - signal, - tool, - liveOutputCallback, - shellExecutionConfig, - setPidCallback, - this.config, - request.originalRequestName, - ); - } else { - promise = executeToolWithHooks( - invocation, - toolName, - signal, - tool, - liveOutputCallback, - shellExecutionConfig, - undefined, - this.config, - request.originalRequestName, - ); - } + onUpdateToolCall(executingCall); + }; + + const promise = executeToolWithHooks( + invocation, + toolName, + signal, + tool, + liveOutputCallback, + shellExecutionConfig, + setExecutionIdCallback, + this.config, + request.originalRequestName, + ); const toolResult: ToolResult = await promise; diff --git a/packages/core/src/services/executionLifecycleService.test.ts b/packages/core/src/services/executionLifecycleService.test.ts new file mode 100644 index 0000000000..213ad39224 --- /dev/null +++ b/packages/core/src/services/executionLifecycleService.test.ts @@ -0,0 +1,298 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { + ExecutionLifecycleService, + type ExecutionHandle, + type ExecutionResult, +} from './executionLifecycleService.js'; + +function createResult( + overrides: Partial = {}, +): ExecutionResult { + return { + rawOutput: Buffer.from(''), + output: '', + exitCode: 0, + signal: null, + error: null, + aborted: false, + pid: 123, + executionMethod: 'child_process', + ...overrides, + }; +} + +describe('ExecutionLifecycleService', () => { + beforeEach(() => { + ExecutionLifecycleService.resetForTest(); + }); + + it('completes managed executions in the foreground and notifies exit subscribers', async () => { + const handle = ExecutionLifecycleService.createExecution(); + if (handle.pid === undefined) { + throw new Error('Expected execution ID.'); + } + + const onExit = vi.fn(); + const unsubscribe = ExecutionLifecycleService.onExit(handle.pid, onExit); + + ExecutionLifecycleService.appendOutput(handle.pid, 'Hello'); + ExecutionLifecycleService.appendOutput(handle.pid, ' World'); + ExecutionLifecycleService.completeExecution(handle.pid, { + exitCode: 0, + }); + + const result = await handle.result; + expect(result.output).toBe('Hello World'); + expect(result.executionMethod).toBe('none'); + expect(result.backgrounded).toBeUndefined(); + + await vi.waitFor(() => { + expect(onExit).toHaveBeenCalledWith(0, undefined); + }); + + unsubscribe(); + }); + + it('supports explicit execution methods for managed executions', async () => { + const handle = ExecutionLifecycleService.createExecution( + '', + undefined, + 'remote_agent', + ); + if (handle.pid === undefined) { + throw new Error('Expected execution ID.'); + } + + ExecutionLifecycleService.completeExecution(handle.pid, { + exitCode: 0, + }); + const result = await handle.result; + expect(result.executionMethod).toBe('remote_agent'); + }); + + it('supports backgrounding managed executions and continues streaming updates', async () => { + const handle = ExecutionLifecycleService.createExecution(); + if (handle.pid === undefined) { + throw new Error('Expected execution ID.'); + } + + const chunks: string[] = []; + const onExit = vi.fn(); + + const unsubscribeStream = ExecutionLifecycleService.subscribe( + handle.pid, + (event) => { + if (event.type === 'data' && typeof event.chunk === 'string') { + chunks.push(event.chunk); + } + }, + ); + const unsubscribeExit = ExecutionLifecycleService.onExit( + handle.pid, + onExit, + ); + + ExecutionLifecycleService.appendOutput(handle.pid, 'Chunk 1'); + ExecutionLifecycleService.background(handle.pid); + + const backgroundResult = await handle.result; + expect(backgroundResult.backgrounded).toBe(true); + expect(backgroundResult.output).toBe('Chunk 1'); + + ExecutionLifecycleService.appendOutput(handle.pid, '\nChunk 2'); + ExecutionLifecycleService.completeExecution(handle.pid, { + exitCode: 0, + }); + + await vi.waitFor(() => { + expect(chunks.join('')).toContain('Chunk 2'); + expect(onExit).toHaveBeenCalledWith(0, undefined); + }); + + unsubscribeStream(); + unsubscribeExit(); + }); + + it('kills managed executions and resolves with aborted result', async () => { + const onKill = vi.fn(); + const handle = ExecutionLifecycleService.createExecution('', onKill); + if (handle.pid === undefined) { + throw new Error('Expected execution ID.'); + } + + ExecutionLifecycleService.appendOutput(handle.pid, 'work'); + ExecutionLifecycleService.kill(handle.pid); + + const result = await handle.result; + expect(onKill).toHaveBeenCalledTimes(1); + expect(result.aborted).toBe(true); + expect(result.exitCode).toBe(130); + expect(result.error?.message).toContain('Operation cancelled by user'); + }); + + it('does not probe OS process state for completed non-process execution IDs', async () => { + const handle = ExecutionLifecycleService.createExecution(); + if (handle.pid === undefined) { + throw new Error('Expected execution ID.'); + } + + ExecutionLifecycleService.completeExecution(handle.pid, { exitCode: 0 }); + await handle.result; + + const processKillSpy = vi.spyOn(process, 'kill'); + expect(ExecutionLifecycleService.isActive(handle.pid)).toBe(false); + expect(processKillSpy).not.toHaveBeenCalled(); + processKillSpy.mockRestore(); + }); + + it('manages external executions through registration hooks', async () => { + const writeInput = vi.fn(); + const isActive = vi.fn().mockReturnValue(true); + const exitListener = vi.fn(); + const chunks: string[] = []; + + let output = 'seed'; + const handle: ExecutionHandle = ExecutionLifecycleService.attachExecution( + 4321, + { + executionMethod: 'child_process', + getBackgroundOutput: () => output, + getSubscriptionSnapshot: () => output, + writeInput, + isActive, + }, + ); + + const unsubscribe = ExecutionLifecycleService.subscribe(4321, (event) => { + if (event.type === 'data' && typeof event.chunk === 'string') { + chunks.push(event.chunk); + } + }); + ExecutionLifecycleService.onExit(4321, exitListener); + + ExecutionLifecycleService.writeInput(4321, 'stdin'); + expect(writeInput).toHaveBeenCalledWith('stdin'); + expect(ExecutionLifecycleService.isActive(4321)).toBe(true); + + const firstChunk = { type: 'data', chunk: ' +delta' } as const; + ExecutionLifecycleService.emitEvent(4321, firstChunk); + output += firstChunk.chunk; + + ExecutionLifecycleService.background(4321); + const backgroundResult = await handle.result; + expect(backgroundResult.backgrounded).toBe(true); + expect(backgroundResult.output).toBe('seed +delta'); + expect(backgroundResult.executionMethod).toBe('child_process'); + + ExecutionLifecycleService.completeWithResult( + 4321, + createResult({ + pid: 4321, + output: 'seed +delta done', + rawOutput: Buffer.from('seed +delta done'), + executionMethod: 'child_process', + }), + ); + + await vi.waitFor(() => { + expect(exitListener).toHaveBeenCalledWith(0, undefined); + }); + + const lateExit = vi.fn(); + ExecutionLifecycleService.onExit(4321, lateExit); + expect(lateExit).toHaveBeenCalledWith(0, undefined); + + unsubscribe(); + }); + + it('supports late subscription catch-up after backgrounding an external execution', async () => { + let output = 'seed'; + const onExit = vi.fn(); + const handle = ExecutionLifecycleService.attachExecution(4322, { + executionMethod: 'child_process', + getBackgroundOutput: () => output, + getSubscriptionSnapshot: () => output, + }); + + ExecutionLifecycleService.onExit(4322, onExit); + ExecutionLifecycleService.background(4322); + + const backgroundResult = await handle.result; + expect(backgroundResult.backgrounded).toBe(true); + expect(backgroundResult.output).toBe('seed'); + + output += ' +late'; + ExecutionLifecycleService.emitEvent(4322, { + type: 'data', + chunk: ' +late', + }); + + const chunks: string[] = []; + const unsubscribe = ExecutionLifecycleService.subscribe(4322, (event) => { + if (event.type === 'data' && typeof event.chunk === 'string') { + chunks.push(event.chunk); + } + }); + expect(chunks[0]).toBe('seed +late'); + + output += ' +live'; + ExecutionLifecycleService.emitEvent(4322, { + type: 'data', + chunk: ' +live', + }); + expect(chunks[chunks.length - 1]).toBe(' +live'); + + ExecutionLifecycleService.completeWithResult( + 4322, + createResult({ + pid: 4322, + output, + rawOutput: Buffer.from(output), + executionMethod: 'child_process', + }), + ); + + await vi.waitFor(() => { + expect(onExit).toHaveBeenCalledWith(0, undefined); + }); + unsubscribe(); + }); + + it('kills external executions and settles pending promises', async () => { + const terminate = vi.fn(); + const onExit = vi.fn(); + const handle = ExecutionLifecycleService.attachExecution(4323, { + executionMethod: 'child_process', + initialOutput: 'running', + kill: terminate, + }); + ExecutionLifecycleService.onExit(4323, onExit); + ExecutionLifecycleService.kill(4323); + + const result = await handle.result; + expect(terminate).toHaveBeenCalledTimes(1); + expect(result.aborted).toBe(true); + expect(result.exitCode).toBe(130); + expect(result.output).toBe('running'); + expect(result.error?.message).toContain('Operation cancelled by user'); + expect(onExit).toHaveBeenCalledWith(130, undefined); + }); + + it('rejects duplicate execution registration for active execution IDs', () => { + ExecutionLifecycleService.attachExecution(4324, { + executionMethod: 'child_process', + }); + + expect(() => { + ExecutionLifecycleService.attachExecution(4324, { + executionMethod: 'child_process', + }); + }).toThrow('Execution 4324 is already attached.'); + }); +}); diff --git a/packages/core/src/services/executionLifecycleService.ts b/packages/core/src/services/executionLifecycleService.ts new file mode 100644 index 0000000000..6195e516da --- /dev/null +++ b/packages/core/src/services/executionLifecycleService.ts @@ -0,0 +1,454 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { AnsiOutput } from '../utils/terminalSerializer.js'; + +export type ExecutionMethod = + | 'lydell-node-pty' + | 'node-pty' + | 'child_process' + | 'remote_agent' + | 'none'; + +export interface ExecutionResult { + rawOutput: Buffer; + output: string; + exitCode: number | null; + signal: number | null; + error: Error | null; + aborted: boolean; + pid: number | undefined; + executionMethod: ExecutionMethod; + backgrounded?: boolean; +} + +export interface ExecutionHandle { + pid: number | undefined; + result: Promise; +} + +export type ExecutionOutputEvent = + | { + type: 'data'; + chunk: string | AnsiOutput; + } + | { + type: 'binary_detected'; + } + | { + type: 'binary_progress'; + bytesReceived: number; + } + | { + type: 'exit'; + exitCode: number | null; + signal: number | null; + }; + +export interface ExecutionCompletionOptions { + exitCode?: number | null; + signal?: number | null; + error?: Error | null; + aborted?: boolean; +} + +export interface ExternalExecutionRegistration { + executionMethod: ExecutionMethod; + initialOutput?: string; + getBackgroundOutput?: () => string; + getSubscriptionSnapshot?: () => string | AnsiOutput | undefined; + writeInput?: (input: string) => void; + kill?: () => void; + isActive?: () => boolean; +} + +interface ManagedExecutionBase { + executionMethod: ExecutionMethod; + output: string; + getBackgroundOutput?: () => string; + getSubscriptionSnapshot?: () => string | AnsiOutput | undefined; +} + +interface VirtualExecutionState extends ManagedExecutionBase { + kind: 'virtual'; + onKill?: () => void; +} + +interface ExternalExecutionState extends ManagedExecutionBase { + kind: 'external'; + writeInput?: (input: string) => void; + kill?: () => void; + isActive?: () => boolean; +} + +type ManagedExecutionState = VirtualExecutionState | ExternalExecutionState; + +const NON_PROCESS_EXECUTION_ID_START = 2_000_000_000; + +/** + * Central owner for execution backgrounding lifecycle across shell and tools. + */ +export class ExecutionLifecycleService { + private static readonly EXIT_INFO_TTL_MS = 5 * 60 * 1000; + private static nextExecutionId = NON_PROCESS_EXECUTION_ID_START; + + private static activeExecutions = new Map(); + private static activeResolvers = new Map< + number, + (result: ExecutionResult) => void + >(); + private static activeListeners = new Map< + number, + Set<(event: ExecutionOutputEvent) => void> + >(); + private static exitedExecutionInfo = new Map< + number, + { exitCode: number; signal?: number } + >(); + + private static storeExitInfo( + executionId: number, + exitCode: number, + signal?: number, + ): void { + this.exitedExecutionInfo.set(executionId, { + exitCode, + signal, + }); + setTimeout(() => { + this.exitedExecutionInfo.delete(executionId); + }, this.EXIT_INFO_TTL_MS).unref(); + } + + private static allocateExecutionId(): number { + let executionId = ++this.nextExecutionId; + while (this.activeExecutions.has(executionId)) { + executionId = ++this.nextExecutionId; + } + return executionId; + } + + private static createPendingResult( + executionId: number, + ): Promise { + return new Promise((resolve) => { + this.activeResolvers.set(executionId, resolve); + }); + } + + private static createAbortedResult( + executionId: number, + execution: ManagedExecutionState, + ): ExecutionResult { + const output = execution.getBackgroundOutput?.() ?? execution.output; + return { + rawOutput: Buffer.from(output, 'utf8'), + output, + exitCode: 130, + signal: null, + error: new Error('Operation cancelled by user.'), + aborted: true, + pid: executionId, + executionMethod: execution.executionMethod, + }; + } + + /** + * Resets lifecycle state for isolated unit tests. + */ + static resetForTest(): void { + this.activeExecutions.clear(); + this.activeResolvers.clear(); + this.activeListeners.clear(); + this.exitedExecutionInfo.clear(); + this.nextExecutionId = NON_PROCESS_EXECUTION_ID_START; + } + + static attachExecution( + executionId: number, + registration: ExternalExecutionRegistration, + ): ExecutionHandle { + if ( + this.activeExecutions.has(executionId) || + this.activeResolvers.has(executionId) + ) { + throw new Error(`Execution ${executionId} is already attached.`); + } + this.exitedExecutionInfo.delete(executionId); + + this.activeExecutions.set(executionId, { + executionMethod: registration.executionMethod, + output: registration.initialOutput ?? '', + kind: 'external', + getBackgroundOutput: registration.getBackgroundOutput, + getSubscriptionSnapshot: registration.getSubscriptionSnapshot, + writeInput: registration.writeInput, + kill: registration.kill, + isActive: registration.isActive, + }); + + return { + pid: executionId, + result: this.createPendingResult(executionId), + }; + } + + static createExecution( + initialOutput = '', + onKill?: () => void, + executionMethod: ExecutionMethod = 'none', + ): ExecutionHandle { + const executionId = this.allocateExecutionId(); + + this.activeExecutions.set(executionId, { + executionMethod, + output: initialOutput, + kind: 'virtual', + onKill, + getBackgroundOutput: () => { + const state = this.activeExecutions.get(executionId); + return state?.output ?? initialOutput; + }, + getSubscriptionSnapshot: () => { + const state = this.activeExecutions.get(executionId); + return state?.output ?? initialOutput; + }, + }); + + return { + pid: executionId, + result: this.createPendingResult(executionId), + }; + } + + static appendOutput(executionId: number, chunk: string): void { + const execution = this.activeExecutions.get(executionId); + if (!execution || chunk.length === 0) { + return; + } + + execution.output += chunk; + this.emitEvent(executionId, { type: 'data', chunk }); + } + + static emitEvent(executionId: number, event: ExecutionOutputEvent): void { + const listeners = this.activeListeners.get(executionId); + if (listeners) { + listeners.forEach((listener) => listener(event)); + } + } + + private static resolvePending( + executionId: number, + result: ExecutionResult, + ): void { + const resolve = this.activeResolvers.get(executionId); + if (!resolve) { + return; + } + + resolve(result); + this.activeResolvers.delete(executionId); + } + + private static settleExecution( + executionId: number, + result: ExecutionResult, + ): void { + if (!this.activeExecutions.has(executionId)) { + return; + } + + this.resolvePending(executionId, result); + this.emitEvent(executionId, { + type: 'exit', + exitCode: result.exitCode, + signal: result.signal, + }); + + this.activeListeners.delete(executionId); + this.activeExecutions.delete(executionId); + this.storeExitInfo( + executionId, + result.exitCode ?? 0, + result.signal ?? undefined, + ); + } + + static completeExecution( + executionId: number, + options?: ExecutionCompletionOptions, + ): void { + const execution = this.activeExecutions.get(executionId); + if (!execution) { + return; + } + + const { + error = null, + aborted = false, + exitCode = error ? 1 : 0, + signal = null, + } = options ?? {}; + + const output = execution.getBackgroundOutput?.() ?? execution.output; + + this.settleExecution(executionId, { + rawOutput: Buffer.from(output, 'utf8'), + output, + exitCode, + signal, + error, + aborted, + pid: executionId, + executionMethod: execution.executionMethod, + }); + } + + static completeWithResult( + executionId: number, + result: ExecutionResult, + ): void { + this.settleExecution(executionId, result); + } + + static background(executionId: number): void { + const resolve = this.activeResolvers.get(executionId); + if (!resolve) { + return; + } + + const execution = this.activeExecutions.get(executionId); + if (!execution) { + return; + } + + const output = execution.getBackgroundOutput?.() ?? execution.output; + + resolve({ + rawOutput: Buffer.from(''), + output, + exitCode: null, + signal: null, + error: null, + aborted: false, + pid: executionId, + executionMethod: execution.executionMethod, + backgrounded: true, + }); + + this.activeResolvers.delete(executionId); + } + + static subscribe( + executionId: number, + listener: (event: ExecutionOutputEvent) => void, + ): () => void { + if (!this.activeListeners.has(executionId)) { + this.activeListeners.set(executionId, new Set()); + } + this.activeListeners.get(executionId)?.add(listener); + + const execution = this.activeExecutions.get(executionId); + if (execution) { + const snapshot = + execution.getSubscriptionSnapshot?.() ?? + (execution.output.length > 0 ? execution.output : undefined); + if (snapshot && (typeof snapshot !== 'string' || snapshot.length > 0)) { + listener({ type: 'data', chunk: snapshot }); + } + } + + return () => { + this.activeListeners.get(executionId)?.delete(listener); + if (this.activeListeners.get(executionId)?.size === 0) { + this.activeListeners.delete(executionId); + } + }; + } + + static onExit( + executionId: number, + callback: (exitCode: number, signal?: number) => void, + ): () => void { + if (this.activeExecutions.has(executionId)) { + const listener = (event: ExecutionOutputEvent) => { + if (event.type === 'exit') { + callback(event.exitCode ?? 0, event.signal ?? undefined); + unsubscribe(); + } + }; + const unsubscribe = this.subscribe(executionId, listener); + return unsubscribe; + } + + const exitedInfo = this.exitedExecutionInfo.get(executionId); + if (exitedInfo) { + callback(exitedInfo.exitCode, exitedInfo.signal); + } + + return () => {}; + } + + static kill(executionId: number): void { + const execution = this.activeExecutions.get(executionId); + if (!execution) { + return; + } + + if (execution.kind === 'virtual') { + execution.onKill?.(); + } + + if (execution.kind === 'external') { + execution.kill?.(); + } + + this.completeWithResult( + executionId, + this.createAbortedResult(executionId, execution), + ); + } + + static isActive(executionId: number): boolean { + const execution = this.activeExecutions.get(executionId); + if (!execution) { + if (executionId >= NON_PROCESS_EXECUTION_ID_START) { + return false; + } + try { + return process.kill(executionId, 0); + } catch { + return false; + } + } + + if (execution.kind === 'virtual') { + return true; + } + + if (execution.kind === 'external' && execution.isActive) { + try { + return execution.isActive(); + } catch { + return false; + } + } + + try { + return process.kill(executionId, 0); + } catch { + return false; + } + } + + static writeInput(executionId: number, input: string): void { + const execution = this.activeExecutions.get(executionId); + if (execution?.kind === 'external') { + execution.writeInput?.(input); + } + } +} diff --git a/packages/core/src/services/shellExecutionService.test.ts b/packages/core/src/services/shellExecutionService.test.ts index 77de13de3a..c2d59c1bdf 100644 --- a/packages/core/src/services/shellExecutionService.test.ts +++ b/packages/core/src/services/shellExecutionService.test.ts @@ -21,6 +21,7 @@ import { type ShellOutputEvent, type ShellExecutionConfig, } from './shellExecutionService.js'; +import { ExecutionLifecycleService } from './executionLifecycleService.js'; import type { AnsiOutput, AnsiToken } from '../utils/terminalSerializer.js'; // Hoisted Mocks @@ -166,6 +167,7 @@ describe('ShellExecutionService', () => { beforeEach(() => { vi.clearAllMocks(); + ExecutionLifecycleService.resetForTest(); mockSerializeTerminalToObject.mockReturnValue([]); mockIsBinary.mockReturnValue(false); mockPlatform.mockReturnValue('linux'); @@ -432,13 +434,21 @@ describe('ShellExecutionService', () => { }); describe('pty interaction', () => { + let activePtysGetSpy: { mockRestore: () => void }; + beforeEach(() => { - vi.spyOn(ShellExecutionService['activePtys'], 'get').mockReturnValue({ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ptyProcess: mockPtyProcess as any, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - headlessTerminal: mockHeadlessTerminal as any, - }); + activePtysGetSpy = vi + .spyOn(ShellExecutionService['activePtys'], 'get') + .mockReturnValue({ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ptyProcess: mockPtyProcess as any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + headlessTerminal: mockHeadlessTerminal as any, + }); + }); + + afterEach(() => { + activePtysGetSpy.mockRestore(); }); it('should write to the pty and trigger a render', async () => { diff --git a/packages/core/src/services/shellExecutionService.ts b/packages/core/src/services/shellExecutionService.ts index fdb2ca79b5..dee515965f 100644 --- a/packages/core/src/services/shellExecutionService.ts +++ b/packages/core/src/services/shellExecutionService.ts @@ -6,8 +6,9 @@ import stripAnsi from 'strip-ansi'; import { getPty, type PtyImplementation } from '../utils/getPty.js'; -import { spawn as cpSpawn, type ChildProcess } from 'node:child_process'; +import { spawn as cpSpawn } from 'node:child_process'; import { TextDecoder } from 'node:util'; +import type { Writable } from 'node:stream'; import os from 'node:os'; import type { IPty } from '@lydell/node-pty'; import { getCachedEncodingForBuffer } from '../utils/systemEncoding.js'; @@ -27,6 +28,12 @@ import { type EnvironmentSanitizationConfig, } from './environmentSanitization.js'; import { killProcessGroup } from '../utils/process-utils.js'; +import { + ExecutionLifecycleService, + type ExecutionHandle, + type ExecutionOutputEvent, + type ExecutionResult, +} from './executionLifecycleService.js'; const { Terminal } = pkg; const MAX_CHILD_PROCESS_BUFFER_SIZE = 16 * 1024 * 1024; // 16MB @@ -65,34 +72,10 @@ function ensurePromptvarsDisabled(command: string, shell: ShellType): string { } /** A structured result from a shell command execution. */ -export interface ShellExecutionResult { - /** The raw, unprocessed output buffer. */ - rawOutput: Buffer; - /** The combined, decoded output as a string. */ - output: string; - /** The process exit code, or null if terminated by a signal. */ - exitCode: number | null; - /** The signal that terminated the process, if any. */ - signal: number | null; - /** An error object if the process failed to spawn. */ - error: Error | null; - /** A boolean indicating if the command was aborted by the user. */ - aborted: boolean; - /** The process ID of the spawned shell. */ - pid: number | undefined; - /** The method used to execute the shell command. */ - executionMethod: 'lydell-node-pty' | 'node-pty' | 'child_process' | 'none'; - /** Whether the command was moved to the background. */ - backgrounded?: boolean; -} +export type ShellExecutionResult = ExecutionResult; /** A handle for an ongoing shell execution. */ -export interface ShellExecutionHandle { - /** The process ID of the spawned shell. */ - pid: number | undefined; - /** A promise that resolves with the complete execution result. */ - result: Promise; -} +export type ShellExecutionHandle = ExecutionHandle; export interface ShellExecutionConfig { terminalWidth?: number; @@ -111,31 +94,7 @@ export interface ShellExecutionConfig { /** * Describes a structured event emitted during shell command execution. */ -export type ShellOutputEvent = - | { - /** The event contains a chunk of output data. */ - type: 'data'; - /** The decoded string chunk. */ - chunk: string | AnsiOutput; - } - | { - /** Signals that the output stream has been identified as binary. */ - type: 'binary_detected'; - } - | { - /** Provides progress updates for a binary stream. */ - type: 'binary_progress'; - /** The total number of bytes received so far. */ - bytesReceived: number; - } - | { - /** Signals that the process has exited. */ - type: 'exit'; - /** The exit code of the process, if any. */ - exitCode: number | null; - /** The signal that terminated the process, if any. */ - signal: number | null; - }; +export type ShellOutputEvent = ExecutionOutputEvent; interface ActivePty { ptyProcess: IPty; @@ -143,15 +102,6 @@ interface ActivePty { maxSerializedLines?: number; } -interface ActiveChildProcess { - process: ChildProcess; - state: { - output: string; - truncated: boolean; - outputChunks: Buffer[]; - }; -} - const getFullBufferText = (terminal: pkg.Terminal): string => { const buffer = terminal.buffer.active; const lines: string[] = []; @@ -197,19 +147,6 @@ const getFullBufferText = (terminal: pkg.Terminal): string => { export class ShellExecutionService { private static activePtys = new Map(); - private static activeChildProcesses = new Map(); - private static exitedPtyInfo = new Map< - number, - { exitCode: number; signal?: number } - >(); - private static activeResolvers = new Map< - number, - (res: ShellExecutionResult) => void - >(); - private static activeListeners = new Map< - number, - Set<(event: ShellOutputEvent) => void> - >(); /** * Executes a shell command using `node-pty`, capturing all output and lifecycle events. * @@ -285,13 +222,6 @@ export class ShellExecutionService { return { newBuffer: truncatedBuffer + chunk, truncated: true }; } - private static emitEvent(pid: number, event: ShellOutputEvent): void { - const listeners = this.activeListeners.get(pid); - if (listeners) { - listeners.forEach((listener) => listener(event)); - } - } - private static childProcessFallback( commandToExecute: string, cwd: string, @@ -327,199 +257,229 @@ export class ShellExecutionService { outputChunks: [] as Buffer[], }; - if (child.pid) { - this.activeChildProcesses.set(child.pid, { - process: child, - state, - }); - } + const lifecycleHandle = child.pid + ? ExecutionLifecycleService.attachExecution(child.pid, { + executionMethod: 'child_process', + getBackgroundOutput: () => state.output, + getSubscriptionSnapshot: () => state.output || undefined, + writeInput: (input) => { + const stdin = child.stdin as Writable | null; + if (stdin) { + stdin.write(input); + } + }, + kill: () => { + if (child.pid) { + killProcessGroup({ pid: child.pid }).catch(() => {}); + } + }, + isActive: () => { + if (!child.pid) { + return false; + } + try { + return process.kill(child.pid, 0); + } catch { + return false; + } + }, + }) + : undefined; - const result = new Promise((resolve) => { - if (child.pid) { - this.activeResolvers.set(child.pid, resolve); + let resolveWithoutPid: + | ((result: ShellExecutionResult) => void) + | undefined; + const result = + lifecycleHandle?.result ?? + new Promise((resolve) => { + resolveWithoutPid = resolve; + }); + + let stdoutDecoder: TextDecoder | null = null; + let stderrDecoder: TextDecoder | null = null; + let error: Error | null = null; + let exited = false; + + let isStreamingRawContent = true; + const MAX_SNIFF_SIZE = 4096; + let sniffedBytes = 0; + + const handleOutput = (data: Buffer, stream: 'stdout' | 'stderr') => { + if (!stdoutDecoder || !stderrDecoder) { + const encoding = getCachedEncodingForBuffer(data); + try { + stdoutDecoder = new TextDecoder(encoding); + stderrDecoder = new TextDecoder(encoding); + } catch { + stdoutDecoder = new TextDecoder('utf-8'); + stderrDecoder = new TextDecoder('utf-8'); + } } - let stdoutDecoder: TextDecoder | null = null; - let stderrDecoder: TextDecoder | null = null; - let error: Error | null = null; - let exited = false; + state.outputChunks.push(data); - let isStreamingRawContent = true; - const MAX_SNIFF_SIZE = 4096; - let sniffedBytes = 0; + if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { + const sniffBuffer = Buffer.concat(state.outputChunks.slice(0, 20)); + sniffedBytes = sniffBuffer.length; - const handleOutput = (data: Buffer, stream: 'stdout' | 'stderr') => { - if (!stdoutDecoder || !stderrDecoder) { - const encoding = getCachedEncodingForBuffer(data); - try { - stdoutDecoder = new TextDecoder(encoding); - stderrDecoder = new TextDecoder(encoding); - } catch { - stdoutDecoder = new TextDecoder('utf-8'); - stderrDecoder = new TextDecoder('utf-8'); + if (isBinary(sniffBuffer)) { + isStreamingRawContent = false; + const event: ShellOutputEvent = { type: 'binary_detected' }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); } } + } - state.outputChunks.push(data); + if (isStreamingRawContent) { + const decoder = stream === 'stdout' ? stdoutDecoder : stderrDecoder; + const decodedChunk = decoder.decode(data, { stream: true }); - if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { - const sniffBuffer = Buffer.concat(state.outputChunks.slice(0, 20)); - sniffedBytes = sniffBuffer.length; - - if (isBinary(sniffBuffer)) { - isStreamingRawContent = false; - const event: ShellOutputEvent = { type: 'binary_detected' }; - onOutputEvent(event); - if (child.pid) ShellExecutionService.emitEvent(child.pid, event); - } + const { newBuffer, truncated } = this.appendAndTruncate( + state.output, + decodedChunk, + MAX_CHILD_PROCESS_BUFFER_SIZE, + ); + state.output = newBuffer; + if (truncated) { + state.truncated = true; } - if (isStreamingRawContent) { - const decoder = stream === 'stdout' ? stdoutDecoder : stderrDecoder; - const decodedChunk = decoder.decode(data, { stream: true }); - - const { newBuffer, truncated } = this.appendAndTruncate( - state.output, - decodedChunk, - MAX_CHILD_PROCESS_BUFFER_SIZE, - ); - state.output = newBuffer; - if (truncated) { - state.truncated = true; - } - - if (decodedChunk) { - const event: ShellOutputEvent = { - type: 'data', - chunk: decodedChunk, - }; - onOutputEvent(event); - if (child.pid) ShellExecutionService.emitEvent(child.pid, event); - } - } else { - const totalBytes = state.outputChunks.reduce( - (sum, chunk) => sum + chunk.length, - 0, - ); + if (decodedChunk) { const event: ShellOutputEvent = { - type: 'binary_progress', - bytesReceived: totalBytes, + type: 'data', + chunk: decodedChunk, }; onOutputEvent(event); - if (child.pid) ShellExecutionService.emitEvent(child.pid, event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } } + } else { + const totalBytes = state.outputChunks.reduce( + (sum, chunk) => sum + chunk.length, + 0, + ); + const event: ShellOutputEvent = { + type: 'binary_progress', + bytesReceived: totalBytes, + }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } + } + }; + + const handleExit = ( + code: number | null, + signal: NodeJS.Signals | null, + ) => { + const { finalBuffer } = cleanup(); + + let combinedOutput = state.output; + if (state.truncated) { + const truncationMessage = `\n[GEMINI_CLI_WARNING: Output truncated. The buffer is limited to ${ + MAX_CHILD_PROCESS_BUFFER_SIZE / (1024 * 1024) + }MB.]`; + combinedOutput += truncationMessage; + } + + const finalStrippedOutput = stripAnsi(combinedOutput).trim(); + const exitCode = code; + const exitSignal = signal ? os.constants.signals[signal] : null; + + const resultPayload: ShellExecutionResult = { + rawOutput: finalBuffer, + output: finalStrippedOutput, + exitCode, + signal: exitSignal, + error, + aborted: abortSignal.aborted, + pid: child.pid, + executionMethod: 'child_process', }; - const handleExit = ( - code: number | null, - signal: NodeJS.Signals | null, - ) => { - const { finalBuffer } = cleanup(); - - let combinedOutput = state.output; - - if (state.truncated) { - const truncationMessage = `\n[GEMINI_CLI_WARNING: Output truncated. The buffer is limited to ${ - MAX_CHILD_PROCESS_BUFFER_SIZE / (1024 * 1024) - }MB.]`; - combinedOutput += truncationMessage; - } - - const finalStrippedOutput = stripAnsi(combinedOutput).trim(); - const exitCode = code; - const exitSignal = signal ? os.constants.signals[signal] : null; - - if (child.pid) { - const event: ShellOutputEvent = { - type: 'exit', - exitCode, - signal: exitSignal, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(child.pid, event); - - this.activeChildProcesses.delete(child.pid); - this.activeResolvers.delete(child.pid); - this.activeListeners.delete(child.pid); - } - - resolve({ - rawOutput: finalBuffer, - output: finalStrippedOutput, + if (child.pid) { + const event: ShellOutputEvent = { + type: 'exit', exitCode, signal: exitSignal, - error, - aborted: abortSignal.aborted, - pid: child.pid, - executionMethod: 'child_process', - }); - }; - - child.stdout.on('data', (data) => handleOutput(data, 'stdout')); - child.stderr.on('data', (data) => handleOutput(data, 'stderr')); - child.on('error', (err) => { - error = err; - handleExit(1, null); - }); - - const abortHandler = async () => { - if (child.pid && !exited) { - await killProcessGroup({ - pid: child.pid, - escalate: true, - isExited: () => exited, - }); - } - }; - - abortSignal.addEventListener('abort', abortHandler, { once: true }); - - child.on('exit', (code, signal) => { - handleExit(code, signal); - }); - - function cleanup() { - exited = true; - abortSignal.removeEventListener('abort', abortHandler); - if (stdoutDecoder) { - const remaining = stdoutDecoder.decode(); - if (remaining) { - state.output += remaining; - // If there's remaining output, we should technically emit it too, - // but it's rare to have partial utf8 chars at the very end of stream. - if (isStreamingRawContent && remaining) { - const event: ShellOutputEvent = { - type: 'data', - chunk: remaining, - }; - onOutputEvent(event); - if (child.pid) - ShellExecutionService.emitEvent(child.pid, event); - } - } - } - if (stderrDecoder) { - const remaining = stderrDecoder.decode(); - if (remaining) { - state.output += remaining; - if (isStreamingRawContent && remaining) { - const event: ShellOutputEvent = { - type: 'data', - chunk: remaining, - }; - onOutputEvent(event); - if (child.pid) - ShellExecutionService.emitEvent(child.pid, event); - } - } - } - - const finalBuffer = Buffer.concat(state.outputChunks); - - return { finalBuffer }; + }; + onOutputEvent(event); + ExecutionLifecycleService.completeWithResult( + child.pid, + resultPayload, + ); + } else { + resolveWithoutPid?.(resultPayload); } + }; + + child.stdout.on('data', (data) => handleOutput(data, 'stdout')); + child.stderr.on('data', (data) => handleOutput(data, 'stderr')); + child.on('error', (err) => { + error = err; + handleExit(1, null); }); + const abortHandler = async () => { + if (child.pid && !exited) { + await killProcessGroup({ + pid: child.pid, + escalate: true, + isExited: () => exited, + }); + } + }; + + abortSignal.addEventListener('abort', abortHandler, { once: true }); + + child.on('exit', (code, signal) => { + handleExit(code, signal); + }); + + function cleanup() { + exited = true; + abortSignal.removeEventListener('abort', abortHandler); + if (stdoutDecoder) { + const remaining = stdoutDecoder.decode(); + if (remaining) { + state.output += remaining; + if (isStreamingRawContent) { + const event: ShellOutputEvent = { + type: 'data', + chunk: remaining, + }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } + } + } + } + if (stderrDecoder) { + const remaining = stderrDecoder.decode(); + if (remaining) { + state.output += remaining; + if (isStreamingRawContent) { + const event: ShellOutputEvent = { + type: 'data', + chunk: remaining, + }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } + } + } + } + + const finalBuffer = Buffer.concat(state.outputChunks); + return { finalBuffer }; + } + return { pid: child.pid, result }; } catch (e) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion @@ -585,300 +545,316 @@ export class ShellExecutionService { }, handleFlowControl: true, }); + const ptyPid = Number(ptyProcess.pid); - const result = new Promise((resolve) => { - this.activeResolvers.set(ptyProcess.pid, resolve); + const headlessTerminal = new Terminal({ + allowProposedApi: true, + cols, + rows, + scrollback: shellExecutionConfig.scrollback ?? SCROLLBACK_LIMIT, + }); + headlessTerminal.scrollToTop(); - const headlessTerminal = new Terminal({ - allowProposedApi: true, - cols, - rows, - scrollback: shellExecutionConfig.scrollback ?? SCROLLBACK_LIMIT, - }); - headlessTerminal.scrollToTop(); + this.activePtys.set(ptyPid, { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + ptyProcess, + headlessTerminal, + maxSerializedLines: shellExecutionConfig.maxSerializedLines, + }); - this.activePtys.set(ptyProcess.pid, { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - ptyProcess, - headlessTerminal, - maxSerializedLines: shellExecutionConfig.maxSerializedLines, - }); - - let processingChain = Promise.resolve(); - let decoder: TextDecoder | null = null; - let output: string | AnsiOutput | null = null; - const outputChunks: Buffer[] = []; - const error: Error | null = null; - let exited = false; - - let isStreamingRawContent = true; - const MAX_SNIFF_SIZE = 4096; - let sniffedBytes = 0; - let isWriting = false; - let hasStartedOutput = false; - let renderTimeout: NodeJS.Timeout | null = null; - - const renderFn = () => { - renderTimeout = null; - - if (!isStreamingRawContent) { + const result = ExecutionLifecycleService.attachExecution(ptyPid, { + executionMethod: ptyInfo?.name ?? 'node-pty', + writeInput: (input) => { + if (!ExecutionLifecycleService.isActive(ptyPid)) { return; } - - if (!shellExecutionConfig.disableDynamicLineTrimming) { - if (!hasStartedOutput) { - const bufferText = getFullBufferText(headlessTerminal); - if (bufferText.trim().length === 0) { - return; - } - hasStartedOutput = true; - } + ptyProcess.write(input); + }, + kill: () => { + killProcessGroup({ + pid: ptyPid, + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + pty: ptyProcess, + }).catch(() => {}); + this.activePtys.delete(ptyPid); + }, + isActive: () => { + try { + return process.kill(ptyPid, 0); + } catch { + return false; } - - const buffer = headlessTerminal.buffer.active; - const endLine = buffer.length; + }, + getBackgroundOutput: () => getFullBufferText(headlessTerminal), + getSubscriptionSnapshot: () => { + const endLine = headlessTerminal.buffer.active.length; const startLine = Math.max( 0, endLine - (shellExecutionConfig.maxSerializedLines ?? 2000), ); - - let newOutput: AnsiOutput; - if (shellExecutionConfig.showColor) { - newOutput = serializeTerminalToObject( - headlessTerminal, - startLine, - endLine, - ); - } else { - newOutput = ( - serializeTerminalToObject(headlessTerminal, startLine, endLine) || - [] - ).map((line) => - line.map((token) => { - token.fg = ''; - token.bg = ''; - return token; - }), - ); - } - - let lastNonEmptyLine = -1; - for (let i = newOutput.length - 1; i >= 0; i--) { - const line = newOutput[i]; - if ( - line - .map((segment) => segment.text) - .join('') - .trim().length > 0 - ) { - lastNonEmptyLine = i; - break; - } - } - - const absoluteCursorY = buffer.baseY + buffer.cursorY; - const cursorRelativeIndex = absoluteCursorY - startLine; - - if (cursorRelativeIndex > lastNonEmptyLine) { - lastNonEmptyLine = cursorRelativeIndex; - } - - const trimmedOutput = newOutput.slice(0, lastNonEmptyLine + 1); - - const finalOutput = shellExecutionConfig.disableDynamicLineTrimming - ? newOutput - : trimmedOutput; - - if (output !== finalOutput) { - output = finalOutput; - const event: ShellOutputEvent = { - type: 'data', - chunk: finalOutput, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - } - }; - - const render = (finalRender = false) => { - if (finalRender) { - if (renderTimeout) { - clearTimeout(renderTimeout); - } - renderFn(); - return; - } - - if (renderTimeout) { - return; - } - - renderTimeout = setTimeout(() => { - renderFn(); - renderTimeout = null; - }, 68); - }; - - headlessTerminal.onScroll(() => { - if (!isWriting) { - render(); - } - }); - - const handleOutput = (data: Buffer) => { - processingChain = processingChain.then( - () => - new Promise((resolve) => { - if (!decoder) { - const encoding = getCachedEncodingForBuffer(data); - try { - decoder = new TextDecoder(encoding); - } catch { - decoder = new TextDecoder('utf-8'); - } - } - - outputChunks.push(data); - - if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { - const sniffBuffer = Buffer.concat(outputChunks.slice(0, 20)); - sniffedBytes = sniffBuffer.length; - - if (isBinary(sniffBuffer)) { - isStreamingRawContent = false; - const event: ShellOutputEvent = { type: 'binary_detected' }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - } - } - - if (isStreamingRawContent) { - const decodedChunk = decoder.decode(data, { stream: true }); - if (decodedChunk.length === 0) { - resolve(); - return; - } - isWriting = true; - headlessTerminal.write(decodedChunk, () => { - render(); - isWriting = false; - resolve(); - }); - } else { - const totalBytes = outputChunks.reduce( - (sum, chunk) => sum + chunk.length, - 0, - ); - const event: ShellOutputEvent = { - type: 'binary_progress', - bytesReceived: totalBytes, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - resolve(); - } - }), + const bufferData = serializeTerminalToObject( + headlessTerminal, + startLine, + endLine, ); - }; + return bufferData.length > 0 ? bufferData : undefined; + }, + }).result; - ptyProcess.onData((data: string) => { - const bufferData = Buffer.from(data, 'utf-8'); - handleOutput(bufferData); - }); + let processingChain = Promise.resolve(); + let decoder: TextDecoder | null = null; + let output: string | AnsiOutput | null = null; + const outputChunks: Buffer[] = []; + const error: Error | null = null; + let exited = false; - ptyProcess.onExit( - ({ exitCode, signal }: { exitCode: number; signal?: number }) => { - exited = true; - abortSignal.removeEventListener('abort', abortHandler); - this.activePtys.delete(ptyProcess.pid); - // Attempt to destroy the PTY to ensure FD is closed - try { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (ptyProcess as IPty & { destroy?: () => void }).destroy?.(); - } catch { - // Ignore errors during cleanup - } + let isStreamingRawContent = true; + const MAX_SNIFF_SIZE = 4096; + let sniffedBytes = 0; + let isWriting = false; + let hasStartedOutput = false; + let renderTimeout: NodeJS.Timeout | null = null; - const finalize = () => { - render(true); + const renderFn = () => { + renderTimeout = null; - // Store exit info for late subscribers (e.g. backgrounding race condition) - this.exitedPtyInfo.set(ptyProcess.pid, { exitCode, signal }); - setTimeout( - () => { - this.exitedPtyInfo.delete(ptyProcess.pid); - }, - 5 * 60 * 1000, - ).unref(); + if (!isStreamingRawContent) { + return; + } - this.activePtys.delete(ptyProcess.pid); - this.activeResolvers.delete(ptyProcess.pid); - - const event: ShellOutputEvent = { - type: 'exit', - exitCode, - signal: signal ?? null, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - this.activeListeners.delete(ptyProcess.pid); - - const finalBuffer = Buffer.concat(outputChunks); - - resolve({ - rawOutput: finalBuffer, - output: getFullBufferText(headlessTerminal), - exitCode, - signal: signal ?? null, - error, - aborted: abortSignal.aborted, - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - pid: ptyProcess.pid, - executionMethod: ptyInfo?.name ?? 'node-pty', - }); - }; - - if (abortSignal.aborted) { - finalize(); + if (!shellExecutionConfig.disableDynamicLineTrimming) { + if (!hasStartedOutput) { + const bufferText = getFullBufferText(headlessTerminal); + if (bufferText.trim().length === 0) { return; } + hasStartedOutput = true; + } + } - const processingComplete = processingChain.then(() => 'processed'); - const abortFired = new Promise<'aborted'>((res) => { - if (abortSignal.aborted) { - res('aborted'); - return; - } - abortSignal.addEventListener('abort', () => res('aborted'), { - once: true, - }); - }); - - // eslint-disable-next-line @typescript-eslint/no-floating-promises - Promise.race([processingComplete, abortFired]).then(() => { - finalize(); - }); - }, + const buffer = headlessTerminal.buffer.active; + const endLine = buffer.length; + const startLine = Math.max( + 0, + endLine - (shellExecutionConfig.maxSerializedLines ?? 2000), ); - const abortHandler = async () => { - if (ptyProcess.pid && !exited) { - await killProcessGroup({ - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - pid: ptyProcess.pid, - escalate: true, - isExited: () => exited, - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - pty: ptyProcess, - }); - } - }; + let newOutput: AnsiOutput; + if (shellExecutionConfig.showColor) { + newOutput = serializeTerminalToObject( + headlessTerminal, + startLine, + endLine, + ); + } else { + newOutput = ( + serializeTerminalToObject(headlessTerminal, startLine, endLine) || + [] + ).map((line) => + line.map((token) => { + token.fg = ''; + token.bg = ''; + return token; + }), + ); + } - abortSignal.addEventListener('abort', abortHandler, { once: true }); + let lastNonEmptyLine = -1; + for (let i = newOutput.length - 1; i >= 0; i--) { + const line = newOutput[i]; + if ( + line + .map((segment) => segment.text) + .join('') + .trim().length > 0 + ) { + lastNonEmptyLine = i; + break; + } + } + + const absoluteCursorY = buffer.baseY + buffer.cursorY; + const cursorRelativeIndex = absoluteCursorY - startLine; + + if (cursorRelativeIndex > lastNonEmptyLine) { + lastNonEmptyLine = cursorRelativeIndex; + } + + const trimmedOutput = newOutput.slice(0, lastNonEmptyLine + 1); + const finalOutput = shellExecutionConfig.disableDynamicLineTrimming + ? newOutput + : trimmedOutput; + + if (output !== finalOutput) { + output = finalOutput; + const event: ShellOutputEvent = { + type: 'data', + chunk: finalOutput, + }; + onOutputEvent(event); + ExecutionLifecycleService.emitEvent(ptyPid, event); + } + }; + + const render = (finalRender = false) => { + if (finalRender) { + if (renderTimeout) { + clearTimeout(renderTimeout); + } + renderFn(); + return; + } + + if (renderTimeout) { + return; + } + + renderTimeout = setTimeout(() => { + renderFn(); + renderTimeout = null; + }, 68); + }; + + headlessTerminal.onScroll(() => { + if (!isWriting) { + render(); + } }); - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - return { pid: ptyProcess.pid, result }; + const handleOutput = (data: Buffer) => { + processingChain = processingChain.then( + () => + new Promise((resolveChunk) => { + if (!decoder) { + const encoding = getCachedEncodingForBuffer(data); + try { + decoder = new TextDecoder(encoding); + } catch { + decoder = new TextDecoder('utf-8'); + } + } + + outputChunks.push(data); + + if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { + const sniffBuffer = Buffer.concat(outputChunks.slice(0, 20)); + sniffedBytes = sniffBuffer.length; + + if (isBinary(sniffBuffer)) { + isStreamingRawContent = false; + const event: ShellOutputEvent = { type: 'binary_detected' }; + onOutputEvent(event); + ExecutionLifecycleService.emitEvent(ptyPid, event); + } + } + + if (isStreamingRawContent) { + const decodedChunk = decoder.decode(data, { stream: true }); + if (decodedChunk.length === 0) { + resolveChunk(); + return; + } + isWriting = true; + headlessTerminal.write(decodedChunk, () => { + render(); + isWriting = false; + resolveChunk(); + }); + } else { + const totalBytes = outputChunks.reduce( + (sum, chunk) => sum + chunk.length, + 0, + ); + const event: ShellOutputEvent = { + type: 'binary_progress', + bytesReceived: totalBytes, + }; + onOutputEvent(event); + ExecutionLifecycleService.emitEvent(ptyPid, event); + resolveChunk(); + } + }), + ); + }; + + ptyProcess.onData((data: string) => { + const bufferData = Buffer.from(data, 'utf-8'); + handleOutput(bufferData); + }); + + ptyProcess.onExit( + ({ exitCode, signal }: { exitCode: number; signal?: number }) => { + exited = true; + abortSignal.removeEventListener('abort', abortHandler); + this.activePtys.delete(ptyPid); + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + (ptyProcess as IPty & { destroy?: () => void }).destroy?.(); + } catch { + // Ignore errors during cleanup + } + + const finalize = () => { + render(true); + this.activePtys.delete(ptyPid); + + const event: ShellOutputEvent = { + type: 'exit', + exitCode, + signal: signal ?? null, + }; + onOutputEvent(event); + + ExecutionLifecycleService.completeWithResult(ptyPid, { + rawOutput: Buffer.concat(outputChunks), + output: getFullBufferText(headlessTerminal), + exitCode, + signal: signal ?? null, + error, + aborted: abortSignal.aborted, + pid: ptyPid, + executionMethod: ptyInfo?.name ?? 'node-pty', + }); + }; + + if (abortSignal.aborted) { + finalize(); + return; + } + + const processingComplete = processingChain.then(() => 'processed'); + const abortFired = new Promise<'aborted'>((res) => { + if (abortSignal.aborted) { + res('aborted'); + return; + } + abortSignal.addEventListener('abort', () => res('aborted'), { + once: true, + }); + }); + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + Promise.race([processingComplete, abortFired]).then(() => { + finalize(); + }); + }, + ); + + const abortHandler = async () => { + if (ptyProcess.pid && !exited) { + await killProcessGroup({ + pid: ptyPid, + escalate: true, + isExited: () => exited, + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + pty: ptyProcess, + }); + } + }; + + abortSignal.addEventListener('abort', abortHandler, { once: true }); + + return { pid: ptyPid, result }; } catch (e) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion const error = e as Error; @@ -914,40 +890,11 @@ export class ShellExecutionService { * @param input The string to write to the terminal. */ static writeToPty(pid: number, input: string): void { - if (this.activeChildProcesses.has(pid)) { - const activeChild = this.activeChildProcesses.get(pid); - if (activeChild) { - activeChild.process.stdin?.write(input); - } - return; - } - - if (!this.isPtyActive(pid)) { - return; - } - - const activePty = this.activePtys.get(pid); - if (activePty) { - activePty.ptyProcess.write(input); - } + ExecutionLifecycleService.writeInput(pid, input); } static isPtyActive(pid: number): boolean { - if (this.activeChildProcesses.has(pid)) { - try { - return process.kill(pid, 0); - } catch { - return false; - } - } - - try { - // process.kill with signal 0 is a way to check for the existence of a process. - // It doesn't actually send a signal. - return process.kill(pid, 0); - } catch (_) { - return false; - } + return ExecutionLifecycleService.isActive(pid); } /** @@ -962,36 +909,7 @@ export class ShellExecutionService { pid: number, callback: (exitCode: number, signal?: number) => void, ): () => void { - const activePty = this.activePtys.get(pid); - if (activePty) { - const disposable = activePty.ptyProcess.onExit( - ({ exitCode, signal }: { exitCode: number; signal?: number }) => { - callback(exitCode, signal); - disposable.dispose(); - }, - ); - return () => disposable.dispose(); - } else if (this.activeChildProcesses.has(pid)) { - const activeChild = this.activeChildProcesses.get(pid); - const listener = (code: number | null, signal: NodeJS.Signals | null) => { - let signalNumber: number | undefined; - if (signal) { - signalNumber = os.constants.signals[signal]; - } - callback(code ?? 0, signalNumber); - }; - activeChild?.process.on('exit', listener); - return () => { - activeChild?.process.removeListener('exit', listener); - }; - } else { - // Check if it already exited recently - const exitedInfo = this.exitedPtyInfo.get(pid); - if (exitedInfo) { - callback(exitedInfo.exitCode, exitedInfo.signal); - } - return () => {}; - } + return ExecutionLifecycleService.onExit(pid, callback); } /** @@ -1000,19 +918,8 @@ export class ShellExecutionService { * @param pid The process ID to kill. */ static kill(pid: number): void { - const activePty = this.activePtys.get(pid); - const activeChild = this.activeChildProcesses.get(pid); - - if (activeChild) { - killProcessGroup({ pid }).catch(() => {}); - this.activeChildProcesses.delete(pid); - } else if (activePty) { - killProcessGroup({ pid, pty: activePty.ptyProcess }).catch(() => {}); - this.activePtys.delete(pid); - } - - this.activeResolvers.delete(pid); - this.activeListeners.delete(pid); + this.activePtys.delete(pid); + ExecutionLifecycleService.kill(pid); } /** @@ -1022,88 +929,14 @@ export class ShellExecutionService { * @param pid The process ID of the target PTY. */ static background(pid: number): void { - const resolve = this.activeResolvers.get(pid); - if (resolve) { - let output = ''; - const rawOutput = Buffer.from(''); - - const activePty = this.activePtys.get(pid); - const activeChild = this.activeChildProcesses.get(pid); - - if (activePty) { - output = getFullBufferText(activePty.headlessTerminal); - resolve({ - rawOutput, - output, - exitCode: null, - signal: null, - error: null, - aborted: false, - pid, - executionMethod: 'node-pty', - backgrounded: true, - }); - } else if (activeChild) { - output = activeChild.state.output; - - resolve({ - rawOutput, - output, - exitCode: null, - signal: null, - error: null, - aborted: false, - pid, - executionMethod: 'child_process', - backgrounded: true, - }); - } - - this.activeResolvers.delete(pid); - } + ExecutionLifecycleService.background(pid); } static subscribe( pid: number, listener: (event: ShellOutputEvent) => void, ): () => void { - if (!this.activeListeners.has(pid)) { - this.activeListeners.set(pid, new Set()); - } - this.activeListeners.get(pid)?.add(listener); - - // Send current buffer content immediately - const activePty = this.activePtys.get(pid); - const activeChild = this.activeChildProcesses.get(pid); - - if (activePty) { - // Use serializeTerminalToObject to preserve colors and structure - const endLine = activePty.headlessTerminal.buffer.active.length; - const startLine = Math.max( - 0, - endLine - (activePty.maxSerializedLines ?? 2000), - ); - const bufferData = serializeTerminalToObject( - activePty.headlessTerminal, - startLine, - endLine, - ); - if (bufferData && bufferData.length > 0) { - listener({ type: 'data', chunk: bufferData }); - } - } else if (activeChild) { - const output = activeChild.state.output; - if (output) { - listener({ type: 'data', chunk: output }); - } - } - - return () => { - this.activeListeners.get(pid)?.delete(listener); - if (this.activeListeners.get(pid)?.size === 0) { - this.activeListeners.delete(pid); - } - }; + return ExecutionLifecycleService.subscribe(pid, listener); } /** @@ -1156,10 +989,7 @@ export class ShellExecutionService { endLine, ); const event: ShellOutputEvent = { type: 'data', chunk: bufferData }; - const listeners = ShellExecutionService.activeListeners.get(pid); - if (listeners) { - listeners.forEach((listener) => listener(event)); - } + ExecutionLifecycleService.emitEvent(pid, event); } } diff --git a/packages/core/src/tools/shell.ts b/packages/core/src/tools/shell.ts index 4ea83b0af4..c21a2bddc8 100644 --- a/packages/core/src/tools/shell.ts +++ b/packages/core/src/tools/shell.ts @@ -18,6 +18,7 @@ import { Kind, type ToolInvocation, type ToolResult, + type BackgroundExecutionData, type ToolCallConfirmationDetails, type ToolExecuteConfirmationDetails, type PolicyUpdateOptions, @@ -150,7 +151,7 @@ export class ShellToolInvocation extends BaseToolInvocation< signal: AbortSignal, updateOutput?: (output: ToolLiveOutput) => void, shellExecutionConfig?: ShellExecutionConfig, - setPidCallback?: (pid: number) => void, + setExecutionIdCallback?: (executionId: number) => void, ): Promise { const strippedCommand = stripShellWrapper(this.params.command); @@ -281,8 +282,8 @@ export class ShellToolInvocation extends BaseToolInvocation< ); if (pid) { - if (setPidCallback) { - setPidCallback(pid); + if (setExecutionIdCallback) { + setExecutionIdCallback(pid); } // If the model requested to run in the background, do so after a short delay. @@ -324,7 +325,7 @@ export class ShellToolInvocation extends BaseToolInvocation< } } - let data: Record | undefined; + let data: BackgroundExecutionData | undefined; let llmContent = ''; let timeoutMessage = ''; diff --git a/packages/core/src/tools/tools.ts b/packages/core/src/tools/tools.ts index 0a82cc1510..730a40a435 100644 --- a/packages/core/src/tools/tools.ts +++ b/packages/core/src/tools/tools.ts @@ -61,15 +61,51 @@ export interface ToolInvocation< * Executes the tool with the validated parameters. * @param signal AbortSignal for tool cancellation. * @param updateOutput Optional callback to stream output. + * @param setExecutionIdCallback Optional callback for tools that expose a background execution handle. * @returns Result of the tool execution. */ execute( signal: AbortSignal, updateOutput?: (output: ToolLiveOutput) => void, shellExecutionConfig?: ShellExecutionConfig, + setExecutionIdCallback?: (executionId: number) => void, ): Promise; } +/** + * Structured payload used by tools to surface background execution metadata to + * the CLI UI. + * + * NOTE: `pid` is used as the canonical identifier for now to stay consistent + * with existing types (ExecutingToolCall.pid, ExecutionHandle.pid, etc.). + * A future rename to `executionId` is planned once the codebase is fully + * migrated — not done in this PR to keep the diff focused on the abstraction. + */ +export interface BackgroundExecutionData extends Record { + pid?: number; + command?: string; + initialOutput?: string; +} + +export function isBackgroundExecutionData( + data: unknown, +): data is BackgroundExecutionData { + if (typeof data !== 'object' || data === null) { + return false; + } + + const pid = 'pid' in data ? data.pid : undefined; + const command = 'command' in data ? data.command : undefined; + const initialOutput = + 'initialOutput' in data ? data.initialOutput : undefined; + + return ( + (pid === undefined || typeof pid === 'number') && + (command === undefined || typeof command === 'string') && + (initialOutput === undefined || typeof initialOutput === 'string') + ); +} + /** * Options for policy updates that can be customized by tool invocations. */