feat: implement background process logging and cleanup (#21189)

This commit is contained in:
Gal Zahavi
2026-03-10 17:13:20 -07:00
committed by GitHub
parent 7c4570339e
commit 524679d23c
15 changed files with 724 additions and 141 deletions
@@ -13,6 +13,7 @@ import {
afterEach,
type Mock,
} from 'vitest';
import EventEmitter from 'node:events';
import type { Readable } from 'node:stream';
import { type ChildProcess } from 'node:child_process';
@@ -28,14 +29,44 @@ const mockPtySpawn = vi.hoisted(() => vi.fn());
const mockCpSpawn = vi.hoisted(() => vi.fn());
const mockIsBinary = vi.hoisted(() => vi.fn());
const mockPlatform = vi.hoisted(() => vi.fn());
const mockHomedir = vi.hoisted(() => vi.fn());
const mockMkdirSync = vi.hoisted(() => vi.fn());
const mockCreateWriteStream = vi.hoisted(() => vi.fn());
const mockGetPty = vi.hoisted(() => vi.fn());
const mockSerializeTerminalToObject = vi.hoisted(() => vi.fn());
const mockResolveExecutable = vi.hoisted(() => vi.fn());
const mockDebugLogger = vi.hoisted(() => ({
log: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}));
// Top-level Mocks
vi.mock('../config/storage.js', () => ({
Storage: {
getGlobalTempDir: vi.fn().mockReturnValue('/mock/temp'),
},
}));
vi.mock('../utils/debugLogger.js', () => ({
debugLogger: mockDebugLogger,
}));
vi.mock('@lydell/node-pty', () => ({
spawn: mockPtySpawn,
}));
vi.mock('node:fs', async (importOriginal) => {
const actual = await importOriginal<typeof import('node:fs')>();
return {
...actual,
default: {
...actual,
mkdirSync: mockMkdirSync,
createWriteStream: mockCreateWriteStream,
},
mkdirSync: mockMkdirSync,
createWriteStream: mockCreateWriteStream,
};
});
vi.mock('../utils/shell-utils.js', async (importOriginal) => {
const actual =
await importOriginal<typeof import('../utils/shell-utils.js')>();
@@ -57,6 +88,7 @@ vi.mock('../utils/textUtils.js', () => ({
vi.mock('node:os', () => ({
default: {
platform: mockPlatform,
homedir: mockHomedir,
constants: {
signals: {
SIGTERM: 15,
@@ -65,6 +97,7 @@ vi.mock('node:os', () => ({
},
},
platform: mockPlatform,
homedir: mockHomedir,
constants: {
signals: {
SIGTERM: 15,
@@ -159,6 +192,8 @@ describe('ShellExecutionService', () => {
buffer: {
active: {
viewportY: number;
length: number;
getLine: Mock;
};
};
};
@@ -201,6 +236,8 @@ describe('ShellExecutionService', () => {
buffer: {
active: {
viewportY: 0,
length: 0,
getLine: vi.fn(),
},
},
};
@@ -432,13 +469,20 @@ describe('ShellExecutionService', () => {
});
describe('pty interaction', () => {
let ptySpy: { mockRestore(): void };
beforeEach(() => {
vi.spyOn(ShellExecutionService['activePtys'], 'get').mockReturnValue({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ptyProcess: mockPtyProcess as any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
headlessTerminal: mockHeadlessTerminal as any,
});
ptySpy = vi
.spyOn(ShellExecutionService['activePtys'], 'get')
.mockReturnValue({
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ptyProcess: mockPtyProcess as any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
headlessTerminal: mockHeadlessTerminal as any,
});
});
afterEach(() => {
ptySpy.mockRestore();
});
it('should write to the pty and trigger a render', async () => {
@@ -667,6 +711,163 @@ describe('ShellExecutionService', () => {
});
});
describe('Backgrounding', () => {
let mockWriteStream: { write: Mock; end: Mock; on: Mock };
let mockBgChildProcess: EventEmitter & Partial<ChildProcess>;
beforeEach(async () => {
mockWriteStream = {
write: vi.fn(),
end: vi.fn().mockImplementation((cb) => cb?.()),
on: vi.fn(),
};
mockMkdirSync.mockReturnValue(undefined);
mockCreateWriteStream.mockReturnValue(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockWriteStream as any,
);
mockHomedir.mockReturnValue('/mock/home');
mockBgChildProcess = new EventEmitter() as EventEmitter &
Partial<ChildProcess>;
mockBgChildProcess.stdout = new EventEmitter() as Readable;
mockBgChildProcess.stderr = new EventEmitter() as Readable;
mockBgChildProcess.kill = vi.fn();
Object.defineProperty(mockBgChildProcess, 'pid', {
value: 99999,
configurable: true,
});
mockCpSpawn.mockReturnValue(mockBgChildProcess);
// Explicitly clear state between runs
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(ShellExecutionService as any).backgroundLogStreams.clear();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(ShellExecutionService as any).activePtys.clear();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(ShellExecutionService as any).activeChildProcesses.clear();
});
afterEach(() => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(ShellExecutionService as any).backgroundLogStreams.clear();
});
it('should move a running pty process to the background and start logging', async () => {
const abortController = new AbortController();
const handle = await ShellExecutionService.execute(
'long-running-pty',
'/',
onOutputEventMock,
abortController.signal,
true,
shellExecutionConfig,
);
// Use the registered onData listener
const onDataListener = mockPtyProcess.onData.mock.calls[0][0];
onDataListener('initial pty output');
// Wait for async write to headless terminal
await new Promise((resolve) => setTimeout(resolve, 100));
mockSerializeTerminalToObject.mockReturnValue([
[{ text: 'initial pty output', fg: '', bg: '' }],
]);
// Background the process
ShellExecutionService.background(handle.pid!);
const result = await handle.result;
expect(result.backgrounded).toBe(true);
expect(result.output).toContain('initial pty output');
expect(mockMkdirSync).toHaveBeenCalledWith(
expect.stringContaining('background-processes'),
{ recursive: true },
);
// Verify initial output was written
expect(
mockWriteStream.write.mock.calls.some((call) =>
call[0].includes('initial pty output'),
),
).toBe(true);
await ShellExecutionService.kill(handle.pid!);
expect(mockWriteStream.end).toHaveBeenCalled();
});
it('should continue logging after backgrounding for child_process', async () => {
mockGetPty.mockResolvedValue(null); // Force child_process fallback
const abortController = new AbortController();
const handle = await ShellExecutionService.execute(
'long-running-cp',
'/',
onOutputEventMock,
abortController.signal,
true,
shellExecutionConfig,
);
// Trigger data before backgrounding
mockBgChildProcess.stdout?.emit('data', Buffer.from('initial cp output'));
await new Promise((resolve) => process.nextTick(resolve));
ShellExecutionService.background(handle.pid!);
const result = await handle.result;
expect(result.backgrounded).toBe(true);
expect(result.output).toBe('initial cp output');
expect(
mockWriteStream.write.mock.calls.some((call) =>
call[0].includes('initial cp output'),
),
).toBe(true);
// Subsequent output
mockBgChildProcess.stdout?.emit('data', Buffer.from('more cp output'));
await new Promise((resolve) => process.nextTick(resolve));
expect(mockWriteStream.write).toHaveBeenCalledWith('more cp output');
await ShellExecutionService.kill(handle.pid!);
expect(mockWriteStream.end).toHaveBeenCalled();
});
it('should log a warning if background log setup fails', async () => {
const abortController = new AbortController();
const handle = await ShellExecutionService.execute(
'failing-log-setup',
'/',
onOutputEventMock,
abortController.signal,
true,
shellExecutionConfig,
);
// Mock mkdirSync to fail
const error = new Error('Permission denied');
mockMkdirSync.mockImplementationOnce(() => {
throw error;
});
// Background the process
ShellExecutionService.background(handle.pid!);
const result = await handle.result;
expect(result.backgrounded).toBe(true);
expect(mockDebugLogger.warn).toHaveBeenCalledWith(
'Failed to setup background logging:',
error,
);
await ShellExecutionService.kill(handle.pid!);
});
});
describe('Binary Output', () => {
it('should detect binary output and switch to progress events', async () => {
mockIsBinary.mockReturnValueOnce(true);
@@ -894,7 +1095,7 @@ describe('ShellExecutionService', () => {
'destroy',
);
ShellExecutionService.kill(pid);
await ShellExecutionService.kill(pid);
expect(storedDestroySpy).toHaveBeenCalled();
expect(ShellExecutionService['activePtys'].has(pid)).toBe(false);
@@ -974,7 +1175,10 @@ describe('ShellExecutionService child_process fallback', () => {
// Helper function to run a standard execution simulation
const simulateExecution = async (
command: string,
simulation: (cp: typeof mockChildProcess, ac: AbortController) => void,
simulation: (
cp: typeof mockChildProcess,
ac: AbortController,
) => void | Promise<void>,
) => {
const abortController = new AbortController();
const handle = await ShellExecutionService.execute(
@@ -987,7 +1191,7 @@ describe('ShellExecutionService child_process fallback', () => {
);
await new Promise((resolve) => process.nextTick(resolve));
simulation(mockChildProcess, abortController);
await simulation(mockChildProcess, abortController);
const result = await handle.result;
return { result, handle, abortController };
};
@@ -1315,9 +1519,9 @@ describe('ShellExecutionService child_process fallback', () => {
describe('Platform-Specific Behavior', () => {
it('should use powershell.exe on Windows', async () => {
mockPlatform.mockReturnValue('win32');
await simulateExecution('dir "foo bar"', (cp) =>
cp.emit('exit', 0, null),
);
await simulateExecution('dir "foo bar"', (cp) => {
cp.emit('exit', 0, null);
});
expect(mockCpSpawn).toHaveBeenCalledWith(
'powershell.exe',
@@ -1332,7 +1536,9 @@ describe('ShellExecutionService child_process fallback', () => {
it('should use bash and detached process group on Linux', async () => {
mockPlatform.mockReturnValue('linux');
await simulateExecution('ls "foo bar"', (cp) => cp.emit('exit', 0, null));
await simulateExecution('ls "foo bar"', (cp) => {
cp.emit('exit', 0, null);
});
expect(mockCpSpawn).toHaveBeenCalledWith(
'bash',
@@ -9,6 +9,8 @@ import { getPty, type PtyImplementation } from '../utils/getPty.js';
import { spawn as cpSpawn, type ChildProcess } from 'node:child_process';
import { TextDecoder } from 'node:util';
import os from 'node:os';
import fs, { mkdirSync } from 'node:fs';
import path from 'node:path';
import type { IPty } from '@lydell/node-pty';
import { getCachedEncodingForBuffer } from '../utils/systemEncoding.js';
import {
@@ -18,6 +20,8 @@ import {
} from '../utils/shell-utils.js';
import { isBinary } from '../utils/textUtils.js';
import pkg from '@xterm/headless';
import { debugLogger } from '../utils/debugLogger.js';
import { Storage } from '../config/storage.js';
import {
serializeTerminalToObject,
type AnsiOutput,
@@ -152,20 +156,37 @@ interface ActiveChildProcess {
};
}
const getFullBufferText = (terminal: pkg.Terminal): string => {
const findLastContentLine = (
buffer: pkg.IBuffer,
startLine: number,
): number => {
const lineCount = buffer.length;
for (let i = lineCount - 1; i >= startLine; i--) {
const line = buffer.getLine(i);
if (line && line.translateToString(true).length > 0) {
return i;
}
}
return -1;
};
const getFullBufferText = (terminal: pkg.Terminal, startLine = 0): string => {
const buffer = terminal.buffer.active;
const lines: string[] = [];
for (let i = 0; i < buffer.length; i++) {
const lastContentLine = findLastContentLine(buffer, startLine);
if (lastContentLine === -1 || lastContentLine < startLine) return '';
for (let i = startLine; i <= lastContentLine; i++) {
const line = buffer.getLine(i);
if (!line) {
lines.push('');
continue;
}
// If the NEXT line is wrapped, it means it's a continuation of THIS line.
// We should not trim the right side of this line because trailing spaces
// might be significant parts of the wrapped content.
// If it's not wrapped, we trim normally.
let trimRight = true;
if (i + 1 < buffer.length) {
if (i + 1 <= lastContentLine) {
const nextLine = buffer.getLine(i + 1);
if (nextLine?.isWrapped) {
trimRight = false;
@@ -181,12 +202,56 @@ const getFullBufferText = (terminal: pkg.Terminal): string => {
}
}
// Remove trailing empty lines
while (lines.length > 0 && lines[lines.length - 1] === '') {
lines.pop();
return lines.join('\n');
};
const writeBufferToLogStream = (
terminal: pkg.Terminal,
stream: fs.WriteStream,
startLine = 0,
): number => {
const buffer = terminal.buffer.active;
const lastContentLine = findLastContentLine(buffer, startLine);
if (lastContentLine === -1 || lastContentLine < startLine) return startLine;
for (let i = startLine; i <= lastContentLine; i++) {
const line = buffer.getLine(i);
if (!line) {
stream.write('\n');
continue;
}
let trimRight = true;
if (i + 1 <= lastContentLine) {
const nextLine = buffer.getLine(i + 1);
if (nextLine?.isWrapped) {
trimRight = false;
}
}
const lineContent = line.translateToString(trimRight);
const stripped = stripAnsi(lineContent);
if (line.isWrapped) {
stream.write(stripped);
} else {
if (i > startLine) {
stream.write('\n');
}
stream.write(stripped);
}
}
return lines.join('\n');
// Ensure it ends with a newline if we wrote anything and the next line is not wrapped
if (lastContentLine >= startLine) {
const nextLine = terminal.buffer.active.getLine(lastContentLine + 1);
if (!nextLine?.isWrapped) {
stream.write('\n');
}
}
return lastContentLine + 1;
};
/**
@@ -198,10 +263,43 @@ const getFullBufferText = (terminal: pkg.Terminal): string => {
export class ShellExecutionService {
private static activePtys = new Map<number, ActivePty>();
private static activeChildProcesses = new Map<number, ActiveChildProcess>();
private static backgroundLogPids = new Set<number>();
private static backgroundLogStreams = new Map<number, fs.WriteStream>();
private static exitedPtyInfo = new Map<
number,
{ exitCode: number; signal?: number }
>();
static getLogDir(): string {
return path.join(Storage.getGlobalTempDir(), 'background-processes');
}
static getLogFilePath(pid: number): string {
return path.join(this.getLogDir(), `background-${pid}.log`);
}
private static syncBackgroundLog(pid: number, content: string): void {
if (!this.backgroundLogPids.has(pid)) return;
const stream = this.backgroundLogStreams.get(pid);
if (stream && content) {
// Strip ANSI escape codes before logging
stream.write(stripAnsi(content));
}
}
private static async cleanupLogStream(pid: number): Promise<void> {
const stream = this.backgroundLogStreams.get(pid);
if (stream) {
await new Promise<void>((resolve) => {
stream.end(() => resolve());
});
this.backgroundLogStreams.delete(pid);
}
this.backgroundLogPids.delete(pid);
}
private static activeResolvers = new Map<
number,
(res: ShellExecutionResult) => void
@@ -432,7 +530,15 @@ export class ShellExecutionService {
chunk: decodedChunk,
};
onOutputEvent(event);
if (child.pid) ShellExecutionService.emitEvent(child.pid, event);
if (child.pid) {
ShellExecutionService.emitEvent(child.pid, event);
if (ShellExecutionService.backgroundLogPids.has(child.pid)) {
ShellExecutionService.syncBackgroundLog(
child.pid,
decodedChunk,
);
}
}
}
} else {
const totalBytes = state.outputChunks.reduce(
@@ -468,17 +574,21 @@ export class ShellExecutionService {
const exitSignal = signal ? os.constants.signals[signal] : null;
if (child.pid) {
const pid = child.pid;
const event: ShellOutputEvent = {
type: 'exit',
exitCode,
signal: exitSignal,
};
onOutputEvent(event);
ShellExecutionService.emitEvent(child.pid, event);
ShellExecutionService.emitEvent(pid, event);
this.activeChildProcesses.delete(child.pid);
this.activeResolvers.delete(child.pid);
this.activeListeners.delete(child.pid);
// eslint-disable-next-line @typescript-eslint/no-floating-promises
ShellExecutionService.cleanupLogStream(pid).then(() => {
this.activeChildProcesses.delete(pid);
this.activeResolvers.delete(pid);
this.activeListeners.delete(pid);
});
}
resolve({
@@ -800,6 +910,16 @@ export class ShellExecutionService {
resolve();
return;
}
if (
ShellExecutionService.backgroundLogPids.has(ptyProcess.pid)
) {
ShellExecutionService.syncBackgroundLog(
ptyProcess.pid,
decodedChunk,
);
}
isWriting = true;
headlessTerminal.write(decodedChunk, () => {
render();
@@ -832,7 +952,6 @@ export class ShellExecutionService {
({ exitCode, signal }: { exitCode: number; signal?: number }) => {
exited = true;
abortSignal.removeEventListener('abort', abortHandler);
this.activePtys.delete(ptyProcess.pid);
// Attempt to destroy the PTY to ensure FD is closed
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
@@ -853,31 +972,36 @@ export class ShellExecutionService {
5 * 60 * 1000,
).unref();
this.activePtys.delete(ptyProcess.pid);
this.activeResolvers.delete(ptyProcess.pid);
// eslint-disable-next-line @typescript-eslint/no-floating-promises
ShellExecutionService.cleanupLogStream(ptyProcess.pid).then(
() => {
this.activePtys.delete(ptyProcess.pid);
this.activeResolvers.delete(ptyProcess.pid);
const event: ShellOutputEvent = {
type: 'exit',
exitCode,
signal: signal ?? null,
};
onOutputEvent(event);
ShellExecutionService.emitEvent(ptyProcess.pid, event);
this.activeListeners.delete(ptyProcess.pid);
const event: ShellOutputEvent = {
type: 'exit',
exitCode,
signal: signal ?? null,
};
onOutputEvent(event);
ShellExecutionService.emitEvent(ptyProcess.pid, event);
this.activeListeners.delete(ptyProcess.pid);
const finalBuffer = Buffer.concat(outputChunks);
const finalBuffer = Buffer.concat(outputChunks);
resolve({
rawOutput: finalBuffer,
output: getFullBufferText(headlessTerminal),
exitCode,
signal: signal ?? null,
error,
aborted: abortSignal.aborted,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
pid: ptyProcess.pid,
executionMethod: ptyInfo?.name ?? 'node-pty',
});
resolve({
rawOutput: finalBuffer,
output: getFullBufferText(headlessTerminal),
exitCode,
signal: signal ?? null,
error,
aborted: abortSignal.aborted,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
pid: ptyProcess.pid,
executionMethod: ptyInfo?.name ?? 'node-pty',
});
},
);
};
if (abortSignal.aborted) {
@@ -1050,10 +1174,12 @@ export class ShellExecutionService {
*
* @param pid The process ID to kill.
*/
static kill(pid: number): void {
static async kill(pid: number): Promise<void> {
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
await this.cleanupLogStream(pid);
if (activeChild) {
killProcessGroup({ pid }).catch(() => {});
this.activeChildProcesses.delete(pid);
@@ -1079,44 +1205,53 @@ export class ShellExecutionService {
*/
static background(pid: number): void {
const resolve = this.activeResolvers.get(pid);
if (resolve) {
let output = '';
const rawOutput = Buffer.from('');
if (!resolve) return;
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
const activePty = this.activePtys.get(pid);
const activeChild = this.activeChildProcesses.get(pid);
if (!activePty && !activeChild) return;
const output = activePty
? getFullBufferText(activePty.headlessTerminal)
: (activeChild?.state.output ?? '');
const executionMethod = activePty ? 'node-pty' : 'child_process';
const logPath = this.getLogFilePath(pid);
const logDir = this.getLogDir();
try {
mkdirSync(logDir, { recursive: true });
const stream = fs.createWriteStream(logPath, { flags: 'w' });
stream.on('error', (err) => {
debugLogger.warn('Background log stream error:', err);
});
this.backgroundLogStreams.set(pid, stream);
if (activePty) {
output = getFullBufferText(activePty.headlessTerminal);
resolve({
rawOutput,
output,
exitCode: null,
signal: null,
error: null,
aborted: false,
pid,
executionMethod: 'node-pty',
backgrounded: true,
});
writeBufferToLogStream(activePty.headlessTerminal, stream, 0);
} else if (activeChild) {
output = activeChild.state.output;
resolve({
rawOutput,
output,
exitCode: null,
signal: null,
error: null,
aborted: false,
pid,
executionMethod: 'child_process',
backgrounded: true,
});
if (output) {
stream.write(stripAnsi(output) + '\n');
}
}
this.activeResolvers.delete(pid);
} catch (e) {
debugLogger.warn('Failed to setup background logging:', e);
}
this.backgroundLogPids.add(pid);
resolve({
rawOutput: Buffer.from(''),
output,
exitCode: null,
signal: null,
error: null,
aborted: false,
pid,
executionMethod,
backgrounded: true,
});
this.activeResolvers.delete(pid);
}
static subscribe(