feat(cli): add WebSocket-based network logging and streaming chunk support (#18383)

This commit is contained in:
Sandy Tao
2026-02-06 16:20:22 -08:00
committed by GitHub
parent e3796d137a
commit 7409ce5df6
7 changed files with 402 additions and 78 deletions
+1 -1
View File
@@ -125,7 +125,7 @@ export function evalTest(policy: EvalPolicy, evalCase: EvalCase) {
approvalMode: evalCase.approvalMode ?? 'yolo', approvalMode: evalCase.approvalMode ?? 'yolo',
timeout: evalCase.timeout, timeout: evalCase.timeout,
env: { env: {
GEMINI_CLI_ACTIVITY_LOG_FILE: activityLogFile, GEMINI_CLI_ACTIVITY_LOG_TARGET: activityLogFile,
}, },
}); });
+12
View File
@@ -4352,6 +4352,16 @@
"boxen": "^7.1.1" "boxen": "^7.1.1"
} }
}, },
"node_modules/@types/ws": {
"version": "8.18.1",
"resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz",
"integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/yargs": { "node_modules/@types/yargs": {
"version": "17.0.33", "version": "17.0.33",
"resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.33.tgz", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.33.tgz",
@@ -18161,6 +18171,7 @@
"tinygradient": "^1.1.5", "tinygradient": "^1.1.5",
"undici": "^7.10.0", "undici": "^7.10.0",
"wrap-ansi": "9.0.2", "wrap-ansi": "9.0.2",
"ws": "^8.16.0",
"yargs": "^17.7.2", "yargs": "^17.7.2",
"zod": "^3.23.8" "zod": "^3.23.8"
}, },
@@ -18179,6 +18190,7 @@
"@types/semver": "^7.7.0", "@types/semver": "^7.7.0",
"@types/shell-quote": "^1.7.5", "@types/shell-quote": "^1.7.5",
"@types/tar": "^6.1.13", "@types/tar": "^6.1.13",
"@types/ws": "^8.5.10",
"@types/yargs": "^17.0.32", "@types/yargs": "^17.0.32",
"archiver": "^7.0.1", "archiver": "^7.0.1",
"ink-testing-library": "^4.0.0", "ink-testing-library": "^4.0.0",
+2
View File
@@ -65,6 +65,7 @@
"tinygradient": "^1.1.5", "tinygradient": "^1.1.5",
"undici": "^7.10.0", "undici": "^7.10.0",
"wrap-ansi": "9.0.2", "wrap-ansi": "9.0.2",
"ws": "^8.16.0",
"yargs": "^17.7.2", "yargs": "^17.7.2",
"zod": "^3.23.8" "zod": "^3.23.8"
}, },
@@ -80,6 +81,7 @@
"@types/semver": "^7.7.0", "@types/semver": "^7.7.0",
"@types/shell-quote": "^1.7.5", "@types/shell-quote": "^1.7.5",
"@types/tar": "^6.1.13", "@types/tar": "^6.1.13",
"@types/ws": "^8.5.10",
"@types/yargs": "^17.0.32", "@types/yargs": "^17.0.32",
"archiver": "^7.0.1", "archiver": "^7.0.1",
"ink-testing-library": "^4.0.0", "ink-testing-library": "^4.0.0",
+1 -1
View File
@@ -518,7 +518,7 @@ export async function main() {
adminControlsListner.setConfig(config); adminControlsListner.setConfig(config);
if (config.isInteractive() && config.storage && config.getDebugMode()) { if (config.isInteractive() && config.getDebugMode()) {
const { registerActivityLogger } = await import( const { registerActivityLogger } = await import(
'./utils/activityLogger.js' './utils/activityLogger.js'
); );
+4 -4
View File
@@ -267,8 +267,8 @@ describe('runNonInteractive', () => {
// so we no longer expect shutdownTelemetry to be called directly here // so we no longer expect shutdownTelemetry to be called directly here
}); });
it('should register activity logger when GEMINI_CLI_ACTIVITY_LOG_FILE is set', async () => { it('should register activity logger when GEMINI_CLI_ACTIVITY_LOG_TARGET is set', async () => {
vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_FILE', '/tmp/test.jsonl'); vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_TARGET', '/tmp/test.jsonl');
const events: ServerGeminiStreamEvent[] = [ const events: ServerGeminiStreamEvent[] = [
{ {
type: GeminiEventType.Finished, type: GeminiEventType.Finished,
@@ -290,8 +290,8 @@ describe('runNonInteractive', () => {
vi.unstubAllEnvs(); vi.unstubAllEnvs();
}); });
it('should not register activity logger when GEMINI_CLI_ACTIVITY_LOG_FILE is not set', async () => { it('should not register activity logger when GEMINI_CLI_ACTIVITY_LOG_TARGET is not set', async () => {
vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_FILE', ''); vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_TARGET', '');
const events: ServerGeminiStreamEvent[] = [ const events: ServerGeminiStreamEvent[] = [
{ {
type: GeminiEventType.Finished, type: GeminiEventType.Finished,
+1 -1
View File
@@ -71,7 +71,7 @@ export async function runNonInteractive({
}, },
}); });
if (config.storage && process.env['GEMINI_CLI_ACTIVITY_LOG_FILE']) { if (process.env['GEMINI_CLI_ACTIVITY_LOG_TARGET']) {
const { registerActivityLogger } = await import( const { registerActivityLogger } = await import(
'./utils/activityLogger.js' './utils/activityLogger.js'
); );
+381 -71
View File
@@ -16,8 +16,33 @@ import path from 'node:path';
import { EventEmitter } from 'node:events'; import { EventEmitter } from 'node:events';
import { CoreEvent, coreEvents, debugLogger } from '@google/gemini-cli-core'; import { CoreEvent, coreEvents, debugLogger } from '@google/gemini-cli-core';
import type { Config } from '@google/gemini-cli-core'; import type { Config } from '@google/gemini-cli-core';
import WebSocket from 'ws';
const ACTIVITY_ID_HEADER = 'x-activity-request-id'; const ACTIVITY_ID_HEADER = 'x-activity-request-id';
const MAX_BUFFER_SIZE = 100;
/**
* Parse a host:port string into its components.
* Uses the URL constructor for robust handling of IPv4, IPv6, and hostnames.
* Returns null for file paths or values without a valid port.
*/
function parseHostPort(value: string): { host: string; port: number } | null {
if (value.startsWith('/') || value.startsWith('.')) return null;
try {
const url = new URL(`ws://${value}`);
if (!url.port) return null;
const port = parseInt(url.port, 10);
if (url.hostname && !isNaN(port) && port > 0 && port <= 65535) {
return { host: url.hostname, port };
}
} catch {
// Not a valid host:port
}
return null;
}
export interface NetworkLog { export interface NetworkLog {
id: string; id: string;
@@ -27,6 +52,11 @@ export interface NetworkLog {
headers: Record<string, string>; headers: Record<string, string>;
body?: string; body?: string;
pending?: boolean; pending?: boolean;
chunk?: {
index: number;
data: string;
timestamp: number;
};
response?: { response?: {
status: number; status: number;
headers: Record<string, string>; headers: Record<string, string>;
@@ -44,6 +74,7 @@ export class ActivityLogger extends EventEmitter {
private static instance: ActivityLogger; private static instance: ActivityLogger;
private isInterceptionEnabled = false; private isInterceptionEnabled = false;
private requestStartTimes = new Map<string, number>(); private requestStartTimes = new Map<string, number>();
private networkLoggingEnabled = false;
static getInstance(): ActivityLogger { static getInstance(): ActivityLogger {
if (!ActivityLogger.instance) { if (!ActivityLogger.instance) {
@@ -52,6 +83,19 @@ export class ActivityLogger extends EventEmitter {
return ActivityLogger.instance; return ActivityLogger.instance;
} }
enableNetworkLogging() {
this.networkLoggingEnabled = true;
this.emit('network-logging-enabled');
}
disableNetworkLogging() {
this.networkLoggingEnabled = false;
}
isNetworkLoggingEnabled(): boolean {
return this.networkLoggingEnabled;
}
private stringifyHeaders(headers: unknown): Record<string, string> { private stringifyHeaders(headers: unknown): Record<string, string> {
const result: Record<string, string> = {}; const result: Record<string, string> = {};
if (!headers) return result; if (!headers) return result;
@@ -127,7 +171,8 @@ export class ActivityLogger extends EventEmitter {
: input instanceof URL : input instanceof URL
? input.toString() ? input.toString()
: (input as any).url; : (input as any).url;
if (url.includes('127.0.0.1')) return originalFetch(input, init); if (url.includes('127.0.0.1') || url.includes('localhost'))
return originalFetch(input, init);
const id = Math.random().toString(36).substring(7); const id = Math.random().toString(36).substring(7);
const method = (init?.method || 'GET').toUpperCase(); const method = (init?.method || 'GET').toUpperCase();
@@ -159,32 +204,89 @@ export class ActivityLogger extends EventEmitter {
const response = await originalFetch(input, newInit); const response = await originalFetch(input, newInit);
const clonedRes = response.clone(); const clonedRes = response.clone();
clonedRes // Stream chunks if body is available
.text() if (clonedRes.body) {
.then((text) => { const reader = clonedRes.body.getReader();
const startTime = this.requestStartTimes.get(id); const decoder = new TextDecoder();
const durationMs = startTime ? Date.now() - startTime : 0; const chunks: string[] = [];
this.requestStartTimes.delete(id); let chunkIndex = 0;
this.safeEmitNetwork({ const readStream = async () => {
id, try {
pending: false, while (true) {
response: { const { done, value } = await reader.read();
status: response.status, if (done) break;
headers: this.stringifyHeaders(response.headers),
body: text, const chunkData = decoder.decode(value, { stream: true });
durationMs, chunks.push(chunkData);
},
// Emit chunk update
this.safeEmitNetwork({
id,
pending: true,
chunk: {
index: chunkIndex++,
data: chunkData,
timestamp: Date.now(),
},
});
}
// Final update with complete response
const startTime = this.requestStartTimes.get(id);
const durationMs = startTime ? Date.now() - startTime : 0;
this.requestStartTimes.delete(id);
this.safeEmitNetwork({
id,
pending: false,
response: {
status: response.status,
headers: this.stringifyHeaders(response.headers),
body: chunks.join(''),
durationMs,
},
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.safeEmitNetwork({
id,
pending: false,
error: `Failed to read response body: ${message}`,
});
}
};
void readStream();
} else {
// Fallback for responses without body stream
clonedRes
.text()
.then((text) => {
const startTime = this.requestStartTimes.get(id);
const durationMs = startTime ? Date.now() - startTime : 0;
this.requestStartTimes.delete(id);
this.safeEmitNetwork({
id,
pending: false,
response: {
status: response.status,
headers: this.stringifyHeaders(response.headers),
body: text,
durationMs,
},
});
})
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
this.safeEmitNetwork({
id,
pending: false,
error: `Failed to read response body: ${message}`,
});
}); });
}) }
.catch((err) => {
const message = err instanceof Error ? err.message : String(err);
this.safeEmitNetwork({
id,
pending: false,
error: `Failed to read response body: ${message}`,
});
});
return response; return response;
} catch (err: unknown) { } catch (err: unknown) {
@@ -209,7 +311,8 @@ export class ActivityLogger extends EventEmitter {
: options.href || : options.href ||
`${protocol}//${options.hostname || options.host || 'localhost'}${options.path || '/'}`; `${protocol}//${options.hostname || options.host || 'localhost'}${options.path || '/'}`;
if (url.includes('127.0.0.1')) return originalFn.apply(http, args); if (url.includes('127.0.0.1') || url.includes('localhost'))
return originalFn.apply(http, args);
const headers = const headers =
typeof options === 'object' && typeof options !== 'function' typeof options === 'object' && typeof options !== 'function'
@@ -263,9 +366,24 @@ export class ActivityLogger extends EventEmitter {
req.on('response', (res: any) => { req.on('response', (res: any) => {
const responseChunks: Buffer[] = []; const responseChunks: Buffer[] = [];
res.on('data', (chunk: Buffer) => let chunkIndex = 0;
responseChunks.push(Buffer.from(chunk)),
); res.on('data', (chunk: Buffer) => {
const chunkBuffer = Buffer.from(chunk);
responseChunks.push(chunkBuffer);
// Emit chunk update for streaming
self.safeEmitNetwork({
id,
pending: true,
chunk: {
index: chunkIndex++,
data: chunkBuffer.toString('utf8'),
timestamp: Date.now(),
},
});
});
res.on('end', () => { res.on('end', () => {
const buffer = Buffer.concat(responseChunks); const buffer = Buffer.concat(responseChunks);
const encoding = res.headers['content-encoding']; const encoding = res.headers['content-encoding'];
@@ -323,53 +441,245 @@ export class ActivityLogger extends EventEmitter {
} }
/** /**
* Registers the activity logger. * Setup file-based logging to JSONL
* Captures network and console logs to a session-specific JSONL file. */
function setupFileLogging(
capture: ActivityLogger,
config: Config,
customPath?: string,
) {
const logFile =
customPath ||
(config.storage
? path.join(
config.storage.getProjectTempLogsDir(),
`session-${config.getSessionId()}.jsonl`,
)
: null);
if (!logFile) return;
const logsDir = path.dirname(logFile);
if (!fs.existsSync(logsDir)) {
fs.mkdirSync(logsDir, { recursive: true });
}
const writeToLog = (type: 'console' | 'network', payload: unknown) => {
try {
const entry =
JSON.stringify({
type,
payload,
sessionId: config.getSessionId(),
timestamp: Date.now(),
}) + '\n';
fs.promises.appendFile(logFile, entry).catch((err) => {
debugLogger.error('Failed to write to activity log:', err);
});
} catch (err) {
debugLogger.error('Failed to prepare activity log entry:', err);
}
};
capture.on('console', (payload) => writeToLog('console', payload));
capture.on('network', (payload) => writeToLog('network', payload));
}
/**
* Setup network-based logging via WebSocket
*/
function setupNetworkLogging(
capture: ActivityLogger,
host: string,
port: number,
config: Config,
) {
const buffer: Array<Record<string, unknown>> = [];
let ws: WebSocket | null = null;
let reconnectTimer: NodeJS.Timeout | null = null;
let sessionId: string | null = null;
let pingInterval: NodeJS.Timeout | null = null;
const connect = () => {
try {
ws = new WebSocket(`ws://${host}:${port}/ws`);
ws.on('open', () => {
debugLogger.debug(`WebSocket connected to ${host}:${port}`);
// Register with CLI's session ID
sendMessage({
type: 'register',
sessionId: config.getSessionId(),
timestamp: Date.now(),
});
});
ws.on('message', (data: Buffer) => {
try {
const message = JSON.parse(data.toString());
handleServerMessage(message);
} catch (err) {
debugLogger.debug('Invalid WebSocket message:', err);
}
});
ws.on('close', () => {
debugLogger.debug(`WebSocket disconnected from ${host}:${port}`);
cleanup();
scheduleReconnect();
});
ws.on('error', (err) => {
debugLogger.debug(`WebSocket error:`, err);
});
} catch (err) {
debugLogger.debug(`Failed to connect WebSocket:`, err);
scheduleReconnect();
}
};
const handleServerMessage = (message: any) => {
switch (message.type) {
case 'registered':
sessionId = message.sessionId;
debugLogger.debug(`WebSocket session registered: ${sessionId}`);
// Start ping interval
if (pingInterval) clearInterval(pingInterval);
pingInterval = setInterval(() => {
sendMessage({ type: 'pong', timestamp: Date.now() });
}, 15000);
// Flush buffered logs
flushBuffer();
break;
case 'ping':
sendMessage({ type: 'pong', timestamp: Date.now() });
break;
default:
// Ignore unknown message types
break;
}
};
const sendMessage = (message: any) => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
};
const sendToNetwork = (type: 'console' | 'network', payload: unknown) => {
const message = {
type,
payload,
sessionId: sessionId || config.getSessionId(),
timestamp: Date.now(),
};
// If not connected or network logging not enabled, buffer
if (
!ws ||
ws.readyState !== WebSocket.OPEN ||
!capture.isNetworkLoggingEnabled()
) {
buffer.push(message);
if (buffer.length > MAX_BUFFER_SIZE) buffer.shift();
return;
}
sendMessage(message);
};
const flushBuffer = () => {
if (
!ws ||
ws.readyState !== WebSocket.OPEN ||
!capture.isNetworkLoggingEnabled()
) {
return;
}
debugLogger.debug(`Flushing ${buffer.length} buffered logs...`);
while (buffer.length > 0) {
const message = buffer.shift()!;
sendMessage(message);
}
};
const cleanup = () => {
if (pingInterval) {
clearInterval(pingInterval);
pingInterval = null;
}
ws = null;
};
const scheduleReconnect = () => {
if (reconnectTimer) return;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
debugLogger.debug('Reconnecting WebSocket...');
connect();
}, 5000);
};
// Initial connection
connect();
capture.on('console', (payload) => sendToNetwork('console', payload));
capture.on('network', (payload) => sendToNetwork('network', payload));
capture.on('network-logging-enabled', () => {
debugLogger.debug('Network logging enabled, flushing buffer...');
flushBuffer();
});
// Cleanup on process exit
process.on('exit', () => {
if (reconnectTimer) clearTimeout(reconnectTimer);
if (ws) ws.close();
cleanup();
});
}
/**
* Registers the activity logger if debug mode and interactive session are enabled.
* Captures network and console logs to a session-specific JSONL file or sends to network.
* *
* The log file location can be overridden via the GEMINI_CLI_ACTIVITY_LOG_FILE * Environment variable GEMINI_CLI_ACTIVITY_LOG_TARGET controls the output:
* environment variable. If not set, defaults to logs/session-{sessionId}.jsonl * - host:port format (e.g., "localhost:25417") network mode (auto-enabled)
* in the project's temp directory. * - file path (e.g., "/tmp/logs.jsonl") file mode (immediate)
* - not set uses default file location in project temp logs dir
* *
* @param config The CLI configuration * @param config The CLI configuration
*/ */
export function registerActivityLogger(config: Config) { export function registerActivityLogger(config: Config) {
if (config.storage) { const target = process.env['GEMINI_CLI_ACTIVITY_LOG_TARGET'];
const capture = ActivityLogger.getInstance(); const hostPort = target ? parseHostPort(target) : null;
capture.enable();
const logsDir = config.storage.getProjectTempLogsDir(); // Network mode doesn't need storage; file mode does
if (!fs.existsSync(logsDir)) { if (!hostPort && !config.storage) {
fs.mkdirSync(logsDir, { recursive: true }); return;
}
const logFile =
process.env['GEMINI_CLI_ACTIVITY_LOG_FILE'] ||
path.join(logsDir, `session-${config.getSessionId()}.jsonl`);
const writeToLog = (type: 'console' | 'network', payload: unknown) => {
try {
const entry =
JSON.stringify({
type,
payload,
timestamp: Date.now(),
}) + '\n';
// Use asynchronous fire-and-forget to avoid blocking the event loop
fs.promises.appendFile(logFile, entry).catch((err) => {
debugLogger.error('Failed to write to activity log:', err);
});
} catch (err) {
debugLogger.error('Failed to prepare activity log entry:', err);
}
};
capture.on('console', (payload) => writeToLog('console', payload));
capture.on('network', (payload) => writeToLog('network', payload));
// Bridge CoreEvents to local capture
coreEvents.on(CoreEvent.ConsoleLog, (payload) => {
capture.logConsole(payload);
});
} }
const capture = ActivityLogger.getInstance();
capture.enable();
if (hostPort) {
// Network mode: send logs via WebSocket
setupNetworkLogging(capture, hostPort.host, hostPort.port, config);
// Auto-enable network logging when target is explicitly configured
capture.enableNetworkLogging();
} else {
// File mode: write to JSONL file
setupFileLogging(capture, config, target);
}
// Bridge CoreEvents to local capture
coreEvents.on(CoreEvent.ConsoleLog, (payload) => {
capture.logConsole(payload);
});
} }