fix(core): stream grep/ripgrep output to prevent OOM (#17146)

This commit is contained in:
Adam Weidman
2026-01-26 16:52:19 -05:00
committed by GitHub
parent c2d0783965
commit 018dc0d5cf
7 changed files with 888 additions and 994 deletions

View File

@@ -0,0 +1,67 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect } from 'vitest';
import { execStreaming } from './shell-utils.js';
// Integration tests using real child processes
describe('execStreaming (Integration)', () => {
it('should yield lines from stdout', async () => {
// Use node to echo for cross-platform support
const generator = execStreaming(process.execPath, [
'-e',
'console.log("line 1\\nline 2")',
]);
const lines = [];
for await (const line of generator) {
lines.push(line);
}
expect(lines).toEqual(['line 1', 'line 2']);
});
it('should throw error on non-zero exit code', async () => {
// exit 2 via node
const generator = execStreaming(process.execPath, [
'-e',
'process.exit(2)',
]);
await expect(async () => {
for await (const _ of generator) {
// consume
}
}).rejects.toThrow();
});
it('should abort cleanly when signal is aborted', async () => {
const controller = new AbortController();
// sleep for 2s via node
const generator = execStreaming(
process.execPath,
['-e', 'setTimeout(() => {}, 2000)'],
{ signal: controller.signal },
);
// Start reading
const readPromise = (async () => {
const lines = [];
try {
for await (const line of generator) {
lines.push(line);
}
} catch (_e) {
// ignore
}
return lines;
})();
setTimeout(() => {
controller.abort();
}, 100);
const lines = await readPromise;
expect(lines).toEqual([]);
});
});

View File

@@ -13,6 +13,7 @@ import {
spawnSync,
type SpawnOptionsWithoutStdio,
} from 'node:child_process';
import * as readline from 'node:readline';
import type { Node, Tree } from 'web-tree-sitter';
import { Language, Parser, Query } from 'web-tree-sitter';
import { loadWasmBinary } from './fileUtils.js';
@@ -765,3 +766,123 @@ export const spawnAsync = (
reject(err);
});
});
/**
* Executes a command and yields lines of output as they appear.
* Use for large outputs where buffering is not feasible.
*
* @param command The executable to run
* @param args Arguments for the executable
* @param options Spawn options (cwd, env, etc.)
*/
export async function* execStreaming(
command: string,
args: string[],
options?: SpawnOptionsWithoutStdio & {
signal?: AbortSignal;
allowedExitCodes?: number[];
},
): AsyncGenerator<string, void, void> {
const child = spawn(command, args, {
...options,
// ensure we don't open a window on windows if possible/relevant
windowsHide: true,
});
const rl = readline.createInterface({
input: child.stdout,
terminal: false,
});
const errorChunks: Buffer[] = [];
let stderrTotalBytes = 0;
const MAX_STDERR_BYTES = 20 * 1024; // 20KB limit
child.stderr.on('data', (chunk) => {
if (stderrTotalBytes < MAX_STDERR_BYTES) {
errorChunks.push(chunk);
stderrTotalBytes += chunk.length;
}
});
let error: Error | null = null;
child.on('error', (err) => {
error = err;
});
const onAbort = () => {
// If manually aborted by signal, we kill immediately.
if (!child.killed) child.kill();
};
if (options?.signal?.aborted) {
onAbort();
} else {
options?.signal?.addEventListener('abort', onAbort);
}
let finished = false;
try {
for await (const line of rl) {
if (options?.signal?.aborted) break;
yield line;
}
finished = true;
} finally {
rl.close();
options?.signal?.removeEventListener('abort', onAbort);
// Ensure process is killed when the generator is closed (consumer breaks loop)
let killedByGenerator = false;
if (!finished && child.exitCode === null && !child.killed) {
try {
child.kill();
} catch (_e) {
// ignore error if process is already dead
}
killedByGenerator = true;
}
// Ensure we wait for the process to exit to check codes
await new Promise<void>((resolve, reject) => {
// If an error occurred before we got here (e.g. spawn failure), reject immediately.
if (error) {
reject(error);
return;
}
function checkExit(code: number | null) {
// If we aborted or killed it manually, we treat it as success (stop waiting)
if (options?.signal?.aborted || killedByGenerator) {
resolve();
return;
}
const allowed = options?.allowedExitCodes ?? [0];
if (code !== null && allowed.includes(code)) {
resolve();
} else {
// If we have an accumulated error or explicit error event
if (error) reject(error);
else {
const stderr = Buffer.concat(errorChunks).toString('utf8');
const truncatedMsg =
stderrTotalBytes >= MAX_STDERR_BYTES ? '...[truncated]' : '';
reject(
new Error(
`Process exited with code ${code}: ${stderr}${truncatedMsg}`,
),
);
}
}
}
if (child.exitCode !== null) {
checkExit(child.exitCode);
} else {
child.on('close', (code) => checkExit(code));
child.on('error', (err) => reject(err));
}
});
}
}