diff --git a/packages/cli/src/config/config.test.ts b/packages/cli/src/config/config.test.ts index 6e1132f5ae..0f24165df4 100644 --- a/packages/cli/src/config/config.test.ts +++ b/packages/cli/src/config/config.test.ts @@ -668,11 +668,20 @@ describe('parseArguments', () => { expect(argv.isCommand).toBe(true); }); - it('should correctly parse the --forever flag', async () => { + it('should correctly parse the --forever flag and set default a2aPort to 0', async () => { process.argv = ['node', 'script.js', '--forever']; const settings = createTestMergedSettings({}); const argv = await parseArguments(settings); expect(argv.forever).toBe(true); + expect(argv.a2aPort).toBe(0); + }); + + it('should not override explicit a2aPort when --forever is specified', async () => { + process.argv = ['node', 'script.js', '--forever', '--a2a-port', '8080']; + const settings = createTestMergedSettings({}); + const argv = await parseArguments(settings); + expect(argv.forever).toBe(true); + expect(argv.a2aPort).toBe(8080); }); }); diff --git a/packages/cli/src/config/config.ts b/packages/cli/src/config/config.ts index fe5034b415..44dd66cbe0 100755 --- a/packages/cli/src/config/config.ts +++ b/packages/cli/src/config/config.ts @@ -103,6 +103,7 @@ export interface CliArgs { rawOutput: boolean | undefined; acceptRawOutputRisk: boolean | undefined; isCommand: boolean | undefined; + a2aPort: number | undefined; } export async function parseArguments( @@ -299,6 +300,12 @@ export async function parseArguments( .option('accept-raw-output-risk', { type: 'boolean', description: 'Suppress the security warning when using --raw-output.', + }) + .option('a2a-port', { + type: 'number', + nargs: 1, + description: + 'Enable the embedded A2A HTTP listener on the specified port (0 for random). Implies --a2a enabled.', }), ) // Register MCP subcommands @@ -400,8 +407,15 @@ export async function parseArguments( (result as Record)['query'] = q || undefined; (result as Record)['startupMessages'] = startupMessages; - // The import format is now only controlled by settings.memoryImportFormat - // We no longer accept it as a CLI argument + // Enable A2A listener by default in Forever Mode + if ( + result['forever'] && + result['a2aPort'] === undefined && + result['a2a-port'] === undefined + ) { + (result as Record)['a2aPort'] = 0; + } + // The import format is now only controlled by settings.memoryImportFormat // We no longer accept it as a CLI argument // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion return result as unknown as CliArgs; } diff --git a/packages/cli/src/external-listener.ts b/packages/cli/src/external-listener.ts new file mode 100644 index 0000000000..5262d78970 --- /dev/null +++ b/packages/cli/src/external-listener.ts @@ -0,0 +1,426 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import http from 'node:http'; +import { writeFileSync, mkdirSync, unlinkSync } from 'node:fs'; +import { join } from 'node:path'; +import os from 'node:os'; +import crypto from 'node:crypto'; +import { appEvents, AppEvent } from './utils/events.js'; + +// --- A2A Task management --- + +interface A2AResponseMessage { + kind: 'message'; + role: 'agent'; + parts: Array<{ kind: 'text'; text: string }>; + messageId: string; +} + +interface A2ATask { + id: string; + contextId: string; + status: { + state: 'submitted' | 'working' | 'completed' | 'failed'; + timestamp: string; + message?: A2AResponseMessage; + }; +} + +const tasks = new Map(); + +const TASK_CLEANUP_DELAY_MS = 10 * 60 * 1000; // 10 minutes +const DEFAULT_BLOCKING_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes + +interface ResponseWaiter { + taskId: string; + resolve: (text: string) => void; +} + +const responseWaiters: ResponseWaiter[] = []; + +/** + * Called by AppContainer when streaming transitions from non-Idle to Idle. + * Resolves the oldest blocking waiter (FIFO) and completes its task. + */ +export function notifyResponse(responseText: string): void { + const waiter = responseWaiters.shift(); + if (!waiter) return; + + const task = tasks.get(waiter.taskId); + if (task) { + task.status = { + state: 'completed', + timestamp: new Date().toISOString(), + message: { + kind: 'message', + role: 'agent', + parts: [{ kind: 'text', text: responseText }], + messageId: crypto.randomUUID(), + }, + }; + scheduleTaskCleanup(task.id); + } + + waiter.resolve(responseText); +} + +/** + * Returns true if there are any in-flight tasks waiting for a response. + */ +export function hasPendingTasks(): boolean { + return responseWaiters.length > 0; +} + +/** + * Called when streaming starts (Idle -> non-Idle) to mark the oldest + * submitted task as "working". + */ +export function markTasksWorking(): void { + const waiter = responseWaiters[0]; + if (!waiter) return; + const task = tasks.get(waiter.taskId); + if (task && task.status.state === 'submitted') { + task.status = { + state: 'working', + timestamp: new Date().toISOString(), + }; + } +} + +function scheduleTaskCleanup(taskId: string): void { + setTimeout(() => { + tasks.delete(taskId); + }, TASK_CLEANUP_DELAY_MS); +} + +function createTask(): A2ATask { + const task: A2ATask = { + id: crypto.randomUUID(), + contextId: `session-${process.pid}`, + status: { + state: 'submitted', + timestamp: new Date().toISOString(), + }, + }; + tasks.set(task.id, task); + return task; +} + +function formatTaskResult(task: A2ATask): object { + return { + kind: 'task', + id: task.id, + contextId: task.contextId, + status: task.status, + }; +} + +// --- JSON-RPC helpers --- + +interface JsonRpcRequest { + jsonrpc?: string; + id?: string | number | null; + method?: string; + params?: Record; +} + +function jsonRpcSuccess(id: string | number | null, result: object): object { + return { jsonrpc: '2.0', id, result }; +} + +function jsonRpcError( + id: string | number | null, + code: number, + message: string, +): object { + return { jsonrpc: '2.0', id, error: { code, message } }; +} + +// --- HTTP utilities --- + +function getSessionsDir(): string { + return join(os.homedir(), '.gemini', 'sessions'); +} + +function getPortFilePath(): string { + return join(getSessionsDir(), `interactive-${process.pid}.port`); +} + +function buildAgentCard(port: number): object { + return { + name: 'Gemini CLI Interactive Session', + url: `http://localhost:${port}/`, + protocolVersion: '0.3.0', + provider: { organization: 'Google', url: 'https://google.com' }, + capabilities: { streaming: false, pushNotifications: false }, + defaultInputModes: ['text'], + defaultOutputModes: ['text'], + skills: [ + { + id: 'interactive_session', + name: 'Interactive Session', + description: 'Send messages to the live interactive Gemini CLI session', + }, + ], + }; +} + +interface A2AMessagePart { + kind?: string; + text?: string; +} + +function extractTextFromParts( + parts: A2AMessagePart[] | undefined, +): string | null { + if (!Array.isArray(parts)) { + return null; + } + const texts: string[] = []; + for (const part of parts) { + if (part.kind === 'text' && typeof part.text === 'string') { + texts.push(part.text); + } + } + return texts.length > 0 ? texts.join('\n') : null; +} + +function sendJson( + res: http.ServerResponse, + statusCode: number, + data: object, +): void { + const body = JSON.stringify(data); + res.writeHead(statusCode, { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }); + res.end(body); +} + +function readBody(req: http.IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let size = 0; + const maxSize = 1024 * 1024; // 1MB limit + req.on('data', (chunk: Buffer) => { + size += chunk.length; + if (size > maxSize) { + req.destroy(); + reject(new Error('Request body too large')); + return; + } + chunks.push(chunk); + }); + req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))); + req.on('error', reject); + }); +} + +// --- JSON-RPC request handlers --- + +function handleMessageSend( + rpcId: string | number | null, + params: Record, + res: http.ServerResponse, +): void { + const messageVal = params['message']; + const message = + messageVal && typeof messageVal === 'object' + ? (messageVal as { role?: string; parts?: A2AMessagePart[] }) + : undefined; + const text = extractTextFromParts(message?.parts); + if (!text) { + sendJson( + res, + 200, + jsonRpcError( + rpcId, + -32602, + 'Missing or empty text. Expected: params.message.parts with kind "text".', + ), + ); + return; + } + + const task = createTask(); + + // Inject message into the session + appEvents.emit(AppEvent.ExternalMessage, text); + + // Block until response (standard A2A message/send semantics) + const timer = setTimeout(() => { + const idx = responseWaiters.findIndex((w) => w.taskId === task.id); + if (idx !== -1) { + responseWaiters.splice(idx, 1); + } + task.status = { + state: 'failed', + timestamp: new Date().toISOString(), + }; + scheduleTaskCleanup(task.id); + sendJson(res, 200, jsonRpcError(rpcId, -32000, 'Request timed out')); + }, DEFAULT_BLOCKING_TIMEOUT_MS); + + responseWaiters.push({ + taskId: task.id, + resolve: () => { + clearTimeout(timer); + // Task is already updated in notifyResponse + const updatedTask = tasks.get(task.id); + sendJson( + res, + 200, + jsonRpcSuccess(rpcId, formatTaskResult(updatedTask ?? task)), + ); + }, + }); +} + +function handleTasksGet( + rpcId: string | number | null, + params: Record, + res: http.ServerResponse, +): void { + const taskId = params['id']; + if (typeof taskId !== 'string') { + sendJson( + res, + 200, + jsonRpcError(rpcId, -32602, 'Missing or invalid params.id'), + ); + return; + } + + const task = tasks.get(taskId); + if (!task) { + sendJson(res, 200, jsonRpcError(rpcId, -32001, 'Task not found')); + return; + } + + sendJson(res, 200, jsonRpcSuccess(rpcId, formatTaskResult(task))); +} + +// --- Server --- + +export interface ExternalListenerResult { + port: number; + cleanup: () => void; +} + +/** + * Start an embedded HTTP server that accepts A2A-format JSON-RPC messages + * and bridges them into the interactive session's message queue. + */ +export function startExternalListener(options?: { + port?: number; +}): Promise { + const port = options?.port ?? 0; + + return new Promise((resolve, reject) => { + const server = http.createServer( + (req: http.IncomingMessage, res: http.ServerResponse) => { + const url = new URL(req.url ?? '/', `http://localhost`); + + // GET /.well-known/agent-card.json + if ( + req.method === 'GET' && + url.pathname === '/.well-known/agent-card.json' + ) { + const address = server.address(); + const actualPort = + typeof address === 'object' && address ? address.port : port; + sendJson(res, 200, buildAgentCard(actualPort)); + return; + } + + // POST / — JSON-RPC 2.0 routing + if (req.method === 'POST' && url.pathname === '/') { + readBody(req) + .then((rawBody) => { + let parsed: JsonRpcRequest; + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + parsed = JSON.parse(rawBody) as JsonRpcRequest; + } catch { + sendJson( + res, + 200, + jsonRpcError(null, -32700, 'Parse error: invalid JSON'), + ); + return; + } + + const rpcId = parsed.id ?? null; + const method = parsed.method; + const params = parsed.params ?? {}; + + switch (method) { + case 'message/send': + handleMessageSend(rpcId, params, res); + break; + case 'tasks/get': + handleTasksGet(rpcId, params, res); + break; + default: + sendJson( + res, + 200, + jsonRpcError( + rpcId, + -32601, + `Method not found: ${method ?? '(none)'}`, + ), + ); + } + }) + .catch(() => { + sendJson( + res, + 200, + jsonRpcError(null, -32603, 'Failed to read request body'), + ); + }); + return; + } + + // 404 for everything else + sendJson(res, 404, { error: 'Not found' }); + }, + ); + + server.listen(port, '127.0.0.1', () => { + const address = server.address(); + const actualPort = + typeof address === 'object' && address ? address.port : port; + + // Write port file + try { + const sessionsDir = getSessionsDir(); + mkdirSync(sessionsDir, { recursive: true }); + writeFileSync(getPortFilePath(), String(actualPort), 'utf-8'); + } catch { + // Non-fatal: port file is a convenience, not a requirement + } + + const cleanup = () => { + server.close(); + try { + unlinkSync(getPortFilePath()); + } catch { + // Ignore: file may already be deleted + } + }; + + resolve({ port: actualPort, cleanup }); + }); + + server.on('error', (err) => { + reject(err); + }); + }); +} diff --git a/packages/cli/src/gemini.test.tsx b/packages/cli/src/gemini.test.tsx index 80a5b8d090..d6755aa25e 100644 --- a/packages/cli/src/gemini.test.tsx +++ b/packages/cli/src/gemini.test.tsx @@ -499,6 +499,7 @@ describe('gemini.tsx main function kitty protocol', () => { rawOutput: undefined, acceptRawOutputRisk: undefined, isCommand: undefined, + a2aPort: undefined, }); await act(async () => { diff --git a/packages/cli/src/gemini.tsx b/packages/cli/src/gemini.tsx index aa830c0250..6f897b526f 100644 --- a/packages/cli/src/gemini.tsx +++ b/packages/cli/src/gemini.tsx @@ -82,6 +82,7 @@ import { validateNonInteractiveAuth } from './validateNonInterActiveAuth.js'; import { checkForUpdates } from './ui/utils/updateCheck.js'; import { handleAutoUpdate } from './utils/handleAutoUpdate.js'; import { appEvents, AppEvent } from './utils/events.js'; +import { startExternalListener } from './external-listener.js'; import { SessionSelector } from './utils/sessionUtils.js'; import { SettingsContext } from './ui/contexts/SettingsContext.js'; import { MouseProvider } from './ui/contexts/MouseContext.js'; @@ -188,6 +189,7 @@ export async function startInteractiveUI( workspaceRoot: string = process.cwd(), resumedSessionData: ResumedSessionData | undefined, initializationResult: InitializationResult, + a2aPort?: number, ) { // Never enter Ink alternate buffer mode when screen reader mode is enabled // as there is no benefit of alternate buffer mode when using a screen reader @@ -319,6 +321,23 @@ export async function startInteractiveUI( }); registerCleanup(() => instance.unmount()); + + // Start embedded A2A HTTP listener if enabled via --a2a-port + if (a2aPort !== undefined) { + try { + const listener = await startExternalListener({ port: a2aPort }); + registerCleanup(listener.cleanup); + coreEvents.emitFeedback( + 'info', + `A2A endpoint listening on port ${listener.port}`, + ); + } catch (err) { + coreEvents.emitFeedback( + 'warning', + `Failed to start A2A listener: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } } export async function main() { @@ -722,6 +741,7 @@ export async function main() { process.cwd(), resumedSessionData, initializationResult, + argv.a2aPort, ); return; } diff --git a/packages/cli/src/ui/AppContainer.tsx b/packages/cli/src/ui/AppContainer.tsx index 0c3a6e1d9d..e49aa62ecd 100644 --- a/packages/cli/src/ui/AppContainer.tsx +++ b/packages/cli/src/ui/AppContainer.tsx @@ -123,6 +123,11 @@ import { useFolderTrust } from './hooks/useFolderTrust.js'; import { useIdeTrustListener } from './hooks/useIdeTrustListener.js'; import { type IdeIntegrationNudgeResult } from './IdeIntegrationNudge.js'; import { appEvents, AppEvent, TransientMessageType } from '../utils/events.js'; +import { + notifyResponse, + hasPendingTasks, + markTasksWorking, +} from '../external-listener.js'; import { type UpdateObject } from './utils/updateCheck.js'; import { setUpdateHandler } from '../utils/handleAutoUpdate.js'; import { registerCleanup, runExitCleanup } from '../utils/cleanup.js'; @@ -1199,6 +1204,49 @@ Logging in with Google... Restarting Gemini CLI to continue. isMcpReady, }); + // Bridge external messages from A2A HTTP listener to message queue + useEffect(() => { + const handler = (text: string) => { + addMessage(text); + }; + appEvents.on(AppEvent.ExternalMessage, handler); + return () => { + appEvents.off(AppEvent.ExternalMessage, handler); + }; + }, [addMessage]); + + // Track streaming state transitions for A2A response capture + const prevStreamingStateRef = useRef(streamingState); + + useEffect(() => { + const prev = prevStreamingStateRef.current; + prevStreamingStateRef.current = streamingState; + + // Mark tasks as "working" when streaming starts + if ( + prev === StreamingState.Idle && + streamingState !== StreamingState.Idle + ) { + markTasksWorking(); + } + + // Capture response when streaming ends + if ( + prev !== StreamingState.Idle && + streamingState === StreamingState.Idle && + hasPendingTasks() + ) { + const lastResponse = historyManager.history + .slice() + .reverse() + .find((item) => item.type === 'gemini'); + notifyResponse( + typeof lastResponse?.text === 'string' ? lastResponse.text : '', + ); + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [streamingState]); + cancelHandlerRef.current = useCallback( (shouldRestorePrompt: boolean = true) => { const pendingHistoryItems = [ diff --git a/packages/cli/src/utils/events.ts b/packages/cli/src/utils/events.ts index 8291528ac1..b8088da24f 100644 --- a/packages/cli/src/utils/events.ts +++ b/packages/cli/src/utils/events.ts @@ -23,6 +23,7 @@ export enum AppEvent { PasteTimeout = 'paste-timeout', TerminalBackground = 'terminal-background', TransientMessage = 'transient-message', + ExternalMessage = 'external-message', } export interface AppEvents { @@ -32,6 +33,7 @@ export interface AppEvents { [AppEvent.PasteTimeout]: never[]; [AppEvent.TerminalBackground]: [string]; [AppEvent.TransientMessage]: [TransientMessagePayload]; + [AppEvent.ExternalMessage]: [string]; } export const appEvents = new EventEmitter();