mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-04-24 12:04:56 -07:00
859c7c3a70
fix(cli): Write shell command output to a file and limit memory buffered in UI Fixes. Checkpoint. fix(core, cli): await outputStream.end() to prevent race conditions This commit fixes a critical race condition where was called synchronously without being awaited. This led to potential file truncation or EBUSY errors on Windows when attempting to manipulate the file immediately after the call. Additionally, this change removes fixed wait times (`setTimeout`) that were previously used in test files as a band-aid. fix(core): stream processed xterm output to file to remove spurious escape codes test(core): update shell regression tests to use file_data events
640 lines
17 KiB
TypeScript
640 lines
17 KiB
TypeScript
/**
|
|
* @license
|
|
* Copyright 2025 Google LLC
|
|
* SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
import type { InjectionService } from '../config/injectionService.js';
|
|
import type { AnsiOutput } from '../utils/terminalSerializer.js';
|
|
import { debugLogger } from '../utils/debugLogger.js';
|
|
import { sanitizeOutput } from '../utils/textUtils.js';
|
|
|
|
export type ExecutionMethod =
|
|
| 'lydell-node-pty'
|
|
| 'node-pty'
|
|
| 'child_process'
|
|
| 'remote_agent'
|
|
| 'none';
|
|
|
|
export interface ExecutionResult {
|
|
rawOutput?: Buffer;
|
|
output: string;
|
|
ansiOutput?: AnsiOutput;
|
|
exitCode: number | null;
|
|
signal: number | null;
|
|
error: Error | null;
|
|
aborted: boolean;
|
|
pid: number | undefined;
|
|
executionMethod: ExecutionMethod;
|
|
backgrounded?: boolean;
|
|
}
|
|
|
|
export interface ExecutionHandle {
|
|
pid: number | undefined;
|
|
result: Promise<ExecutionResult>;
|
|
}
|
|
|
|
export type ExecutionOutputEvent =
|
|
| {
|
|
type: 'raw_data';
|
|
chunk: string;
|
|
}
|
|
| {
|
|
type: 'file_data';
|
|
chunk: string;
|
|
}
|
|
| {
|
|
type: 'data';
|
|
chunk: string | AnsiOutput;
|
|
}
|
|
| {
|
|
type: 'binary_detected';
|
|
}
|
|
| {
|
|
type: 'binary_progress';
|
|
bytesReceived: number;
|
|
}
|
|
| {
|
|
type: 'exit';
|
|
exitCode: number | null;
|
|
signal: number | null;
|
|
};
|
|
|
|
export interface ExecutionCompletionOptions {
|
|
exitCode?: number | null;
|
|
signal?: number | null;
|
|
error?: Error | null;
|
|
aborted?: boolean;
|
|
}
|
|
|
|
export interface ExternalExecutionRegistration {
|
|
executionMethod: ExecutionMethod;
|
|
/** Human-readable label for the background task UI (e.g. the command string). */
|
|
label?: string;
|
|
initialOutput?: string;
|
|
getBackgroundOutput?: () => string;
|
|
getSubscriptionSnapshot?: () => string | AnsiOutput | undefined;
|
|
writeInput?: (input: string) => void;
|
|
kill?: () => void;
|
|
isActive?: () => boolean;
|
|
formatInjection?: FormatInjectionFn;
|
|
completionBehavior?: CompletionBehavior;
|
|
}
|
|
|
|
/**
|
|
* Callback that an execution creator provides to control how its output
|
|
* is formatted when reinjected into the model conversation after backgrounding.
|
|
* Return `null` to skip injection entirely.
|
|
*/
|
|
export type FormatInjectionFn = (
|
|
output: string,
|
|
error: Error | null,
|
|
) => string | null;
|
|
|
|
/**
|
|
* Controls what happens when a backgrounded execution completes:
|
|
* - `'inject'` — full formatted output is injected into the conversation; task auto-dismisses from UI.
|
|
* - `'notify'` — a short pointer (e.g. "output saved to /tmp/...") is injected; task auto-dismisses from UI.
|
|
* - `'silent'` — nothing is injected; task stays in the UI until manually dismissed.
|
|
*
|
|
* The distinction between `inject` and `notify` is semantic for now (both inject + dismiss),
|
|
* but enables the system to treat them differently in the future (e.g. LLM-decided injection).
|
|
*/
|
|
export type CompletionBehavior = 'inject' | 'notify' | 'silent';
|
|
|
|
interface ManagedExecutionBase {
|
|
executionMethod: ExecutionMethod;
|
|
label?: string;
|
|
output: string;
|
|
backgrounded?: boolean;
|
|
formatInjection?: FormatInjectionFn;
|
|
completionBehavior?: CompletionBehavior;
|
|
getBackgroundOutput?: () => string;
|
|
getSubscriptionSnapshot?: () => string | AnsiOutput | undefined;
|
|
}
|
|
|
|
/**
|
|
* Payload emitted when an execution is moved to the background.
|
|
*/
|
|
export interface BackgroundStartInfo {
|
|
executionId: number;
|
|
executionMethod: ExecutionMethod;
|
|
label: string;
|
|
output: string;
|
|
completionBehavior: CompletionBehavior;
|
|
}
|
|
|
|
export type BackgroundStartListener = (info: BackgroundStartInfo) => void;
|
|
|
|
/**
|
|
* Payload emitted when a previously-backgrounded execution settles.
|
|
*/
|
|
export interface BackgroundCompletionInfo {
|
|
executionId: number;
|
|
executionMethod: ExecutionMethod;
|
|
output: string;
|
|
error: Error | null;
|
|
/** Pre-formatted injection text from the execution creator, or `null` if skipped. */
|
|
injectionText: string | null;
|
|
completionBehavior: CompletionBehavior;
|
|
}
|
|
|
|
export type BackgroundCompletionListener = (
|
|
info: BackgroundCompletionInfo,
|
|
) => void;
|
|
|
|
interface VirtualExecutionState extends ManagedExecutionBase {
|
|
kind: 'virtual';
|
|
onKill?: () => void;
|
|
}
|
|
|
|
interface ExternalExecutionState extends ManagedExecutionBase {
|
|
kind: 'external';
|
|
writeInput?: (input: string) => void;
|
|
kill?: () => void;
|
|
isActive?: () => boolean;
|
|
}
|
|
|
|
type ManagedExecutionState = VirtualExecutionState | ExternalExecutionState;
|
|
|
|
const NON_PROCESS_EXECUTION_ID_START = 2_000_000_000;
|
|
|
|
/**
|
|
* Central owner for execution backgrounding lifecycle across shell and tools.
|
|
*/
|
|
export class ExecutionLifecycleService {
|
|
private static readonly EXIT_INFO_TTL_MS = 5 * 60 * 1000;
|
|
private static nextExecutionId = NON_PROCESS_EXECUTION_ID_START;
|
|
private static injectionService: InjectionService | null = null;
|
|
|
|
/**
|
|
* Connects the lifecycle service to the injection service so that
|
|
* backgrounded executions are reinjected into the model conversation
|
|
* directly from the backend — no UI hop needed.
|
|
*/
|
|
static setInjectionService(service: InjectionService): void {
|
|
this.injectionService = service;
|
|
}
|
|
|
|
private static activeExecutions = new Map<number, ManagedExecutionState>();
|
|
private static activeResolvers = new Map<
|
|
number,
|
|
(result: ExecutionResult) => void
|
|
>();
|
|
private static activeListeners = new Map<
|
|
number,
|
|
Set<(event: ExecutionOutputEvent) => void>
|
|
>();
|
|
private static exitedExecutionInfo = new Map<
|
|
number,
|
|
{ exitCode: number; signal?: number }
|
|
>();
|
|
private static backgroundCompletionListeners =
|
|
new Set<BackgroundCompletionListener>();
|
|
|
|
private static backgroundStartListeners = new Set<BackgroundStartListener>();
|
|
|
|
/**
|
|
* Registers a listener that fires when any execution is moved to the background.
|
|
* This is the hook for the UI to automatically discover backgrounded executions.
|
|
*/
|
|
static onBackground(listener: BackgroundStartListener): void {
|
|
this.backgroundStartListeners.add(listener);
|
|
}
|
|
|
|
/**
|
|
* Unregisters a background start listener.
|
|
*/
|
|
static offBackground(listener: BackgroundStartListener): void {
|
|
this.backgroundStartListeners.delete(listener);
|
|
}
|
|
|
|
/**
|
|
* Registers a listener that fires when a previously-backgrounded
|
|
* execution settles (completes or errors).
|
|
*/
|
|
static onBackgroundComplete(listener: BackgroundCompletionListener): void {
|
|
this.backgroundCompletionListeners.add(listener);
|
|
}
|
|
|
|
/**
|
|
* Unregisters a background completion listener.
|
|
*/
|
|
static offBackgroundComplete(listener: BackgroundCompletionListener): void {
|
|
this.backgroundCompletionListeners.delete(listener);
|
|
}
|
|
|
|
private static storeExitInfo(
|
|
executionId: number,
|
|
exitCode: number,
|
|
signal?: number,
|
|
): void {
|
|
this.exitedExecutionInfo.set(executionId, {
|
|
exitCode,
|
|
signal,
|
|
});
|
|
setTimeout(() => {
|
|
this.exitedExecutionInfo.delete(executionId);
|
|
}, this.EXIT_INFO_TTL_MS).unref();
|
|
}
|
|
|
|
private static allocateExecutionId(): number {
|
|
let executionId = ++this.nextExecutionId;
|
|
while (this.activeExecutions.has(executionId)) {
|
|
executionId = ++this.nextExecutionId;
|
|
}
|
|
return executionId;
|
|
}
|
|
|
|
private static createPendingResult(
|
|
executionId: number,
|
|
): Promise<ExecutionResult> {
|
|
return new Promise<ExecutionResult>((resolve) => {
|
|
this.activeResolvers.set(executionId, resolve);
|
|
});
|
|
}
|
|
|
|
private static createAbortedResult(
|
|
executionId: number,
|
|
execution: ManagedExecutionState,
|
|
): ExecutionResult {
|
|
const output = execution.getBackgroundOutput?.() ?? execution.output;
|
|
return {
|
|
rawOutput: Buffer.from(output, 'utf8'),
|
|
output,
|
|
exitCode: 130,
|
|
signal: null,
|
|
error: new Error('Operation cancelled by user.'),
|
|
aborted: true,
|
|
pid: executionId,
|
|
executionMethod: execution.executionMethod,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Resets lifecycle state for isolated unit tests.
|
|
*/
|
|
static resetForTest(): void {
|
|
this.activeExecutions.clear();
|
|
this.activeResolvers.clear();
|
|
this.activeListeners.clear();
|
|
this.exitedExecutionInfo.clear();
|
|
this.backgroundCompletionListeners.clear();
|
|
this.injectionService = null;
|
|
this.backgroundStartListeners.clear();
|
|
this.nextExecutionId = NON_PROCESS_EXECUTION_ID_START;
|
|
}
|
|
|
|
static attachExecution(
|
|
executionId: number,
|
|
registration: ExternalExecutionRegistration,
|
|
): ExecutionHandle {
|
|
if (
|
|
this.activeExecutions.has(executionId) ||
|
|
this.activeResolvers.has(executionId)
|
|
) {
|
|
throw new Error(`Execution ${executionId} is already attached.`);
|
|
}
|
|
this.exitedExecutionInfo.delete(executionId);
|
|
|
|
this.activeExecutions.set(executionId, {
|
|
executionMethod: registration.executionMethod,
|
|
label: registration.label,
|
|
output: registration.initialOutput ?? '',
|
|
kind: 'external',
|
|
getBackgroundOutput: registration.getBackgroundOutput,
|
|
getSubscriptionSnapshot: registration.getSubscriptionSnapshot,
|
|
writeInput: registration.writeInput,
|
|
kill: registration.kill,
|
|
isActive: registration.isActive,
|
|
formatInjection: registration.formatInjection,
|
|
completionBehavior: registration.completionBehavior,
|
|
});
|
|
|
|
return {
|
|
pid: executionId,
|
|
result: this.createPendingResult(executionId),
|
|
};
|
|
}
|
|
|
|
static createExecution(
|
|
initialOutput = '',
|
|
onKill?: () => void,
|
|
executionMethod: ExecutionMethod = 'none',
|
|
formatInjection?: FormatInjectionFn,
|
|
label?: string,
|
|
completionBehavior?: CompletionBehavior,
|
|
): ExecutionHandle {
|
|
const executionId = this.allocateExecutionId();
|
|
|
|
this.activeExecutions.set(executionId, {
|
|
executionMethod,
|
|
label,
|
|
output: initialOutput,
|
|
kind: 'virtual',
|
|
onKill,
|
|
formatInjection,
|
|
completionBehavior,
|
|
getBackgroundOutput: () => {
|
|
const state = this.activeExecutions.get(executionId);
|
|
return state?.output ?? initialOutput;
|
|
},
|
|
getSubscriptionSnapshot: () => {
|
|
const state = this.activeExecutions.get(executionId);
|
|
return state?.output ?? initialOutput;
|
|
},
|
|
});
|
|
|
|
return {
|
|
pid: executionId,
|
|
result: this.createPendingResult(executionId),
|
|
};
|
|
}
|
|
|
|
static appendOutput(executionId: number, chunk: string): void {
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (!execution || chunk.length === 0) {
|
|
return;
|
|
}
|
|
|
|
execution.output += chunk;
|
|
this.emitEvent(executionId, { type: 'data', chunk });
|
|
}
|
|
|
|
static emitEvent(executionId: number, event: ExecutionOutputEvent): void {
|
|
const listeners = this.activeListeners.get(executionId);
|
|
if (listeners) {
|
|
listeners.forEach((listener) => listener(event));
|
|
}
|
|
}
|
|
|
|
private static resolvePending(
|
|
executionId: number,
|
|
result: ExecutionResult,
|
|
): void {
|
|
const resolve = this.activeResolvers.get(executionId);
|
|
if (!resolve) {
|
|
return;
|
|
}
|
|
|
|
resolve(result);
|
|
this.activeResolvers.delete(executionId);
|
|
}
|
|
|
|
private static settleExecution(
|
|
executionId: number,
|
|
result: ExecutionResult,
|
|
): void {
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (!execution) {
|
|
return;
|
|
}
|
|
|
|
// Fire background completion listeners if this was a backgrounded execution.
|
|
if (execution.backgrounded && !result.aborted) {
|
|
const behavior =
|
|
execution.completionBehavior ??
|
|
(execution.formatInjection ? 'inject' : 'silent');
|
|
const rawInjection =
|
|
behavior !== 'silent' && execution.formatInjection
|
|
? execution.formatInjection(result.output, result.error)
|
|
: null;
|
|
|
|
const injectionText = rawInjection ? sanitizeOutput(rawInjection) : null;
|
|
|
|
// Inject directly into the model conversation from the backend.
|
|
if (injectionText && this.injectionService) {
|
|
this.injectionService.addInjection(
|
|
injectionText,
|
|
'background_completion',
|
|
);
|
|
}
|
|
|
|
const info: BackgroundCompletionInfo = {
|
|
executionId,
|
|
executionMethod: execution.executionMethod,
|
|
output: result.output,
|
|
error: result.error,
|
|
injectionText,
|
|
completionBehavior: behavior,
|
|
};
|
|
|
|
for (const listener of this.backgroundCompletionListeners) {
|
|
try {
|
|
listener(info);
|
|
} catch (error) {
|
|
debugLogger.warn(`Background completion listener failed: ${error}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
this.resolvePending(executionId, result);
|
|
this.emitEvent(executionId, {
|
|
type: 'exit',
|
|
exitCode: result.exitCode,
|
|
signal: result.signal,
|
|
});
|
|
|
|
this.activeListeners.delete(executionId);
|
|
this.activeExecutions.delete(executionId);
|
|
this.storeExitInfo(
|
|
executionId,
|
|
result.exitCode ?? 0,
|
|
result.signal ?? undefined,
|
|
);
|
|
}
|
|
|
|
static completeExecution(
|
|
executionId: number,
|
|
options?: ExecutionCompletionOptions,
|
|
): void {
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (!execution) {
|
|
return;
|
|
}
|
|
|
|
const {
|
|
error = null,
|
|
aborted = false,
|
|
exitCode = error ? 1 : 0,
|
|
signal = null,
|
|
} = options ?? {};
|
|
|
|
const output = execution.getBackgroundOutput?.() ?? execution.output;
|
|
const snapshot = execution.getSubscriptionSnapshot?.();
|
|
const ansiOutput = Array.isArray(snapshot) ? snapshot : undefined;
|
|
|
|
this.settleExecution(executionId, {
|
|
rawOutput: Buffer.from(output, 'utf8'),
|
|
output,
|
|
ansiOutput,
|
|
exitCode,
|
|
signal,
|
|
error,
|
|
aborted,
|
|
pid: executionId,
|
|
executionMethod: execution.executionMethod,
|
|
});
|
|
}
|
|
|
|
static completeWithResult(
|
|
executionId: number,
|
|
result: ExecutionResult,
|
|
): void {
|
|
this.settleExecution(executionId, result);
|
|
}
|
|
|
|
static background(executionId: number): void {
|
|
const resolve = this.activeResolvers.get(executionId);
|
|
if (!resolve) {
|
|
return;
|
|
}
|
|
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (!execution) {
|
|
return;
|
|
}
|
|
|
|
const output = execution.getBackgroundOutput?.() ?? execution.output;
|
|
|
|
resolve({
|
|
rawOutput: Buffer.from(''),
|
|
output,
|
|
exitCode: null,
|
|
signal: null,
|
|
error: null,
|
|
aborted: false,
|
|
pid: executionId,
|
|
executionMethod: execution.executionMethod,
|
|
backgrounded: true,
|
|
});
|
|
|
|
this.activeResolvers.delete(executionId);
|
|
execution.backgrounded = true;
|
|
|
|
// Notify listeners that an execution was moved to the background.
|
|
const info: BackgroundStartInfo = {
|
|
executionId,
|
|
executionMethod: execution.executionMethod,
|
|
label:
|
|
execution.label ?? `${execution.executionMethod} (ID: ${executionId})`,
|
|
output,
|
|
completionBehavior:
|
|
execution.completionBehavior ??
|
|
(execution.formatInjection ? 'inject' : 'silent'),
|
|
};
|
|
for (const listener of this.backgroundStartListeners) {
|
|
listener(info);
|
|
}
|
|
}
|
|
|
|
static subscribe(
|
|
executionId: number,
|
|
listener: (event: ExecutionOutputEvent) => void,
|
|
): () => void {
|
|
if (!this.activeListeners.has(executionId)) {
|
|
this.activeListeners.set(executionId, new Set());
|
|
}
|
|
this.activeListeners.get(executionId)?.add(listener);
|
|
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (execution) {
|
|
const snapshot =
|
|
execution.getSubscriptionSnapshot?.() ??
|
|
(execution.output.length > 0 ? execution.output : undefined);
|
|
if (snapshot && (typeof snapshot !== 'string' || snapshot.length > 0)) {
|
|
listener({ type: 'data', chunk: snapshot });
|
|
}
|
|
}
|
|
|
|
return () => {
|
|
this.activeListeners.get(executionId)?.delete(listener);
|
|
if (this.activeListeners.get(executionId)?.size === 0) {
|
|
this.activeListeners.delete(executionId);
|
|
}
|
|
};
|
|
}
|
|
|
|
static onExit(
|
|
executionId: number,
|
|
callback: (exitCode: number, signal?: number) => void,
|
|
): () => void {
|
|
if (this.activeExecutions.has(executionId)) {
|
|
const listener = (event: ExecutionOutputEvent) => {
|
|
if (event.type === 'exit') {
|
|
callback(event.exitCode ?? 0, event.signal ?? undefined);
|
|
unsubscribe();
|
|
}
|
|
};
|
|
const unsubscribe = this.subscribe(executionId, listener);
|
|
return unsubscribe;
|
|
}
|
|
|
|
const exitedInfo = this.exitedExecutionInfo.get(executionId);
|
|
if (exitedInfo) {
|
|
callback(exitedInfo.exitCode, exitedInfo.signal);
|
|
}
|
|
|
|
return () => {};
|
|
}
|
|
|
|
static kill(executionId: number): void {
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (!execution) {
|
|
return;
|
|
}
|
|
|
|
if (execution.kind === 'virtual') {
|
|
execution.onKill?.();
|
|
}
|
|
|
|
if (execution.kind === 'external') {
|
|
execution.kill?.();
|
|
}
|
|
|
|
this.completeWithResult(
|
|
executionId,
|
|
this.createAbortedResult(executionId, execution),
|
|
);
|
|
}
|
|
|
|
static isActive(executionId: number): boolean {
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (!execution) {
|
|
if (executionId >= NON_PROCESS_EXECUTION_ID_START) {
|
|
return false;
|
|
}
|
|
try {
|
|
return process.kill(executionId, 0);
|
|
} catch {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if (execution.kind === 'virtual') {
|
|
return true;
|
|
}
|
|
|
|
if (execution.kind === 'external' && execution.isActive) {
|
|
try {
|
|
return execution.isActive();
|
|
} catch {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
try {
|
|
return process.kill(executionId, 0);
|
|
} catch {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
static writeInput(executionId: number, input: string): void {
|
|
const execution = this.activeExecutions.get(executionId);
|
|
if (execution?.kind === 'external') {
|
|
execution.writeInput?.(input);
|
|
}
|
|
}
|
|
}
|