From e9edd6061568afb538725b2923d985613dfbd51b Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Sun, 8 Mar 2026 18:02:25 -0400 Subject: [PATCH] Make execution lifecycle the owner of background state --- packages/cli/src/ui/hooks/useGeminiStream.ts | 16 +- .../executionLifecycleService.test.ts | 296 ++-- .../src/services/executionLifecycleService.ts | 366 ++++- .../services/shellExecutionService.test.ts | 82 -- .../src/services/shellExecutionService.ts | 1269 +++++++---------- 5 files changed, 1058 insertions(+), 971 deletions(-) diff --git a/packages/cli/src/ui/hooks/useGeminiStream.ts b/packages/cli/src/ui/hooks/useGeminiStream.ts index d254902a94..9f81fc9cf0 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.ts +++ b/packages/cli/src/ui/hooks/useGeminiStream.ts @@ -94,7 +94,7 @@ type ToolResponseWithParts = ToolCallResponseInfo & { llmContent?: PartListUnion; }; -interface ShellToolData { +interface BackgroundExecutionData { pid?: number; command?: string; initialOutput?: string; @@ -111,11 +111,13 @@ const SUPPRESSED_TOOL_ERRORS_NOTE = const LOW_VERBOSITY_FAILURE_NOTE = 'This request failed. Press F12 for diagnostics, or run /settings and change "Error Verbosity" to full for full details.'; -function isShellToolData(data: unknown): data is ShellToolData { +function isBackgroundExecutionData( + data: unknown, +): data is BackgroundExecutionData { if (typeof data !== 'object' || data === null) { return false; } - const d = data as Partial; + const d = data as Partial; return ( (d.pid === undefined || typeof d.pid === 'number') && (d.command === undefined || typeof d.command === 'string') && @@ -311,7 +313,7 @@ export const useGeminiStream = ( getPreferredEditor, ); - const activeToolPtyId = useMemo(() => { + const activeToolExecutionId = useMemo(() => { const executingShellTool = toolCalls.find( (tc) => tc.status === 'executing' && tc.request.name === 'run_shell_command', @@ -347,7 +349,7 @@ export const useGeminiStream = ( setShellInputFocused, terminalWidth, terminalHeight, - activeToolPtyId, + activeToolExecutionId, ); const streamingState = useMemo( @@ -525,7 +527,7 @@ export const useGeminiStream = ( onComplete: (result: { userSelection: 'disable' | 'keep' }) => void; } | null>(null); - const activePtyId = activeShellPtyId || activeToolPtyId; + const activePtyId = activeShellPtyId || activeToolExecutionId; const prevActiveShellPtyIdRef = useRef(null); useEffect(() => { @@ -1657,7 +1659,7 @@ export const useGeminiStream = ( // Access result from the tracked tool call response const response = t.response as ToolResponseWithParts; const rawData = response?.data; - const data = isShellToolData(rawData) ? rawData : undefined; + const data = isBackgroundExecutionData(rawData) ? rawData : undefined; // Use data.pid for shell commands moved to the background. const pid = data?.pid; diff --git a/packages/core/src/services/executionLifecycleService.test.ts b/packages/core/src/services/executionLifecycleService.test.ts index 8de1bc6ba6..9e2ee9007a 100644 --- a/packages/core/src/services/executionLifecycleService.test.ts +++ b/packages/core/src/services/executionLifecycleService.test.ts @@ -4,105 +4,231 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { describe, expect, it, vi } from 'vitest'; -import { - ShellExecutionService, - type ShellExecutionResult, -} from './shellExecutionService.js'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; import { ExecutionLifecycleService, - type ExecutionCompletionOptions, + type ExecutionHandle, + type ExecutionResult, } from './executionLifecycleService.js'; -const createResult = (): ShellExecutionResult => ({ - rawOutput: Buffer.from(''), - output: '', - exitCode: 0, - signal: null, - error: null, - aborted: false, - pid: 123, - executionMethod: 'none', -}); +const BASE_VIRTUAL_ID = 2_000_000_000; + +function resetLifecycleState() { + ( + ExecutionLifecycleService as unknown as { + activeExecutions: Map; + activeResolvers: Map; + activeListeners: Map; + exitedExecutionInfo: Map; + nextVirtualExecutionId: number; + } + ).activeExecutions.clear(); + ( + ExecutionLifecycleService as unknown as { + activeExecutions: Map; + activeResolvers: Map; + activeListeners: Map; + exitedExecutionInfo: Map; + nextVirtualExecutionId: number; + } + ).activeResolvers.clear(); + ( + ExecutionLifecycleService as unknown as { + activeExecutions: Map; + activeResolvers: Map; + activeListeners: Map; + exitedExecutionInfo: Map; + nextVirtualExecutionId: number; + } + ).activeListeners.clear(); + ( + ExecutionLifecycleService as unknown as { + activeExecutions: Map; + activeResolvers: Map; + activeListeners: Map; + exitedExecutionInfo: Map; + nextVirtualExecutionId: number; + } + ).exitedExecutionInfo.clear(); + ( + ExecutionLifecycleService as unknown as { + activeExecutions: Map; + activeResolvers: Map; + activeListeners: Map; + exitedExecutionInfo: Map; + nextVirtualExecutionId: number; + } + ).nextVirtualExecutionId = BASE_VIRTUAL_ID; +} + +function createResult( + overrides: Partial = {}, +): ExecutionResult { + return { + rawOutput: Buffer.from(''), + output: '', + exitCode: 0, + signal: null, + error: null, + aborted: false, + pid: 123, + executionMethod: 'child_process', + ...overrides, + }; +} describe('ExecutionLifecycleService', () => { - it('creates executions through ShellExecutionService virtual execution API', () => { - const onKill = vi.fn(); - const handle = { - pid: 123, - result: Promise.resolve(createResult()), - }; - const createSpy = vi - .spyOn(ShellExecutionService, 'createVirtualExecution') - .mockReturnValue(handle); - - const created = ExecutionLifecycleService.createExecution('seed', onKill); - - expect(createSpy).toHaveBeenCalledWith('seed', onKill); - expect(created).toBe(handle); + beforeEach(() => { + resetLifecycleState(); }); - it('delegates append and completion to ShellExecutionService virtual APIs', () => { - const appendSpy = vi.spyOn(ShellExecutionService, 'appendVirtualOutput'); - const completeSpy = vi.spyOn( - ShellExecutionService, - 'completeVirtualExecution', - ); - const options: ExecutionCompletionOptions = { - exitCode: 0, - signal: null, - }; + it('completes virtual executions in the foreground and notifies exit subscribers', async () => { + const handle = ExecutionLifecycleService.createExecution(); + if (handle.pid === undefined) { + throw new Error('Expected virtual execution ID.'); + } - ExecutionLifecycleService.appendOutput(123, 'delta'); - ExecutionLifecycleService.completeExecution(123, options); - - expect(appendSpy).toHaveBeenCalledWith(123, 'delta'); - expect(completeSpy).toHaveBeenCalledWith(123, options); - }); - - it('delegates backgrounding, subscriptions, exit callbacks, and kill', () => { - const unsubscribe = vi.fn(); - const backgroundSpy = vi - .spyOn(ShellExecutionService, 'background') - .mockImplementation(() => {}); - const subscribeSpy = vi - .spyOn(ShellExecutionService, 'subscribe') - .mockReturnValue(unsubscribe); - const onExitSpy = vi - .spyOn(ShellExecutionService, 'onExit') - .mockReturnValue(unsubscribe); - const killSpy = vi - .spyOn(ShellExecutionService, 'kill') - .mockImplementation(() => {}); - - const listener = vi.fn(); const onExit = vi.fn(); - const returnedSub = ExecutionLifecycleService.subscribe(123, listener); - const returnedExit = ExecutionLifecycleService.onExit(123, onExit); - ExecutionLifecycleService.background(123); - ExecutionLifecycleService.kill(123); + const unsubscribe = ExecutionLifecycleService.onExit(handle.pid, onExit); - expect(subscribeSpy).toHaveBeenCalledWith(123, listener); - expect(onExitSpy).toHaveBeenCalledWith(123, onExit); - expect(backgroundSpy).toHaveBeenCalledWith(123); - expect(killSpy).toHaveBeenCalledWith(123); - expect(returnedSub).toBe(unsubscribe); - expect(returnedExit).toBe(unsubscribe); + ExecutionLifecycleService.appendOutput(handle.pid, 'Hello'); + ExecutionLifecycleService.appendOutput(handle.pid, ' World'); + ExecutionLifecycleService.completeExecution(handle.pid, { exitCode: 0 }); + + const result = await handle.result; + expect(result.output).toBe('Hello World'); + expect(result.executionMethod).toBe('none'); + expect(result.backgrounded).toBeUndefined(); + + await vi.waitFor(() => { + expect(onExit).toHaveBeenCalledWith(0, undefined); + }); + + unsubscribe(); }); - it('delegates active checks and input writes', () => { - const isActiveSpy = vi - .spyOn(ShellExecutionService, 'isPtyActive') - .mockReturnValue(true); - const writeSpy = vi - .spyOn(ShellExecutionService, 'writeToPty') - .mockImplementation(() => {}); + it('supports backgrounding virtual executions and continues streaming updates', async () => { + const handle = ExecutionLifecycleService.createExecution(); + if (handle.pid === undefined) { + throw new Error('Expected virtual execution ID.'); + } - const isActive = ExecutionLifecycleService.isActive(123); - ExecutionLifecycleService.writeInput(123, 'input'); + const chunks: string[] = []; + const onExit = vi.fn(); - expect(isActiveSpy).toHaveBeenCalledWith(123); - expect(writeSpy).toHaveBeenCalledWith(123, 'input'); - expect(isActive).toBe(true); + const unsubscribeStream = ExecutionLifecycleService.subscribe( + handle.pid, + (event) => { + if (event.type === 'data' && typeof event.chunk === 'string') { + chunks.push(event.chunk); + } + }, + ); + const unsubscribeExit = ExecutionLifecycleService.onExit(handle.pid, onExit); + + ExecutionLifecycleService.appendOutput(handle.pid, 'Chunk 1'); + ExecutionLifecycleService.background(handle.pid); + + const backgroundResult = await handle.result; + expect(backgroundResult.backgrounded).toBe(true); + expect(backgroundResult.output).toBe('Chunk 1'); + + ExecutionLifecycleService.appendOutput(handle.pid, '\nChunk 2'); + ExecutionLifecycleService.completeExecution(handle.pid, { exitCode: 0 }); + + await vi.waitFor(() => { + expect(chunks.join('')).toContain('Chunk 2'); + expect(onExit).toHaveBeenCalledWith(0, undefined); + }); + + unsubscribeStream(); + unsubscribeExit(); + }); + + it('kills virtual executions and resolves with aborted result', async () => { + const onKill = vi.fn(); + const handle = ExecutionLifecycleService.createExecution('', onKill); + if (handle.pid === undefined) { + throw new Error('Expected virtual execution ID.'); + } + + ExecutionLifecycleService.appendOutput(handle.pid, 'work'); + ExecutionLifecycleService.kill(handle.pid); + + const result = await handle.result; + expect(onKill).toHaveBeenCalledTimes(1); + expect(result.aborted).toBe(true); + expect(result.exitCode).toBe(130); + expect(result.error?.message).toContain('Operation cancelled by user'); + }); + + it('manages external executions through registration hooks', async () => { + const writeInput = vi.fn(); + const terminate = vi.fn(); + const isActive = vi.fn().mockReturnValue(true); + const exitListener = vi.fn(); + const chunks: string[] = []; + + let output = 'seed'; + const handle: ExecutionHandle = ExecutionLifecycleService.registerExecution( + 4321, + { + executionMethod: 'child_process', + getBackgroundOutput: () => output, + getSubscriptionSnapshot: () => output, + writeInput, + kill: terminate, + isActive, + }, + ); + + const unsubscribe = ExecutionLifecycleService.subscribe(4321, (event) => { + if (event.type === 'data' && typeof event.chunk === 'string') { + chunks.push(event.chunk); + } + }); + ExecutionLifecycleService.onExit(4321, exitListener); + + ExecutionLifecycleService.writeInput(4321, 'stdin'); + expect(writeInput).toHaveBeenCalledWith('stdin'); + expect(ExecutionLifecycleService.isActive(4321)).toBe(true); + + const firstChunk = { type: 'data', chunk: ' +delta' } as const; + ExecutionLifecycleService.emitEvent(4321, firstChunk); + output += firstChunk.chunk; + + ExecutionLifecycleService.background(4321); + const backgroundResult = await handle.result; + expect(backgroundResult.backgrounded).toBe(true); + expect(backgroundResult.output).toBe('seed +delta'); + expect(backgroundResult.executionMethod).toBe('child_process'); + + ExecutionLifecycleService.finalizeExecution( + 4321, + createResult({ + pid: 4321, + output: 'seed +delta done', + rawOutput: Buffer.from('seed +delta done'), + executionMethod: 'child_process', + }), + ); + + await vi.waitFor(() => { + expect(exitListener).toHaveBeenCalledWith(0, undefined); + }); + + const lateExit = vi.fn(); + ExecutionLifecycleService.onExit(4321, lateExit); + expect(lateExit).toHaveBeenCalledWith(0, undefined); + + unsubscribe(); + + const killHandle = ExecutionLifecycleService.registerExecution(4322, { + executionMethod: 'child_process', + kill: terminate, + }); + expect(killHandle.pid).toBe(4322); + ExecutionLifecycleService.kill(4322); + expect(terminate).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/core/src/services/executionLifecycleService.ts b/packages/core/src/services/executionLifecycleService.ts index 1fe94402d1..8368e5c82f 100644 --- a/packages/core/src/services/executionLifecycleService.ts +++ b/packages/core/src/services/executionLifecycleService.ts @@ -4,11 +4,48 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { - ShellExecutionService, - type ShellExecutionHandle, - type ShellOutputEvent, -} from './shellExecutionService.js'; +import type { AnsiOutput } from '../utils/terminalSerializer.js'; + +export type ExecutionMethod = + | 'lydell-node-pty' + | 'node-pty' + | 'child_process' + | 'none'; + +export interface ExecutionResult { + rawOutput: Buffer; + output: string; + exitCode: number | null; + signal: number | null; + error: Error | null; + aborted: boolean; + pid: number | undefined; + executionMethod: ExecutionMethod; + backgrounded?: boolean; +} + +export interface ExecutionHandle { + pid: number | undefined; + result: Promise; +} + +export type ExecutionOutputEvent = + | { + type: 'data'; + chunk: string | AnsiOutput; + } + | { + type: 'binary_detected'; + } + | { + type: 'binary_progress'; + bytesReceived: number; + } + | { + type: 'exit'; + exitCode: number | null; + signal: number | null; + }; export interface ExecutionCompletionOptions { exitCode?: number | null; @@ -17,58 +54,347 @@ export interface ExecutionCompletionOptions { aborted?: boolean; } +export interface ExternalExecutionRegistration { + executionMethod: ExecutionMethod; + initialOutput?: string; + getBackgroundOutput?: () => string; + getSubscriptionSnapshot?: () => string | AnsiOutput | undefined; + writeInput?: (input: string) => void; + kill?: () => void; + isActive?: () => boolean; +} + +interface ManagedExecutionState { + executionMethod: ExecutionMethod; + output: string; + isVirtual: boolean; + onKill?: () => void; + getBackgroundOutput?: () => string; + getSubscriptionSnapshot?: () => string | AnsiOutput | undefined; + writeInput?: (input: string) => void; + kill?: () => void; + isActive?: () => boolean; +} + /** - * Generic lifecycle facade for backgroundable executions. - * - * This wraps ShellExecutionService so non-shell executors (remote/local agents) - * can use neutral lifecycle naming without duplicating process-management logic. + * Central owner for execution backgrounding lifecycle across shell and tools. */ export class ExecutionLifecycleService { + private static readonly EXIT_INFO_TTL_MS = 5 * 60 * 1000; + private static nextVirtualExecutionId = 2_000_000_000; + + private static activeExecutions = new Map(); + private static activeResolvers = new Map< + number, + (result: ExecutionResult) => void + >(); + private static activeListeners = new Map< + number, + Set<(event: ExecutionOutputEvent) => void> + >(); + private static exitedExecutionInfo = new Map< + number, + { exitCode: number; signal?: number } + >(); + + private static storeExitInfo( + executionId: number, + exitCode: number, + signal?: number, + ): void { + this.exitedExecutionInfo.set(executionId, { + exitCode, + signal, + }); + setTimeout(() => { + this.exitedExecutionInfo.delete(executionId); + }, this.EXIT_INFO_TTL_MS).unref(); + } + + private static allocateVirtualExecutionId(): number { + let executionId = ++this.nextVirtualExecutionId; + while (this.activeExecutions.has(executionId)) { + executionId = ++this.nextVirtualExecutionId; + } + return executionId; + } + + private static createPendingResult(executionId: number): Promise { + return new Promise((resolve) => { + this.activeResolvers.set(executionId, resolve); + }); + } + + static registerExecution( + executionId: number, + registration: ExternalExecutionRegistration, + ): ExecutionHandle { + this.activeExecutions.set(executionId, { + executionMethod: registration.executionMethod, + output: registration.initialOutput ?? '', + isVirtual: false, + getBackgroundOutput: registration.getBackgroundOutput, + getSubscriptionSnapshot: registration.getSubscriptionSnapshot, + writeInput: registration.writeInput, + kill: registration.kill, + isActive: registration.isActive, + }); + + return { + pid: executionId, + result: this.createPendingResult(executionId), + }; + } + static createExecution( initialOutput = '', onKill?: () => void, - ): ShellExecutionHandle { - return ShellExecutionService.createVirtualExecution(initialOutput, onKill); + ): ExecutionHandle { + const executionId = this.allocateVirtualExecutionId(); + + this.activeExecutions.set(executionId, { + executionMethod: 'none', + output: initialOutput, + isVirtual: true, + onKill, + getBackgroundOutput: () => { + const state = this.activeExecutions.get(executionId); + return state?.output ?? initialOutput; + }, + getSubscriptionSnapshot: () => { + const state = this.activeExecutions.get(executionId); + return state?.output ?? initialOutput; + }, + isActive: () => true, + }); + + return { + pid: executionId, + result: this.createPendingResult(executionId), + }; } static appendOutput(executionId: number, chunk: string): void { - ShellExecutionService.appendVirtualOutput(executionId, chunk); + const execution = this.activeExecutions.get(executionId); + if (!execution || chunk.length === 0) { + return; + } + + execution.output += chunk; + this.emitEvent(executionId, { type: 'data', chunk }); + } + + static emitEvent(executionId: number, event: ExecutionOutputEvent): void { + const listeners = this.activeListeners.get(executionId); + if (listeners) { + listeners.forEach((listener) => listener(event)); + } + } + + private static resolvePending( + executionId: number, + result: ExecutionResult, + ): void { + const resolve = this.activeResolvers.get(executionId); + if (!resolve) { + return; + } + + resolve(result); + this.activeResolvers.delete(executionId); } static completeExecution( executionId: number, options?: ExecutionCompletionOptions, ): void { - ShellExecutionService.completeVirtualExecution(executionId, options); + const execution = this.activeExecutions.get(executionId); + if (!execution) { + return; + } + + const { + error = null, + aborted = false, + exitCode = error ? 1 : 0, + signal = null, + } = options ?? {}; + + const output = execution.getBackgroundOutput?.() ?? execution.output; + + this.resolvePending(executionId, { + rawOutput: Buffer.from(output, 'utf8'), + output, + exitCode, + signal, + error, + aborted, + pid: executionId, + executionMethod: execution.executionMethod, + }); + + this.emitEvent(executionId, { + type: 'exit', + exitCode, + signal, + }); + + this.activeListeners.delete(executionId); + this.activeExecutions.delete(executionId); + this.storeExitInfo(executionId, exitCode ?? 0, signal ?? undefined); + } + + static finalizeExecution( + executionId: number, + result: ExecutionResult, + ): void { + this.resolvePending(executionId, result); + + this.emitEvent(executionId, { + type: 'exit', + exitCode: result.exitCode, + signal: result.signal, + }); + + this.activeListeners.delete(executionId); + this.activeExecutions.delete(executionId); + this.storeExitInfo( + executionId, + result.exitCode ?? 0, + result.signal ?? undefined, + ); } static background(executionId: number): void { - ShellExecutionService.background(executionId); + const resolve = this.activeResolvers.get(executionId); + if (!resolve) { + return; + } + + const execution = this.activeExecutions.get(executionId); + if (!execution) { + return; + } + + const output = execution.getBackgroundOutput?.() ?? execution.output; + + resolve({ + rawOutput: Buffer.from(''), + output, + exitCode: null, + signal: null, + error: null, + aborted: false, + pid: executionId, + executionMethod: execution.executionMethod, + backgrounded: true, + }); + + this.activeResolvers.delete(executionId); } static subscribe( executionId: number, - listener: (event: ShellOutputEvent) => void, + listener: (event: ExecutionOutputEvent) => void, ): () => void { - return ShellExecutionService.subscribe(executionId, listener); + if (!this.activeListeners.has(executionId)) { + this.activeListeners.set(executionId, new Set()); + } + this.activeListeners.get(executionId)?.add(listener); + + const execution = this.activeExecutions.get(executionId); + if (execution) { + const snapshot = + execution.getSubscriptionSnapshot?.() ?? + (execution.output.length > 0 ? execution.output : undefined); + if (snapshot && (typeof snapshot !== 'string' || snapshot.length > 0)) { + listener({ type: 'data', chunk: snapshot }); + } + } + + return () => { + this.activeListeners.get(executionId)?.delete(listener); + if (this.activeListeners.get(executionId)?.size === 0) { + this.activeListeners.delete(executionId); + } + }; } static onExit( executionId: number, callback: (exitCode: number, signal?: number) => void, ): () => void { - return ShellExecutionService.onExit(executionId, callback); + if (this.activeExecutions.has(executionId)) { + const listener = (event: ExecutionOutputEvent) => { + if (event.type === 'exit') { + callback(event.exitCode ?? 0, event.signal ?? undefined); + unsubscribe(); + } + }; + const unsubscribe = this.subscribe(executionId, listener); + return unsubscribe; + } + + const exitedInfo = this.exitedExecutionInfo.get(executionId); + if (exitedInfo) { + callback(exitedInfo.exitCode, exitedInfo.signal); + } + + return () => {}; } static kill(executionId: number): void { - ShellExecutionService.kill(executionId); + const execution = this.activeExecutions.get(executionId); + if (!execution) { + return; + } + + if (execution.isVirtual) { + execution.onKill?.(); + this.completeExecution(executionId, { + error: new Error('Operation cancelled by user.'), + aborted: true, + exitCode: 130, + }); + return; + } + + execution.kill?.(); + this.activeResolvers.delete(executionId); + this.activeListeners.delete(executionId); + this.activeExecutions.delete(executionId); } static isActive(executionId: number): boolean { - return ShellExecutionService.isPtyActive(executionId); + const execution = this.activeExecutions.get(executionId); + if (!execution) { + try { + return process.kill(executionId, 0); + } catch { + return false; + } + } + + if (execution.isVirtual) { + return true; + } + + if (execution.isActive) { + try { + return execution.isActive(); + } catch { + return false; + } + } + + try { + return process.kill(executionId, 0); + } catch { + return false; + } } static writeInput(executionId: number, input: string): void { - ShellExecutionService.writeToPty(executionId, input); + this.activeExecutions.get(executionId)?.writeInput?.(input); } } diff --git a/packages/core/src/services/shellExecutionService.test.ts b/packages/core/src/services/shellExecutionService.test.ts index 0d6194888f..34c95dd4c7 100644 --- a/packages/core/src/services/shellExecutionService.test.ts +++ b/packages/core/src/services/shellExecutionService.test.ts @@ -1641,85 +1641,3 @@ describe('ShellExecutionService environment variables', () => { await new Promise(process.nextTick); }); }); - -describe('ShellExecutionService virtual executions', () => { - it('completes a virtual execution in the foreground', async () => { - const { pid, result } = ShellExecutionService.createVirtualExecution(); - if (pid === undefined) { - throw new Error('Expected virtual pid to be defined.'); - } - const onExit = vi.fn(); - const unsubscribe = ShellExecutionService.onExit(pid, onExit); - - ShellExecutionService.appendVirtualOutput(pid, 'Hello'); - ShellExecutionService.appendVirtualOutput(pid, ' World'); - ShellExecutionService.completeVirtualExecution(pid, { exitCode: 0 }); - - const executionResult = await result; - - expect(executionResult.output).toBe('Hello World'); - expect(executionResult.backgrounded).toBeUndefined(); - expect(executionResult.exitCode).toBe(0); - expect(executionResult.error).toBeNull(); - - await vi.waitFor(() => { - expect(onExit).toHaveBeenCalledWith(0, undefined); - }); - - unsubscribe(); - }); - - it('supports backgrounding virtual executions and streaming additional output', async () => { - const { pid, result } = ShellExecutionService.createVirtualExecution(); - if (pid === undefined) { - throw new Error('Expected virtual pid to be defined.'); - } - const chunks: string[] = []; - const onExit = vi.fn(); - - const unsubscribeStream = ShellExecutionService.subscribe(pid, (event) => { - if (event.type === 'data' && typeof event.chunk === 'string') { - chunks.push(event.chunk); - } - }); - const unsubscribeExit = ShellExecutionService.onExit(pid, onExit); - - ShellExecutionService.appendVirtualOutput(pid, 'Chunk 1'); - ShellExecutionService.background(pid); - - const backgroundResult = await result; - expect(backgroundResult.backgrounded).toBe(true); - expect(backgroundResult.output).toBe('Chunk 1'); - - ShellExecutionService.appendVirtualOutput(pid, '\nChunk 2'); - ShellExecutionService.completeVirtualExecution(pid, { exitCode: 0 }); - - await vi.waitFor(() => { - expect(chunks.join('')).toContain('Chunk 2'); - expect(onExit).toHaveBeenCalledWith(0, undefined); - }); - - unsubscribeStream(); - unsubscribeExit(); - }); - - it('kills virtual executions via the existing kill API', async () => { - const onKill = vi.fn(); - const { pid, result } = ShellExecutionService.createVirtualExecution( - '', - onKill, - ); - if (pid === undefined) { - throw new Error('Expected virtual pid to be defined.'); - } - - ShellExecutionService.appendVirtualOutput(pid, 'work'); - ShellExecutionService.kill(pid); - - const killResult = await result; - expect(onKill).toHaveBeenCalledTimes(1); - expect(killResult.aborted).toBe(true); - expect(killResult.exitCode).toBe(130); - expect(killResult.error?.message).toContain('Operation cancelled by user'); - }); -}); diff --git a/packages/core/src/services/shellExecutionService.ts b/packages/core/src/services/shellExecutionService.ts index 397dba1d9b..4fef30cdd0 100644 --- a/packages/core/src/services/shellExecutionService.ts +++ b/packages/core/src/services/shellExecutionService.ts @@ -6,8 +6,9 @@ import stripAnsi from 'strip-ansi'; import { getPty, type PtyImplementation } from '../utils/getPty.js'; -import { spawn as cpSpawn, type ChildProcess } from 'node:child_process'; +import { spawn as cpSpawn } from 'node:child_process'; import { TextDecoder } from 'node:util'; +import type { Writable } from 'node:stream'; import os from 'node:os'; import type { IPty } from '@lydell/node-pty'; import { getCachedEncodingForBuffer } from '../utils/systemEncoding.js'; @@ -27,6 +28,7 @@ import { type EnvironmentSanitizationConfig, } from './environmentSanitization.js'; import { killProcessGroup } from '../utils/process-utils.js'; +import { ExecutionLifecycleService } from './executionLifecycleService.js'; const { Terminal } = pkg; const MAX_CHILD_PROCESS_BUFFER_SIZE = 16 * 1024 * 1024; // 16MB @@ -143,31 +145,6 @@ interface ActivePty { maxSerializedLines?: number; } -interface ActiveChildProcess { - process: ChildProcess; - state: { - output: string; - truncated: boolean; - outputChunks: Buffer[]; - }; -} - -interface ActiveVirtualProcessState { - output: string; - onKill?: () => void; -} - -type ActiveManagedProcess = - | { - kind: 'child'; - process: ChildProcess; - state: ActiveChildProcess['state']; - } - | { - kind: 'virtual'; - state: ActiveVirtualProcessState; - }; - const getFullBufferText = (terminal: pkg.Terminal): string => { const buffer = terminal.buffer.active; const lines: string[] = []; @@ -213,20 +190,6 @@ const getFullBufferText = (terminal: pkg.Terminal): string => { export class ShellExecutionService { private static activePtys = new Map(); - private static activeProcesses = new Map(); - private static exitedPtyInfo = new Map< - number, - { exitCode: number; signal?: number } - >(); - private static activeResolvers = new Map< - number, - (res: ShellExecutionResult) => void - >(); - private static activeListeners = new Map< - number, - Set<(event: ShellOutputEvent) => void> - >(); - private static nextVirtualPid = 2_000_000_000; /** * Executes a shell command using `node-pty`, capturing all output and lifecycle events. * @@ -302,68 +265,15 @@ export class ShellExecutionService { return { newBuffer: truncatedBuffer + chunk, truncated: true }; } - private static emitEvent(pid: number, event: ShellOutputEvent): void { - const listeners = this.activeListeners.get(pid); - if (listeners) { - listeners.forEach((listener) => listener(event)); - } - } - - private static getActiveChildProcess( - pid: number, - ): ActiveChildProcess | undefined { - const activeProcess = this.activeProcesses.get(pid); - if (activeProcess?.kind !== 'child') { - return undefined; - } - return { process: activeProcess.process, state: activeProcess.state }; - } - - private static getActiveVirtualProcess( - pid: number, - ): ActiveVirtualProcessState | undefined { - const activeProcess = this.activeProcesses.get(pid); - if (activeProcess?.kind !== 'virtual') { - return undefined; - } - return activeProcess.state; - } - - private static allocateVirtualPid(): number { - let pid = ++this.nextVirtualPid; - while (this.activePtys.has(pid) || this.activeProcesses.has(pid)) { - pid = ++this.nextVirtualPid; - } - return pid; - } - static createVirtualExecution( initialOutput = '', onKill?: () => void, ): ShellExecutionHandle { - const pid = this.allocateVirtualPid(); - this.activeProcesses.set(pid, { - kind: 'virtual', - state: { - output: initialOutput, - onKill, - }, - }); - - const result = new Promise((resolve) => { - this.activeResolvers.set(pid, resolve); - }); - - return { pid, result }; + return ExecutionLifecycleService.createExecution(initialOutput, onKill); } static appendVirtualOutput(pid: number, chunk: string): void { - const virtual = this.getActiveVirtualProcess(pid); - if (!virtual || chunk.length === 0) { - return; - } - virtual.output += chunk; - this.emitEvent(pid, { type: 'data', chunk }); + ExecutionLifecycleService.appendOutput(pid, chunk); } static completeVirtualExecution( @@ -375,51 +285,7 @@ export class ShellExecutionService { aborted?: boolean; }, ): void { - const virtual = this.getActiveVirtualProcess(pid); - if (!virtual) { - return; - } - - const { - error = null, - aborted = false, - exitCode = error ? 1 : 0, - signal = null, - } = options ?? {}; - - const resolve = this.activeResolvers.get(pid); - if (resolve) { - resolve({ - rawOutput: Buffer.from(virtual.output, 'utf8'), - output: virtual.output, - exitCode, - signal, - error, - aborted, - pid, - executionMethod: 'none', - }); - this.activeResolvers.delete(pid); - } - - this.emitEvent(pid, { - type: 'exit', - exitCode, - signal, - }); - this.activeListeners.delete(pid); - this.activeProcesses.delete(pid); - - this.exitedPtyInfo.set(pid, { - exitCode: exitCode ?? 0, - signal: signal ?? undefined, - }); - setTimeout( - () => { - this.exitedPtyInfo.delete(pid); - }, - 5 * 60 * 1000, - ).unref(); + ExecutionLifecycleService.completeExecution(pid, options); } private static childProcessFallback( @@ -457,200 +323,221 @@ export class ShellExecutionService { outputChunks: [] as Buffer[], }; - if (child.pid) { - this.activeProcesses.set(child.pid, { - kind: 'child', - process: child, - state, - }); - } + const lifecycleHandle = child.pid + ? ExecutionLifecycleService.registerExecution(child.pid, { + executionMethod: 'child_process', + getBackgroundOutput: () => state.output, + getSubscriptionSnapshot: () => state.output || undefined, + writeInput: (input) => { + const stdin = child.stdin as Writable | null; + if (stdin) { + stdin.write(input); + } + }, + kill: () => { + if (child.pid) { + killProcessGroup({ pid: child.pid }).catch(() => {}); + } + }, + isActive: () => { + if (!child.pid) { + return false; + } + try { + return process.kill(child.pid, 0); + } catch { + return false; + } + }, + }) + : undefined; - const result = new Promise((resolve) => { - if (child.pid) { - this.activeResolvers.set(child.pid, resolve); + let resolveWithoutPid: ((result: ShellExecutionResult) => void) | undefined; + const result = + lifecycleHandle?.result ?? + new Promise((resolve) => { + resolveWithoutPid = resolve; + }); + + let stdoutDecoder: TextDecoder | null = null; + let stderrDecoder: TextDecoder | null = null; + let error: Error | null = null; + let exited = false; + + let isStreamingRawContent = true; + const MAX_SNIFF_SIZE = 4096; + let sniffedBytes = 0; + + const handleOutput = (data: Buffer, stream: 'stdout' | 'stderr') => { + if (!stdoutDecoder || !stderrDecoder) { + const encoding = getCachedEncodingForBuffer(data); + try { + stdoutDecoder = new TextDecoder(encoding); + stderrDecoder = new TextDecoder(encoding); + } catch { + stdoutDecoder = new TextDecoder('utf-8'); + stderrDecoder = new TextDecoder('utf-8'); + } } - let stdoutDecoder: TextDecoder | null = null; - let stderrDecoder: TextDecoder | null = null; - let error: Error | null = null; - let exited = false; + state.outputChunks.push(data); - let isStreamingRawContent = true; - const MAX_SNIFF_SIZE = 4096; - let sniffedBytes = 0; + if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { + const sniffBuffer = Buffer.concat(state.outputChunks.slice(0, 20)); + sniffedBytes = sniffBuffer.length; - const handleOutput = (data: Buffer, stream: 'stdout' | 'stderr') => { - if (!stdoutDecoder || !stderrDecoder) { - const encoding = getCachedEncodingForBuffer(data); - try { - stdoutDecoder = new TextDecoder(encoding); - stderrDecoder = new TextDecoder(encoding); - } catch { - stdoutDecoder = new TextDecoder('utf-8'); - stderrDecoder = new TextDecoder('utf-8'); + if (isBinary(sniffBuffer)) { + isStreamingRawContent = false; + const event: ShellOutputEvent = { type: 'binary_detected' }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); } } + } - state.outputChunks.push(data); + if (isStreamingRawContent) { + const decoder = stream === 'stdout' ? stdoutDecoder : stderrDecoder; + const decodedChunk = decoder.decode(data, { stream: true }); - if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { - const sniffBuffer = Buffer.concat(state.outputChunks.slice(0, 20)); - sniffedBytes = sniffBuffer.length; - - if (isBinary(sniffBuffer)) { - isStreamingRawContent = false; - const event: ShellOutputEvent = { type: 'binary_detected' }; - onOutputEvent(event); - if (child.pid) ShellExecutionService.emitEvent(child.pid, event); - } + const { newBuffer, truncated } = this.appendAndTruncate( + state.output, + decodedChunk, + MAX_CHILD_PROCESS_BUFFER_SIZE, + ); + state.output = newBuffer; + if (truncated) { + state.truncated = true; } - if (isStreamingRawContent) { - const decoder = stream === 'stdout' ? stdoutDecoder : stderrDecoder; - const decodedChunk = decoder.decode(data, { stream: true }); - - const { newBuffer, truncated } = this.appendAndTruncate( - state.output, - decodedChunk, - MAX_CHILD_PROCESS_BUFFER_SIZE, - ); - state.output = newBuffer; - if (truncated) { - state.truncated = true; - } - - if (decodedChunk) { - const event: ShellOutputEvent = { - type: 'data', - chunk: decodedChunk, - }; - onOutputEvent(event); - if (child.pid) ShellExecutionService.emitEvent(child.pid, event); - } - } else { - const totalBytes = state.outputChunks.reduce( - (sum, chunk) => sum + chunk.length, - 0, - ); + if (decodedChunk) { const event: ShellOutputEvent = { - type: 'binary_progress', - bytesReceived: totalBytes, + type: 'data', + chunk: decodedChunk, }; onOutputEvent(event); - if (child.pid) ShellExecutionService.emitEvent(child.pid, event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } } + } else { + const totalBytes = state.outputChunks.reduce( + (sum, chunk) => sum + chunk.length, + 0, + ); + const event: ShellOutputEvent = { + type: 'binary_progress', + bytesReceived: totalBytes, + }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } + } + }; + + const handleExit = (code: number | null, signal: NodeJS.Signals | null) => { + const { finalBuffer } = cleanup(); + + let combinedOutput = state.output; + if (state.truncated) { + const truncationMessage = `\n[GEMINI_CLI_WARNING: Output truncated. The buffer is limited to ${ + MAX_CHILD_PROCESS_BUFFER_SIZE / (1024 * 1024) + }MB.]`; + combinedOutput += truncationMessage; + } + + const finalStrippedOutput = stripAnsi(combinedOutput).trim(); + const exitCode = code; + const exitSignal = signal ? os.constants.signals[signal] : null; + + const resultPayload: ShellExecutionResult = { + rawOutput: finalBuffer, + output: finalStrippedOutput, + exitCode, + signal: exitSignal, + error, + aborted: abortSignal.aborted, + pid: child.pid, + executionMethod: 'child_process', }; - const handleExit = ( - code: number | null, - signal: NodeJS.Signals | null, - ) => { - const { finalBuffer } = cleanup(); - - let combinedOutput = state.output; - - if (state.truncated) { - const truncationMessage = `\n[GEMINI_CLI_WARNING: Output truncated. The buffer is limited to ${ - MAX_CHILD_PROCESS_BUFFER_SIZE / (1024 * 1024) - }MB.]`; - combinedOutput += truncationMessage; - } - - const finalStrippedOutput = stripAnsi(combinedOutput).trim(); - const exitCode = code; - const exitSignal = signal ? os.constants.signals[signal] : null; - - if (child.pid) { - const event: ShellOutputEvent = { - type: 'exit', - exitCode, - signal: exitSignal, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(child.pid, event); - - this.activeProcesses.delete(child.pid); - this.activeResolvers.delete(child.pid); - this.activeListeners.delete(child.pid); - } - - resolve({ - rawOutput: finalBuffer, - output: finalStrippedOutput, + if (child.pid) { + const event: ShellOutputEvent = { + type: 'exit', exitCode, signal: exitSignal, - error, - aborted: abortSignal.aborted, - pid: child.pid, - executionMethod: 'child_process', - }); - }; - - child.stdout.on('data', (data) => handleOutput(data, 'stdout')); - child.stderr.on('data', (data) => handleOutput(data, 'stderr')); - child.on('error', (err) => { - error = err; - handleExit(1, null); - }); - - const abortHandler = async () => { - if (child.pid && !exited) { - await killProcessGroup({ - pid: child.pid, - escalate: true, - isExited: () => exited, - }); - } - }; - - abortSignal.addEventListener('abort', abortHandler, { once: true }); - - child.on('exit', (code, signal) => { - handleExit(code, signal); - }); - - function cleanup() { - exited = true; - abortSignal.removeEventListener('abort', abortHandler); - if (stdoutDecoder) { - const remaining = stdoutDecoder.decode(); - if (remaining) { - state.output += remaining; - // If there's remaining output, we should technically emit it too, - // but it's rare to have partial utf8 chars at the very end of stream. - if (isStreamingRawContent && remaining) { - const event: ShellOutputEvent = { - type: 'data', - chunk: remaining, - }; - onOutputEvent(event); - if (child.pid) - ShellExecutionService.emitEvent(child.pid, event); - } - } - } - if (stderrDecoder) { - const remaining = stderrDecoder.decode(); - if (remaining) { - state.output += remaining; - if (isStreamingRawContent && remaining) { - const event: ShellOutputEvent = { - type: 'data', - chunk: remaining, - }; - onOutputEvent(event); - if (child.pid) - ShellExecutionService.emitEvent(child.pid, event); - } - } - } - - const finalBuffer = Buffer.concat(state.outputChunks); - - return { finalBuffer }; + }; + onOutputEvent(event); + ExecutionLifecycleService.finalizeExecution(child.pid, resultPayload); + } else { + resolveWithoutPid?.(resultPayload); } + }; + + child.stdout.on('data', (data) => handleOutput(data, 'stdout')); + child.stderr.on('data', (data) => handleOutput(data, 'stderr')); + child.on('error', (err) => { + error = err; + handleExit(1, null); }); + const abortHandler = async () => { + if (child.pid && !exited) { + await killProcessGroup({ + pid: child.pid, + escalate: true, + isExited: () => exited, + }); + } + }; + + abortSignal.addEventListener('abort', abortHandler, { once: true }); + + child.on('exit', (code, signal) => { + handleExit(code, signal); + }); + + function cleanup() { + exited = true; + abortSignal.removeEventListener('abort', abortHandler); + if (stdoutDecoder) { + const remaining = stdoutDecoder.decode(); + if (remaining) { + state.output += remaining; + if (isStreamingRawContent) { + const event: ShellOutputEvent = { + type: 'data', + chunk: remaining, + }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } + } + } + } + if (stderrDecoder) { + const remaining = stderrDecoder.decode(); + if (remaining) { + state.output += remaining; + if (isStreamingRawContent) { + const event: ShellOutputEvent = { + type: 'data', + chunk: remaining, + }; + onOutputEvent(event); + if (child.pid) { + ExecutionLifecycleService.emitEvent(child.pid, event); + } + } + } + } + + const finalBuffer = Buffer.concat(state.outputChunks); + return { finalBuffer }; + } + return { pid: child.pid, result }; } catch (e) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion @@ -716,300 +603,315 @@ export class ShellExecutionService { }, handleFlowControl: true, }); + const ptyPid = Number(ptyProcess.pid); - const result = new Promise((resolve) => { - this.activeResolvers.set(ptyProcess.pid, resolve); + const headlessTerminal = new Terminal({ + allowProposedApi: true, + cols, + rows, + scrollback: shellExecutionConfig.scrollback ?? SCROLLBACK_LIMIT, + }); + headlessTerminal.scrollToTop(); - const headlessTerminal = new Terminal({ - allowProposedApi: true, - cols, - rows, - scrollback: shellExecutionConfig.scrollback ?? SCROLLBACK_LIMIT, - }); - headlessTerminal.scrollToTop(); + this.activePtys.set(ptyPid, { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + ptyProcess, + headlessTerminal, + maxSerializedLines: shellExecutionConfig.maxSerializedLines, + }); - this.activePtys.set(ptyProcess.pid, { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - ptyProcess, - headlessTerminal, - maxSerializedLines: shellExecutionConfig.maxSerializedLines, - }); - - let processingChain = Promise.resolve(); - let decoder: TextDecoder | null = null; - let output: string | AnsiOutput | null = null; - const outputChunks: Buffer[] = []; - const error: Error | null = null; - let exited = false; - - let isStreamingRawContent = true; - const MAX_SNIFF_SIZE = 4096; - let sniffedBytes = 0; - let isWriting = false; - let hasStartedOutput = false; - let renderTimeout: NodeJS.Timeout | null = null; - - const renderFn = () => { - renderTimeout = null; - - if (!isStreamingRawContent) { + const result = ExecutionLifecycleService.registerExecution(ptyPid, { + executionMethod: ptyInfo?.name ?? 'node-pty', + writeInput: (input) => { + if (!ExecutionLifecycleService.isActive(ptyPid)) { return; } - - if (!shellExecutionConfig.disableDynamicLineTrimming) { - if (!hasStartedOutput) { - const bufferText = getFullBufferText(headlessTerminal); - if (bufferText.trim().length === 0) { - return; - } - hasStartedOutput = true; - } + ptyProcess.write(input); + }, + kill: () => { + killProcessGroup({ + pid: ptyPid, + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + pty: ptyProcess, + }).catch(() => {}); + this.activePtys.delete(ptyPid); + }, + isActive: () => { + try { + return process.kill(ptyPid, 0); + } catch { + return false; } - - const buffer = headlessTerminal.buffer.active; - const endLine = buffer.length; + }, + getBackgroundOutput: () => getFullBufferText(headlessTerminal), + getSubscriptionSnapshot: () => { + const endLine = headlessTerminal.buffer.active.length; const startLine = Math.max( 0, endLine - (shellExecutionConfig.maxSerializedLines ?? 2000), ); - - let newOutput: AnsiOutput; - if (shellExecutionConfig.showColor) { - newOutput = serializeTerminalToObject( - headlessTerminal, - startLine, - endLine, - ); - } else { - newOutput = ( - serializeTerminalToObject(headlessTerminal, startLine, endLine) || - [] - ).map((line) => - line.map((token) => { - token.fg = ''; - token.bg = ''; - return token; - }), - ); - } - - let lastNonEmptyLine = -1; - for (let i = newOutput.length - 1; i >= 0; i--) { - const line = newOutput[i]; - if ( - line - .map((segment) => segment.text) - .join('') - .trim().length > 0 - ) { - lastNonEmptyLine = i; - break; - } - } - - const absoluteCursorY = buffer.baseY + buffer.cursorY; - const cursorRelativeIndex = absoluteCursorY - startLine; - - if (cursorRelativeIndex > lastNonEmptyLine) { - lastNonEmptyLine = cursorRelativeIndex; - } - - const trimmedOutput = newOutput.slice(0, lastNonEmptyLine + 1); - - const finalOutput = shellExecutionConfig.disableDynamicLineTrimming - ? newOutput - : trimmedOutput; - - if (output !== finalOutput) { - output = finalOutput; - const event: ShellOutputEvent = { - type: 'data', - chunk: finalOutput, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - } - }; - - const render = (finalRender = false) => { - if (finalRender) { - if (renderTimeout) { - clearTimeout(renderTimeout); - } - renderFn(); - return; - } - - if (renderTimeout) { - return; - } - - renderTimeout = setTimeout(() => { - renderFn(); - renderTimeout = null; - }, 68); - }; - - headlessTerminal.onScroll(() => { - if (!isWriting) { - render(); - } - }); - - const handleOutput = (data: Buffer) => { - processingChain = processingChain.then( - () => - new Promise((resolve) => { - if (!decoder) { - const encoding = getCachedEncodingForBuffer(data); - try { - decoder = new TextDecoder(encoding); - } catch { - decoder = new TextDecoder('utf-8'); - } - } - - outputChunks.push(data); - - if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { - const sniffBuffer = Buffer.concat(outputChunks.slice(0, 20)); - sniffedBytes = sniffBuffer.length; - - if (isBinary(sniffBuffer)) { - isStreamingRawContent = false; - const event: ShellOutputEvent = { type: 'binary_detected' }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - } - } - - if (isStreamingRawContent) { - const decodedChunk = decoder.decode(data, { stream: true }); - if (decodedChunk.length === 0) { - resolve(); - return; - } - isWriting = true; - headlessTerminal.write(decodedChunk, () => { - render(); - isWriting = false; - resolve(); - }); - } else { - const totalBytes = outputChunks.reduce( - (sum, chunk) => sum + chunk.length, - 0, - ); - const event: ShellOutputEvent = { - type: 'binary_progress', - bytesReceived: totalBytes, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - resolve(); - } - }), + const bufferData = serializeTerminalToObject( + headlessTerminal, + startLine, + endLine, ); - }; + return bufferData.length > 0 ? bufferData : undefined; + }, + }).result; - ptyProcess.onData((data: string) => { - const bufferData = Buffer.from(data, 'utf-8'); - handleOutput(bufferData); - }); + let processingChain = Promise.resolve(); + let decoder: TextDecoder | null = null; + let output: string | AnsiOutput | null = null; + const outputChunks: Buffer[] = []; + const error: Error | null = null; + let exited = false; - ptyProcess.onExit( - ({ exitCode, signal }: { exitCode: number; signal?: number }) => { - exited = true; - abortSignal.removeEventListener('abort', abortHandler); - this.activePtys.delete(ptyProcess.pid); - // Attempt to destroy the PTY to ensure FD is closed - try { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (ptyProcess as IPty & { destroy?: () => void }).destroy?.(); - } catch { - // Ignore errors during cleanup - } + let isStreamingRawContent = true; + const MAX_SNIFF_SIZE = 4096; + let sniffedBytes = 0; + let isWriting = false; + let hasStartedOutput = false; + let renderTimeout: NodeJS.Timeout | null = null; - const finalize = () => { - render(true); + const renderFn = () => { + renderTimeout = null; - // Store exit info for late subscribers (e.g. backgrounding race condition) - this.exitedPtyInfo.set(ptyProcess.pid, { exitCode, signal }); - setTimeout( - () => { - this.exitedPtyInfo.delete(ptyProcess.pid); - }, - 5 * 60 * 1000, - ).unref(); + if (!isStreamingRawContent) { + return; + } - this.activePtys.delete(ptyProcess.pid); - this.activeResolvers.delete(ptyProcess.pid); - - const event: ShellOutputEvent = { - type: 'exit', - exitCode, - signal: signal ?? null, - }; - onOutputEvent(event); - ShellExecutionService.emitEvent(ptyProcess.pid, event); - this.activeListeners.delete(ptyProcess.pid); - - const finalBuffer = Buffer.concat(outputChunks); - - resolve({ - rawOutput: finalBuffer, - output: getFullBufferText(headlessTerminal), - exitCode, - signal: signal ?? null, - error, - aborted: abortSignal.aborted, - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - pid: ptyProcess.pid, - executionMethod: ptyInfo?.name ?? 'node-pty', - }); - }; - - if (abortSignal.aborted) { - finalize(); + if (!shellExecutionConfig.disableDynamicLineTrimming) { + if (!hasStartedOutput) { + const bufferText = getFullBufferText(headlessTerminal); + if (bufferText.trim().length === 0) { return; } + hasStartedOutput = true; + } + } - const processingComplete = processingChain.then(() => 'processed'); - const abortFired = new Promise<'aborted'>((res) => { - if (abortSignal.aborted) { - res('aborted'); - return; - } - abortSignal.addEventListener('abort', () => res('aborted'), { - once: true, - }); - }); - - // eslint-disable-next-line @typescript-eslint/no-floating-promises - Promise.race([processingComplete, abortFired]).then(() => { - finalize(); - }); - }, + const buffer = headlessTerminal.buffer.active; + const endLine = buffer.length; + const startLine = Math.max( + 0, + endLine - (shellExecutionConfig.maxSerializedLines ?? 2000), ); - const abortHandler = async () => { - if (ptyProcess.pid && !exited) { - await killProcessGroup({ - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - pid: ptyProcess.pid, - escalate: true, - isExited: () => exited, - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - pty: ptyProcess, - }); - } - }; + let newOutput: AnsiOutput; + if (shellExecutionConfig.showColor) { + newOutput = serializeTerminalToObject( + headlessTerminal, + startLine, + endLine, + ); + } else { + newOutput = ( + serializeTerminalToObject(headlessTerminal, startLine, endLine) || [] + ).map((line) => + line.map((token) => { + token.fg = ''; + token.bg = ''; + return token; + }), + ); + } - abortSignal.addEventListener('abort', abortHandler, { once: true }); + let lastNonEmptyLine = -1; + for (let i = newOutput.length - 1; i >= 0; i--) { + const line = newOutput[i]; + if ( + line + .map((segment) => segment.text) + .join('') + .trim().length > 0 + ) { + lastNonEmptyLine = i; + break; + } + } + + const absoluteCursorY = buffer.baseY + buffer.cursorY; + const cursorRelativeIndex = absoluteCursorY - startLine; + + if (cursorRelativeIndex > lastNonEmptyLine) { + lastNonEmptyLine = cursorRelativeIndex; + } + + const trimmedOutput = newOutput.slice(0, lastNonEmptyLine + 1); + const finalOutput = shellExecutionConfig.disableDynamicLineTrimming + ? newOutput + : trimmedOutput; + + if (output !== finalOutput) { + output = finalOutput; + const event: ShellOutputEvent = { + type: 'data', + chunk: finalOutput, + }; + onOutputEvent(event); + ExecutionLifecycleService.emitEvent(ptyPid, event); + } + }; + + const render = (finalRender = false) => { + if (finalRender) { + if (renderTimeout) { + clearTimeout(renderTimeout); + } + renderFn(); + return; + } + + if (renderTimeout) { + return; + } + + renderTimeout = setTimeout(() => { + renderFn(); + renderTimeout = null; + }, 68); + }; + + headlessTerminal.onScroll(() => { + if (!isWriting) { + render(); + } }); - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - return { pid: ptyProcess.pid, result }; + const handleOutput = (data: Buffer) => { + processingChain = processingChain.then( + () => + new Promise((resolveChunk) => { + if (!decoder) { + const encoding = getCachedEncodingForBuffer(data); + try { + decoder = new TextDecoder(encoding); + } catch { + decoder = new TextDecoder('utf-8'); + } + } + + outputChunks.push(data); + + if (isStreamingRawContent && sniffedBytes < MAX_SNIFF_SIZE) { + const sniffBuffer = Buffer.concat(outputChunks.slice(0, 20)); + sniffedBytes = sniffBuffer.length; + + if (isBinary(sniffBuffer)) { + isStreamingRawContent = false; + const event: ShellOutputEvent = { type: 'binary_detected' }; + onOutputEvent(event); + ExecutionLifecycleService.emitEvent(ptyPid, event); + } + } + + if (isStreamingRawContent) { + const decodedChunk = decoder.decode(data, { stream: true }); + if (decodedChunk.length === 0) { + resolveChunk(); + return; + } + isWriting = true; + headlessTerminal.write(decodedChunk, () => { + render(); + isWriting = false; + resolveChunk(); + }); + } else { + const totalBytes = outputChunks.reduce( + (sum, chunk) => sum + chunk.length, + 0, + ); + const event: ShellOutputEvent = { + type: 'binary_progress', + bytesReceived: totalBytes, + }; + onOutputEvent(event); + ExecutionLifecycleService.emitEvent(ptyPid, event); + resolveChunk(); + } + }), + ); + }; + + ptyProcess.onData((data: string) => { + const bufferData = Buffer.from(data, 'utf-8'); + handleOutput(bufferData); + }); + + ptyProcess.onExit( + ({ exitCode, signal }: { exitCode: number; signal?: number }) => { + exited = true; + abortSignal.removeEventListener('abort', abortHandler); + this.activePtys.delete(ptyPid); + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + (ptyProcess as IPty & { destroy?: () => void }).destroy?.(); + } catch { + // Ignore errors during cleanup + } + + const finalize = () => { + render(true); + this.activePtys.delete(ptyPid); + + const event: ShellOutputEvent = { + type: 'exit', + exitCode, + signal: signal ?? null, + }; + onOutputEvent(event); + + ExecutionLifecycleService.finalizeExecution(ptyPid, { + rawOutput: Buffer.concat(outputChunks), + output: getFullBufferText(headlessTerminal), + exitCode, + signal: signal ?? null, + error, + aborted: abortSignal.aborted, + pid: ptyPid, + executionMethod: ptyInfo?.name ?? 'node-pty', + }); + }; + + if (abortSignal.aborted) { + finalize(); + return; + } + + const processingComplete = processingChain.then(() => 'processed'); + const abortFired = new Promise<'aborted'>((res) => { + if (abortSignal.aborted) { + res('aborted'); + return; + } + abortSignal.addEventListener('abort', () => res('aborted'), { + once: true, + }); + }); + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + Promise.race([processingComplete, abortFired]).then(() => { + finalize(); + }); + }, + ); + + const abortHandler = async () => { + if (ptyProcess.pid && !exited) { + await killProcessGroup({ + pid: ptyPid, + escalate: true, + isExited: () => exited, + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + pty: ptyProcess, + }); + } + }; + + abortSignal.addEventListener('abort', abortHandler, { once: true }); + + return { pid: ptyPid, result }; } catch (e) { // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion const error = e as Error; @@ -1045,42 +947,11 @@ export class ShellExecutionService { * @param input The string to write to the terminal. */ static writeToPty(pid: number, input: string): void { - const activeChild = this.getActiveChildProcess(pid); - if (activeChild) { - activeChild.process.stdin?.write(input); - return; - } - - if (!this.isPtyActive(pid)) { - return; - } - - const activePty = this.activePtys.get(pid); - if (activePty) { - activePty.ptyProcess.write(input); - } + ExecutionLifecycleService.writeInput(pid, input); } static isPtyActive(pid: number): boolean { - if (this.getActiveVirtualProcess(pid)) { - return true; - } - - if (this.getActiveChildProcess(pid)) { - try { - return process.kill(pid, 0); - } catch { - return false; - } - } - - try { - // process.kill with signal 0 is a way to check for the existence of a process. - // It doesn't actually send a signal. - return process.kill(pid, 0); - } catch (_) { - return false; - } + return ExecutionLifecycleService.isActive(pid); } /** @@ -1095,49 +966,7 @@ export class ShellExecutionService { pid: number, callback: (exitCode: number, signal?: number) => void, ): () => void { - const activePty = this.activePtys.get(pid); - if (activePty) { - const disposable = activePty.ptyProcess.onExit( - ({ exitCode, signal }: { exitCode: number; signal?: number }) => { - callback(exitCode, signal); - disposable.dispose(); - }, - ); - return () => disposable.dispose(); - } - - const activeChild = this.getActiveChildProcess(pid); - if (activeChild) { - const listener = (code: number | null, signal: NodeJS.Signals | null) => { - let signalNumber: number | undefined; - if (signal) { - signalNumber = os.constants.signals[signal]; - } - callback(code ?? 0, signalNumber); - }; - activeChild?.process.on('exit', listener); - return () => { - activeChild?.process.removeListener('exit', listener); - }; - } - - if (this.getActiveVirtualProcess(pid)) { - const listener = (event: ShellOutputEvent) => { - if (event.type === 'exit') { - callback(event.exitCode ?? 0, event.signal ?? undefined); - unsubscribe(); - } - }; - const unsubscribe = this.subscribe(pid, listener); - return unsubscribe; - } - - // Check if it already exited recently - const exitedInfo = this.exitedPtyInfo.get(pid); - if (exitedInfo) { - callback(exitedInfo.exitCode, exitedInfo.signal); - } - return () => {}; + return ExecutionLifecycleService.onExit(pid, callback); } /** @@ -1146,28 +975,8 @@ export class ShellExecutionService { * @param pid The process ID to kill. */ static kill(pid: number): void { - const activePty = this.activePtys.get(pid); - const activeChild = this.getActiveChildProcess(pid); - const activeVirtual = this.getActiveVirtualProcess(pid); - - if (activeVirtual) { - activeVirtual.onKill?.(); - this.completeVirtualExecution(pid, { - error: new Error('Operation cancelled by user.'), - aborted: true, - exitCode: 130, - }); - return; - } else if (activeChild) { - killProcessGroup({ pid }).catch(() => {}); - this.activeProcesses.delete(pid); - } else if (activePty) { - killProcessGroup({ pid, pty: activePty.ptyProcess }).catch(() => {}); - this.activePtys.delete(pid); - } - - this.activeResolvers.delete(pid); - this.activeListeners.delete(pid); + this.activePtys.delete(pid); + ExecutionLifecycleService.kill(pid); } /** @@ -1177,105 +986,14 @@ export class ShellExecutionService { * @param pid The process ID of the target PTY. */ static background(pid: number): void { - const resolve = this.activeResolvers.get(pid); - if (resolve) { - let output = ''; - const rawOutput = Buffer.from(''); - - const activePty = this.activePtys.get(pid); - const activeChild = this.getActiveChildProcess(pid); - const activeVirtual = this.getActiveVirtualProcess(pid); - - if (activePty) { - output = getFullBufferText(activePty.headlessTerminal); - resolve({ - rawOutput, - output, - exitCode: null, - signal: null, - error: null, - aborted: false, - pid, - executionMethod: 'node-pty', - backgrounded: true, - }); - } else if (activeChild) { - output = activeChild.state.output; - - resolve({ - rawOutput, - output, - exitCode: null, - signal: null, - error: null, - aborted: false, - pid, - executionMethod: 'child_process', - backgrounded: true, - }); - } else if (activeVirtual) { - output = activeVirtual.output; - resolve({ - rawOutput, - output, - exitCode: null, - signal: null, - error: null, - aborted: false, - pid, - executionMethod: 'none', - backgrounded: true, - }); - } - - this.activeResolvers.delete(pid); - } + ExecutionLifecycleService.background(pid); } static subscribe( pid: number, listener: (event: ShellOutputEvent) => void, ): () => void { - if (!this.activeListeners.has(pid)) { - this.activeListeners.set(pid, new Set()); - } - this.activeListeners.get(pid)?.add(listener); - - // Send current buffer content immediately - const activePty = this.activePtys.get(pid); - const activeChild = this.getActiveChildProcess(pid); - const activeVirtual = this.getActiveVirtualProcess(pid); - - if (activePty) { - // Use serializeTerminalToObject to preserve colors and structure - const endLine = activePty.headlessTerminal.buffer.active.length; - const startLine = Math.max( - 0, - endLine - (activePty.maxSerializedLines ?? 2000), - ); - const bufferData = serializeTerminalToObject( - activePty.headlessTerminal, - startLine, - endLine, - ); - if (bufferData && bufferData.length > 0) { - listener({ type: 'data', chunk: bufferData }); - } - } else if (activeChild) { - const output = activeChild.state.output; - if (output) { - listener({ type: 'data', chunk: output }); - } - } else if (activeVirtual?.output) { - listener({ type: 'data', chunk: activeVirtual.output }); - } - - return () => { - this.activeListeners.get(pid)?.delete(listener); - if (this.activeListeners.get(pid)?.size === 0) { - this.activeListeners.delete(pid); - } - }; + return ExecutionLifecycleService.subscribe(pid, listener); } /** @@ -1328,10 +1046,7 @@ export class ShellExecutionService { endLine, ); const event: ShellOutputEvent = { type: 'data', chunk: bufferData }; - const listeners = ShellExecutionService.activeListeners.get(pid); - if (listeners) { - listeners.forEach((listener) => listener(event)); - } + ExecutionLifecycleService.emitEvent(pid, event); } }