diff --git a/evals/test-helper.ts b/evals/test-helper.ts index 2526e1c374..b0f865ffa5 100644 --- a/evals/test-helper.ts +++ b/evals/test-helper.ts @@ -125,7 +125,7 @@ export function evalTest(policy: EvalPolicy, evalCase: EvalCase) { approvalMode: evalCase.approvalMode ?? 'yolo', timeout: evalCase.timeout, env: { - GEMINI_CLI_ACTIVITY_LOG_FILE: activityLogFile, + GEMINI_CLI_ACTIVITY_LOG_TARGET: activityLogFile, }, }); diff --git a/package-lock.json b/package-lock.json index 012115c83d..b59d5a3c3a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4352,6 +4352,16 @@ "boxen": "^7.1.1" } }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/yargs": { "version": "17.0.33", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.33.tgz", @@ -18161,6 +18171,7 @@ "tinygradient": "^1.1.5", "undici": "^7.10.0", "wrap-ansi": "9.0.2", + "ws": "^8.16.0", "yargs": "^17.7.2", "zod": "^3.23.8" }, @@ -18179,6 +18190,7 @@ "@types/semver": "^7.7.0", "@types/shell-quote": "^1.7.5", "@types/tar": "^6.1.13", + "@types/ws": "^8.5.10", "@types/yargs": "^17.0.32", "archiver": "^7.0.1", "ink-testing-library": "^4.0.0", diff --git a/packages/cli/package.json b/packages/cli/package.json index 9dd3984b1e..e9bbf63deb 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -65,6 +65,7 @@ "tinygradient": "^1.1.5", "undici": "^7.10.0", "wrap-ansi": "9.0.2", + "ws": "^8.16.0", "yargs": "^17.7.2", "zod": "^3.23.8" }, @@ -80,6 +81,7 @@ "@types/semver": "^7.7.0", "@types/shell-quote": "^1.7.5", "@types/tar": "^6.1.13", + "@types/ws": "^8.5.10", "@types/yargs": "^17.0.32", "archiver": "^7.0.1", "ink-testing-library": "^4.0.0", diff --git a/packages/cli/src/gemini.tsx b/packages/cli/src/gemini.tsx index 1e0f4ecd06..1887c8796e 100644 --- a/packages/cli/src/gemini.tsx +++ b/packages/cli/src/gemini.tsx @@ -518,7 +518,7 @@ export async function main() { adminControlsListner.setConfig(config); - if (config.isInteractive() && config.storage && config.getDebugMode()) { + if (config.isInteractive() && config.getDebugMode()) { const { registerActivityLogger } = await import( './utils/activityLogger.js' ); diff --git a/packages/cli/src/nonInteractiveCli.test.ts b/packages/cli/src/nonInteractiveCli.test.ts index d0e21b6b6d..0824788503 100644 --- a/packages/cli/src/nonInteractiveCli.test.ts +++ b/packages/cli/src/nonInteractiveCli.test.ts @@ -267,8 +267,8 @@ describe('runNonInteractive', () => { // so we no longer expect shutdownTelemetry to be called directly here }); - it('should register activity logger when GEMINI_CLI_ACTIVITY_LOG_FILE is set', async () => { - vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_FILE', '/tmp/test.jsonl'); + it('should register activity logger when GEMINI_CLI_ACTIVITY_LOG_TARGET is set', async () => { + vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_TARGET', '/tmp/test.jsonl'); const events: ServerGeminiStreamEvent[] = [ { type: GeminiEventType.Finished, @@ -290,8 +290,8 @@ describe('runNonInteractive', () => { vi.unstubAllEnvs(); }); - it('should not register activity logger when GEMINI_CLI_ACTIVITY_LOG_FILE is not set', async () => { - vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_FILE', ''); + it('should not register activity logger when GEMINI_CLI_ACTIVITY_LOG_TARGET is not set', async () => { + vi.stubEnv('GEMINI_CLI_ACTIVITY_LOG_TARGET', ''); const events: ServerGeminiStreamEvent[] = [ { type: GeminiEventType.Finished, diff --git a/packages/cli/src/nonInteractiveCli.ts b/packages/cli/src/nonInteractiveCli.ts index a2ca92a4e8..eca75ac739 100644 --- a/packages/cli/src/nonInteractiveCli.ts +++ b/packages/cli/src/nonInteractiveCli.ts @@ -71,7 +71,7 @@ export async function runNonInteractive({ }, }); - if (config.storage && process.env['GEMINI_CLI_ACTIVITY_LOG_FILE']) { + if (process.env['GEMINI_CLI_ACTIVITY_LOG_TARGET']) { const { registerActivityLogger } = await import( './utils/activityLogger.js' ); diff --git a/packages/cli/src/utils/activityLogger.ts b/packages/cli/src/utils/activityLogger.ts index 6bd4cc1318..fb35cd881c 100644 --- a/packages/cli/src/utils/activityLogger.ts +++ b/packages/cli/src/utils/activityLogger.ts @@ -16,8 +16,33 @@ import path from 'node:path'; import { EventEmitter } from 'node:events'; import { CoreEvent, coreEvents, debugLogger } from '@google/gemini-cli-core'; import type { Config } from '@google/gemini-cli-core'; +import WebSocket from 'ws'; const ACTIVITY_ID_HEADER = 'x-activity-request-id'; +const MAX_BUFFER_SIZE = 100; + +/** + * Parse a host:port string into its components. + * Uses the URL constructor for robust handling of IPv4, IPv6, and hostnames. + * Returns null for file paths or values without a valid port. + */ +function parseHostPort(value: string): { host: string; port: number } | null { + if (value.startsWith('/') || value.startsWith('.')) return null; + + try { + const url = new URL(`ws://${value}`); + if (!url.port) return null; + + const port = parseInt(url.port, 10); + if (url.hostname && !isNaN(port) && port > 0 && port <= 65535) { + return { host: url.hostname, port }; + } + } catch { + // Not a valid host:port + } + + return null; +} export interface NetworkLog { id: string; @@ -27,6 +52,11 @@ export interface NetworkLog { headers: Record; body?: string; pending?: boolean; + chunk?: { + index: number; + data: string; + timestamp: number; + }; response?: { status: number; headers: Record; @@ -44,6 +74,7 @@ export class ActivityLogger extends EventEmitter { private static instance: ActivityLogger; private isInterceptionEnabled = false; private requestStartTimes = new Map(); + private networkLoggingEnabled = false; static getInstance(): ActivityLogger { if (!ActivityLogger.instance) { @@ -52,6 +83,19 @@ export class ActivityLogger extends EventEmitter { return ActivityLogger.instance; } + enableNetworkLogging() { + this.networkLoggingEnabled = true; + this.emit('network-logging-enabled'); + } + + disableNetworkLogging() { + this.networkLoggingEnabled = false; + } + + isNetworkLoggingEnabled(): boolean { + return this.networkLoggingEnabled; + } + private stringifyHeaders(headers: unknown): Record { const result: Record = {}; if (!headers) return result; @@ -127,7 +171,8 @@ export class ActivityLogger extends EventEmitter { : input instanceof URL ? input.toString() : (input as any).url; - if (url.includes('127.0.0.1')) return originalFetch(input, init); + if (url.includes('127.0.0.1') || url.includes('localhost')) + return originalFetch(input, init); const id = Math.random().toString(36).substring(7); const method = (init?.method || 'GET').toUpperCase(); @@ -159,32 +204,89 @@ export class ActivityLogger extends EventEmitter { const response = await originalFetch(input, newInit); const clonedRes = response.clone(); - clonedRes - .text() - .then((text) => { - const startTime = this.requestStartTimes.get(id); - const durationMs = startTime ? Date.now() - startTime : 0; - this.requestStartTimes.delete(id); + // Stream chunks if body is available + if (clonedRes.body) { + const reader = clonedRes.body.getReader(); + const decoder = new TextDecoder(); + const chunks: string[] = []; + let chunkIndex = 0; - this.safeEmitNetwork({ - id, - pending: false, - response: { - status: response.status, - headers: this.stringifyHeaders(response.headers), - body: text, - durationMs, - }, + const readStream = async () => { + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunkData = decoder.decode(value, { stream: true }); + chunks.push(chunkData); + + // Emit chunk update + this.safeEmitNetwork({ + id, + pending: true, + chunk: { + index: chunkIndex++, + data: chunkData, + timestamp: Date.now(), + }, + }); + } + + // Final update with complete response + const startTime = this.requestStartTimes.get(id); + const durationMs = startTime ? Date.now() - startTime : 0; + this.requestStartTimes.delete(id); + + this.safeEmitNetwork({ + id, + pending: false, + response: { + status: response.status, + headers: this.stringifyHeaders(response.headers), + body: chunks.join(''), + durationMs, + }, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.safeEmitNetwork({ + id, + pending: false, + error: `Failed to read response body: ${message}`, + }); + } + }; + + void readStream(); + } else { + // Fallback for responses without body stream + clonedRes + .text() + .then((text) => { + const startTime = this.requestStartTimes.get(id); + const durationMs = startTime ? Date.now() - startTime : 0; + this.requestStartTimes.delete(id); + + this.safeEmitNetwork({ + id, + pending: false, + response: { + status: response.status, + headers: this.stringifyHeaders(response.headers), + body: text, + durationMs, + }, + }); + }) + .catch((err) => { + const message = err instanceof Error ? err.message : String(err); + this.safeEmitNetwork({ + id, + pending: false, + error: `Failed to read response body: ${message}`, + }); }); - }) - .catch((err) => { - const message = err instanceof Error ? err.message : String(err); - this.safeEmitNetwork({ - id, - pending: false, - error: `Failed to read response body: ${message}`, - }); - }); + } return response; } catch (err: unknown) { @@ -209,7 +311,8 @@ export class ActivityLogger extends EventEmitter { : options.href || `${protocol}//${options.hostname || options.host || 'localhost'}${options.path || '/'}`; - if (url.includes('127.0.0.1')) return originalFn.apply(http, args); + if (url.includes('127.0.0.1') || url.includes('localhost')) + return originalFn.apply(http, args); const headers = typeof options === 'object' && typeof options !== 'function' @@ -263,9 +366,24 @@ export class ActivityLogger extends EventEmitter { req.on('response', (res: any) => { const responseChunks: Buffer[] = []; - res.on('data', (chunk: Buffer) => - responseChunks.push(Buffer.from(chunk)), - ); + let chunkIndex = 0; + + res.on('data', (chunk: Buffer) => { + const chunkBuffer = Buffer.from(chunk); + responseChunks.push(chunkBuffer); + + // Emit chunk update for streaming + self.safeEmitNetwork({ + id, + pending: true, + chunk: { + index: chunkIndex++, + data: chunkBuffer.toString('utf8'), + timestamp: Date.now(), + }, + }); + }); + res.on('end', () => { const buffer = Buffer.concat(responseChunks); const encoding = res.headers['content-encoding']; @@ -323,53 +441,245 @@ export class ActivityLogger extends EventEmitter { } /** - * Registers the activity logger. - * Captures network and console logs to a session-specific JSONL file. + * Setup file-based logging to JSONL + */ +function setupFileLogging( + capture: ActivityLogger, + config: Config, + customPath?: string, +) { + const logFile = + customPath || + (config.storage + ? path.join( + config.storage.getProjectTempLogsDir(), + `session-${config.getSessionId()}.jsonl`, + ) + : null); + + if (!logFile) return; + + const logsDir = path.dirname(logFile); + if (!fs.existsSync(logsDir)) { + fs.mkdirSync(logsDir, { recursive: true }); + } + + const writeToLog = (type: 'console' | 'network', payload: unknown) => { + try { + const entry = + JSON.stringify({ + type, + payload, + sessionId: config.getSessionId(), + timestamp: Date.now(), + }) + '\n'; + + fs.promises.appendFile(logFile, entry).catch((err) => { + debugLogger.error('Failed to write to activity log:', err); + }); + } catch (err) { + debugLogger.error('Failed to prepare activity log entry:', err); + } + }; + + capture.on('console', (payload) => writeToLog('console', payload)); + capture.on('network', (payload) => writeToLog('network', payload)); +} + +/** + * Setup network-based logging via WebSocket + */ +function setupNetworkLogging( + capture: ActivityLogger, + host: string, + port: number, + config: Config, +) { + const buffer: Array> = []; + let ws: WebSocket | null = null; + let reconnectTimer: NodeJS.Timeout | null = null; + let sessionId: string | null = null; + let pingInterval: NodeJS.Timeout | null = null; + + const connect = () => { + try { + ws = new WebSocket(`ws://${host}:${port}/ws`); + + ws.on('open', () => { + debugLogger.debug(`WebSocket connected to ${host}:${port}`); + // Register with CLI's session ID + sendMessage({ + type: 'register', + sessionId: config.getSessionId(), + timestamp: Date.now(), + }); + }); + + ws.on('message', (data: Buffer) => { + try { + const message = JSON.parse(data.toString()); + handleServerMessage(message); + } catch (err) { + debugLogger.debug('Invalid WebSocket message:', err); + } + }); + + ws.on('close', () => { + debugLogger.debug(`WebSocket disconnected from ${host}:${port}`); + cleanup(); + scheduleReconnect(); + }); + + ws.on('error', (err) => { + debugLogger.debug(`WebSocket error:`, err); + }); + } catch (err) { + debugLogger.debug(`Failed to connect WebSocket:`, err); + scheduleReconnect(); + } + }; + + const handleServerMessage = (message: any) => { + switch (message.type) { + case 'registered': + sessionId = message.sessionId; + debugLogger.debug(`WebSocket session registered: ${sessionId}`); + + // Start ping interval + if (pingInterval) clearInterval(pingInterval); + pingInterval = setInterval(() => { + sendMessage({ type: 'pong', timestamp: Date.now() }); + }, 15000); + + // Flush buffered logs + flushBuffer(); + break; + + case 'ping': + sendMessage({ type: 'pong', timestamp: Date.now() }); + break; + + default: + // Ignore unknown message types + break; + } + }; + + const sendMessage = (message: any) => { + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(message)); + } + }; + + const sendToNetwork = (type: 'console' | 'network', payload: unknown) => { + const message = { + type, + payload, + sessionId: sessionId || config.getSessionId(), + timestamp: Date.now(), + }; + + // If not connected or network logging not enabled, buffer + if ( + !ws || + ws.readyState !== WebSocket.OPEN || + !capture.isNetworkLoggingEnabled() + ) { + buffer.push(message); + if (buffer.length > MAX_BUFFER_SIZE) buffer.shift(); + return; + } + + sendMessage(message); + }; + + const flushBuffer = () => { + if ( + !ws || + ws.readyState !== WebSocket.OPEN || + !capture.isNetworkLoggingEnabled() + ) { + return; + } + + debugLogger.debug(`Flushing ${buffer.length} buffered logs...`); + while (buffer.length > 0) { + const message = buffer.shift()!; + sendMessage(message); + } + }; + + const cleanup = () => { + if (pingInterval) { + clearInterval(pingInterval); + pingInterval = null; + } + ws = null; + }; + + const scheduleReconnect = () => { + if (reconnectTimer) return; + + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + debugLogger.debug('Reconnecting WebSocket...'); + connect(); + }, 5000); + }; + + // Initial connection + connect(); + + capture.on('console', (payload) => sendToNetwork('console', payload)); + capture.on('network', (payload) => sendToNetwork('network', payload)); + capture.on('network-logging-enabled', () => { + debugLogger.debug('Network logging enabled, flushing buffer...'); + flushBuffer(); + }); + + // Cleanup on process exit + process.on('exit', () => { + if (reconnectTimer) clearTimeout(reconnectTimer); + if (ws) ws.close(); + cleanup(); + }); +} + +/** + * Registers the activity logger if debug mode and interactive session are enabled. + * Captures network and console logs to a session-specific JSONL file or sends to network. * - * The log file location can be overridden via the GEMINI_CLI_ACTIVITY_LOG_FILE - * environment variable. If not set, defaults to logs/session-{sessionId}.jsonl - * in the project's temp directory. + * Environment variable GEMINI_CLI_ACTIVITY_LOG_TARGET controls the output: + * - host:port format (e.g., "localhost:25417") → network mode (auto-enabled) + * - file path (e.g., "/tmp/logs.jsonl") → file mode (immediate) + * - not set → uses default file location in project temp logs dir * * @param config The CLI configuration */ export function registerActivityLogger(config: Config) { - if (config.storage) { - const capture = ActivityLogger.getInstance(); - capture.enable(); + const target = process.env['GEMINI_CLI_ACTIVITY_LOG_TARGET']; + const hostPort = target ? parseHostPort(target) : null; - const logsDir = config.storage.getProjectTempLogsDir(); - if (!fs.existsSync(logsDir)) { - fs.mkdirSync(logsDir, { recursive: true }); - } - - const logFile = - process.env['GEMINI_CLI_ACTIVITY_LOG_FILE'] || - path.join(logsDir, `session-${config.getSessionId()}.jsonl`); - - const writeToLog = (type: 'console' | 'network', payload: unknown) => { - try { - const entry = - JSON.stringify({ - type, - payload, - timestamp: Date.now(), - }) + '\n'; - - // Use asynchronous fire-and-forget to avoid blocking the event loop - fs.promises.appendFile(logFile, entry).catch((err) => { - debugLogger.error('Failed to write to activity log:', err); - }); - } catch (err) { - debugLogger.error('Failed to prepare activity log entry:', err); - } - }; - - capture.on('console', (payload) => writeToLog('console', payload)); - capture.on('network', (payload) => writeToLog('network', payload)); - - // Bridge CoreEvents to local capture - coreEvents.on(CoreEvent.ConsoleLog, (payload) => { - capture.logConsole(payload); - }); + // Network mode doesn't need storage; file mode does + if (!hostPort && !config.storage) { + return; } + + const capture = ActivityLogger.getInstance(); + capture.enable(); + + if (hostPort) { + // Network mode: send logs via WebSocket + setupNetworkLogging(capture, hostPort.host, hostPort.port, config); + // Auto-enable network logging when target is explicitly configured + capture.enableNetworkLogging(); + } else { + // File mode: write to JSONL file + setupFileLogging(capture, config, target); + } + + // Bridge CoreEvents to local capture + coreEvents.on(CoreEvent.ConsoleLog, (payload) => { + capture.logConsole(payload); + }); }