Refactor managed execution state for virtual backgrounding

This commit is contained in:
Adam Weidman
2026-03-08 17:14:41 -04:00
parent ba90cc2538
commit b2c7fe1367
4 changed files with 171 additions and 133 deletions

View File

@@ -313,9 +313,9 @@ export const useGeminiStream = (
const activeToolPtyId = useMemo(() => {
const executingBackgroundableTool = toolCalls.find(
(tc): tc is TrackedExecutingToolCall =>
tc.status === CoreToolCallStatus.Executing &&
typeof tc.pid === 'number',
(toolCall): toolCall is TrackedExecutingToolCall =>
toolCall.status === CoreToolCallStatus.Executing &&
typeof toolCall.pid === 'number',
);
return executingBackgroundableTool?.pid;
}, [toolCalls]);

View File

@@ -432,13 +432,21 @@ describe('ShellExecutionService', () => {
});
describe('pty interaction', () => {
let activePtysGetSpy: { mockRestore: () => void };
beforeEach(() => {
vi.spyOn(ShellExecutionService['activePtys'], 'get').mockReturnValue({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ptyProcess: mockPtyProcess as any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
headlessTerminal: mockHeadlessTerminal as any,
});
activePtysGetSpy = vi
.spyOn(ShellExecutionService['activePtys'], 'get')
.mockReturnValue({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ptyProcess: mockPtyProcess as any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
headlessTerminal: mockHeadlessTerminal as any,
});
});
afterEach(() => {
activePtysGetSpy.mockRestore();
});
it('should write to the pty and trigger a render', async () => {
@@ -1633,3 +1641,85 @@ describe('ShellExecutionService environment variables', () => {
await new Promise(process.nextTick);
});
});
describe('ShellExecutionService virtual executions', () => {
it('completes a virtual execution in the foreground', async () => {
const { pid, result } = ShellExecutionService.createVirtualExecution();
if (pid === undefined) {
throw new Error('Expected virtual pid to be defined.');
}
const onExit = vi.fn();
const unsubscribe = ShellExecutionService.onExit(pid, onExit);
ShellExecutionService.appendVirtualOutput(pid, 'Hello');
ShellExecutionService.appendVirtualOutput(pid, ' World');
ShellExecutionService.completeVirtualExecution(pid, { exitCode: 0 });
const executionResult = await result;
expect(executionResult.output).toBe('Hello World');
expect(executionResult.backgrounded).toBeUndefined();
expect(executionResult.exitCode).toBe(0);
expect(executionResult.error).toBeNull();
await vi.waitFor(() => {
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribe();
});
it('supports backgrounding virtual executions and streaming additional output', async () => {
const { pid, result } = ShellExecutionService.createVirtualExecution();
if (pid === undefined) {
throw new Error('Expected virtual pid to be defined.');
}
const chunks: string[] = [];
const onExit = vi.fn();
const unsubscribeStream = ShellExecutionService.subscribe(pid, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
});
const unsubscribeExit = ShellExecutionService.onExit(pid, onExit);
ShellExecutionService.appendVirtualOutput(pid, 'Chunk 1');
ShellExecutionService.background(pid);
const backgroundResult = await result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('Chunk 1');
ShellExecutionService.appendVirtualOutput(pid, '\nChunk 2');
ShellExecutionService.completeVirtualExecution(pid, { exitCode: 0 });
await vi.waitFor(() => {
expect(chunks.join('')).toContain('Chunk 2');
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribeStream();
unsubscribeExit();
});
it('kills virtual executions via the existing kill API', async () => {
const onKill = vi.fn();
const { pid, result } = ShellExecutionService.createVirtualExecution(
'',
onKill,
);
if (pid === undefined) {
throw new Error('Expected virtual pid to be defined.');
}
ShellExecutionService.appendVirtualOutput(pid, 'work');
ShellExecutionService.kill(pid);
const killResult = await result;
expect(onKill).toHaveBeenCalledTimes(1);
expect(killResult.aborted).toBe(true);
expect(killResult.exitCode).toBe(130);
expect(killResult.error?.message).toContain('Operation cancelled by user');
});
});

View File

@@ -152,11 +152,22 @@ interface ActiveChildProcess {
};
}
interface ActiveVirtualProcess {
interface ActiveVirtualProcessState {
output: string;
onKill?: () => void;
}
type ActiveManagedProcess =
| {
kind: 'child';
process: ChildProcess;
state: ActiveChildProcess['state'];
}
| {
kind: 'virtual';
state: ActiveVirtualProcessState;
};
const getFullBufferText = (terminal: pkg.Terminal): string => {
const buffer = terminal.buffer.active;
const lines: string[] = [];
@@ -202,11 +213,7 @@ const getFullBufferText = (terminal: pkg.Terminal): string => {
export class ShellExecutionService {
private static activePtys = new Map<number, ActivePty>();
private static activeChildProcesses = new Map<number, ActiveChildProcess>();
private static activeVirtualProcesses = new Map<
number,
ActiveVirtualProcess
>();
private static activeProcesses = new Map<number, ActiveManagedProcess>();
private static exitedPtyInfo = new Map<
number,
{ exitCode: number; signal?: number }
@@ -302,13 +309,29 @@ export class ShellExecutionService {
}
}
private static getActiveChildProcess(
pid: number,
): ActiveChildProcess | undefined {
const activeProcess = this.activeProcesses.get(pid);
if (activeProcess?.kind !== 'child') {
return undefined;
}
return { process: activeProcess.process, state: activeProcess.state };
}
private static getActiveVirtualProcess(
pid: number,
): ActiveVirtualProcessState | undefined {
const activeProcess = this.activeProcesses.get(pid);
if (activeProcess?.kind !== 'virtual') {
return undefined;
}
return activeProcess.state;
}
private static allocateVirtualPid(): number {
let pid = ++this.nextVirtualPid;
while (
this.activePtys.has(pid) ||
this.activeChildProcesses.has(pid) ||
this.activeVirtualProcesses.has(pid)
) {
while (this.activePtys.has(pid) || this.activeProcesses.has(pid)) {
pid = ++this.nextVirtualPid;
}
return pid;
@@ -319,9 +342,12 @@ export class ShellExecutionService {
onKill?: () => void,
): ShellExecutionHandle {
const pid = this.allocateVirtualPid();
this.activeVirtualProcesses.set(pid, {
output: initialOutput,
onKill,
this.activeProcesses.set(pid, {
kind: 'virtual',
state: {
output: initialOutput,
onKill,
},
});
const result = new Promise<ShellExecutionResult>((resolve) => {
@@ -332,7 +358,7 @@ export class ShellExecutionService {
}
static appendVirtualOutput(pid: number, chunk: string): void {
const virtual = this.activeVirtualProcesses.get(pid);
const virtual = this.getActiveVirtualProcess(pid);
if (!virtual || chunk.length === 0) {
return;
}
@@ -349,7 +375,7 @@ export class ShellExecutionService {
aborted?: boolean;
},
): void {
const virtual = this.activeVirtualProcesses.get(pid);
const virtual = this.getActiveVirtualProcess(pid);
if (!virtual) {
return;
}
@@ -382,7 +408,7 @@ export class ShellExecutionService {
signal,
});
this.activeListeners.delete(pid);
this.activeVirtualProcesses.delete(pid);
this.activeProcesses.delete(pid);
this.exitedPtyInfo.set(pid, {
exitCode: exitCode ?? 0,
@@ -432,7 +458,8 @@ export class ShellExecutionService {
};
if (child.pid) {
this.activeChildProcesses.set(child.pid, {
this.activeProcesses.set(child.pid, {
kind: 'child',
process: child,
state,
});
@@ -542,7 +569,7 @@ export class ShellExecutionService {
onOutputEvent(event);
ShellExecutionService.emitEvent(child.pid, event);
this.activeChildProcesses.delete(child.pid);
this.activeProcesses.delete(child.pid);
this.activeResolvers.delete(child.pid);
this.activeListeners.delete(child.pid);
}
@@ -1018,11 +1045,9 @@ export class ShellExecutionService {
* @param input The string to write to the terminal.
*/
static writeToPty(pid: number, input: string): void {
if (this.activeChildProcesses.has(pid)) {
const activeChild = this.activeChildProcesses.get(pid);
if (activeChild) {
activeChild.process.stdin?.write(input);
}
const activeChild = this.getActiveChildProcess(pid);
if (activeChild) {
activeChild.process.stdin?.write(input);
return;
}
@@ -1037,11 +1062,11 @@ export class ShellExecutionService {
}
static isPtyActive(pid: number): boolean {
if (this.activeVirtualProcesses.has(pid)) {
if (this.getActiveVirtualProcess(pid)) {
return true;
}
if (this.activeChildProcesses.has(pid)) {
if (this.getActiveChildProcess(pid)) {
try {
return process.kill(pid, 0);
} catch {
@@ -1079,8 +1104,10 @@ export class ShellExecutionService {
},
);
return () => disposable.dispose();
} else if (this.activeChildProcesses.has(pid)) {
const activeChild = this.activeChildProcesses.get(pid);
}
const activeChild = this.getActiveChildProcess(pid);
if (activeChild) {
const listener = (code: number | null, signal: NodeJS.Signals | null) => {
let signalNumber: number | undefined;
if (signal) {
@@ -1092,7 +1119,9 @@ export class ShellExecutionService {
return () => {
activeChild?.process.removeListener('exit', listener);
};
} else if (this.activeVirtualProcesses.has(pid)) {
}
if (this.getActiveVirtualProcess(pid)) {
const listener = (event: ShellOutputEvent) => {
if (event.type === 'exit') {
callback(event.exitCode ?? 0, event.signal ?? undefined);
@@ -1101,14 +1130,14 @@ export class ShellExecutionService {
};
const unsubscribe = this.subscribe(pid, listener);
return unsubscribe;
} else {
// Check if it already exited recently
const exitedInfo = this.exitedPtyInfo.get(pid);
if (exitedInfo) {
callback(exitedInfo.exitCode, exitedInfo.signal);
}
return () => {};
}
// Check if it already exited recently
const exitedInfo = this.exitedPtyInfo.get(pid);
if (exitedInfo) {
callback(exitedInfo.exitCode, exitedInfo.signal);
}
return () => {};
}
/**
@@ -1118,8 +1147,8 @@ export class ShellExecutionService {
*/
static kill(pid: number): void {
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
const activeVirtual = this.activeVirtualProcesses.get(pid);
const activeChild = this.getActiveChildProcess(pid);
const activeVirtual = this.getActiveVirtualProcess(pid);
if (activeVirtual) {
activeVirtual.onKill?.();
@@ -1131,7 +1160,7 @@ export class ShellExecutionService {
return;
} else if (activeChild) {
killProcessGroup({ pid }).catch(() => {});
this.activeChildProcesses.delete(pid);
this.activeProcesses.delete(pid);
} else if (activePty) {
killProcessGroup({ pid, pty: activePty.ptyProcess }).catch(() => {});
this.activePtys.delete(pid);
@@ -1154,8 +1183,8 @@ export class ShellExecutionService {
const rawOutput = Buffer.from('');
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
const activeVirtual = this.activeVirtualProcesses.get(pid);
const activeChild = this.getActiveChildProcess(pid);
const activeVirtual = this.getActiveVirtualProcess(pid);
if (activePty) {
output = getFullBufferText(activePty.headlessTerminal);
@@ -1214,8 +1243,8 @@ export class ShellExecutionService {
// Send current buffer content immediately
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
const activeVirtual = this.activeVirtualProcesses.get(pid);
const activeChild = this.getActiveChildProcess(pid);
const activeVirtual = this.getActiveVirtualProcess(pid);
if (activePty) {
// Use serializeTerminalToObject to preserve colors and structure

View File

@@ -1,81 +0,0 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, vi } from 'vitest';
import { ShellExecutionService } from './shellExecutionService.js';
describe('ShellExecutionService virtual executions', () => {
it('completes a virtual execution in the foreground', async () => {
const { pid, result } = ShellExecutionService.createVirtualExecution();
const onExit = vi.fn();
const unsubscribe = ShellExecutionService.onExit(pid!, onExit);
ShellExecutionService.appendVirtualOutput(pid!, 'Hello');
ShellExecutionService.appendVirtualOutput(pid!, ' World');
ShellExecutionService.completeVirtualExecution(pid!, { exitCode: 0 });
const executionResult = await result;
expect(executionResult.output).toBe('Hello World');
expect(executionResult.backgrounded).toBeUndefined();
expect(executionResult.exitCode).toBe(0);
expect(executionResult.error).toBeNull();
await vi.waitFor(() => {
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribe();
});
it('supports backgrounding virtual executions and streaming additional output', async () => {
const { pid, result } = ShellExecutionService.createVirtualExecution();
const chunks: string[] = [];
const onExit = vi.fn();
const unsubscribeStream = ShellExecutionService.subscribe(pid!, (event) => {
if (event.type === 'data' && typeof event.chunk === 'string') {
chunks.push(event.chunk);
}
});
const unsubscribeExit = ShellExecutionService.onExit(pid!, onExit);
ShellExecutionService.appendVirtualOutput(pid!, 'Chunk 1');
ShellExecutionService.background(pid!);
const backgroundResult = await result;
expect(backgroundResult.backgrounded).toBe(true);
expect(backgroundResult.output).toBe('Chunk 1');
ShellExecutionService.appendVirtualOutput(pid!, '\nChunk 2');
ShellExecutionService.completeVirtualExecution(pid!, { exitCode: 0 });
await vi.waitFor(() => {
expect(chunks.join('')).toContain('Chunk 2');
expect(onExit).toHaveBeenCalledWith(0, undefined);
});
unsubscribeStream();
unsubscribeExit();
});
it('kills virtual executions via the existing kill API', async () => {
const onKill = vi.fn();
const { pid, result } = ShellExecutionService.createVirtualExecution(
'',
onKill,
);
ShellExecutionService.appendVirtualOutput(pid!, 'work');
ShellExecutionService.kill(pid!);
const killResult = await result;
expect(onKill).toHaveBeenCalledTimes(1);
expect(killResult.aborted).toBe(true);
expect(killResult.exitCode).toBe(130);
expect(killResult.error?.message).toContain('Operation cancelled by user');
});
});