diff --git a/packages/cli/src/external-listener.ts b/packages/cli/src/external-listener.ts new file mode 100644 index 0000000000..72fbe8b13e --- /dev/null +++ b/packages/cli/src/external-listener.ts @@ -0,0 +1,453 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import http from 'node:http'; +import { writeFileSync, mkdirSync, unlinkSync } from 'node:fs'; +import { join } from 'node:path'; +import os from 'node:os'; +import crypto from 'node:crypto'; +import { appEvents, AppEvent } from './utils/events.js'; + +// --- A2A Task management --- + +interface A2AResponseMessage { + kind: 'message'; + role: 'agent'; + parts: Array<{ kind: 'text'; text: string }>; + messageId: string; +} + +interface A2ATask { + id: string; + contextId: string; + status: { + state: 'submitted' | 'working' | 'completed' | 'failed'; + timestamp: string; + message?: A2AResponseMessage; + }; +} + +const tasks = new Map(); + +const TASK_CLEANUP_DELAY_MS = 10 * 60 * 1000; // 10 minutes +const DEFAULT_BLOCKING_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes + +interface ResponseWaiter { + taskId: string; + resolve: (text: string) => void; +} + +const responseWaiters: ResponseWaiter[] = []; + +// Queue for unsolicited responses (e.g. Sisyphus auto-resume output) +const unsolicitedResponses: string[] = []; + +/** + * Called by AppContainer when streaming transitions from non-Idle to Idle. + * If there's a pending A2A task, resolves it. Otherwise queues as unsolicited. + */ +export function notifyResponse(responseText: string): void { + if (!responseText) return; + + const waiter = responseWaiters.shift(); + if (!waiter) { + // No A2A task waiting — queue as unsolicited (Sisyphus, etc.) + unsolicitedResponses.push(responseText); + return; + } + + const task = tasks.get(waiter.taskId); + if (task) { + task.status = { + state: 'completed', + timestamp: new Date().toISOString(), + message: { + kind: 'message', + role: 'agent', + parts: [{ kind: 'text', text: responseText }], + messageId: crypto.randomUUID(), + }, + }; + scheduleTaskCleanup(task.id); + } + + waiter.resolve(responseText); +} + +/** + * Drain all unsolicited responses (from Sisyphus auto-resume, etc.). + */ +export function drainUnsolicitedResponses(): string[] { + return unsolicitedResponses.splice(0, unsolicitedResponses.length); +} + +/** + * Returns true if there are any in-flight tasks waiting for a response. + */ +export function hasPendingTasks(): boolean { + return responseWaiters.length > 0; +} + +/** + * Called when streaming starts (Idle -> non-Idle) to mark the oldest + * submitted task as "working". + */ +export function markTasksWorking(): void { + const waiter = responseWaiters[0]; + if (!waiter) return; + const task = tasks.get(waiter.taskId); + if (task && task.status.state === 'submitted') { + task.status = { + state: 'working', + timestamp: new Date().toISOString(), + }; + } +} + +function scheduleTaskCleanup(taskId: string): void { + setTimeout(() => { + tasks.delete(taskId); + }, TASK_CLEANUP_DELAY_MS); +} + +function createTask(): A2ATask { + const task: A2ATask = { + id: crypto.randomUUID(), + contextId: `session-${process.pid}`, + status: { + state: 'submitted', + timestamp: new Date().toISOString(), + }, + }; + tasks.set(task.id, task); + return task; +} + +function formatTaskResult(task: A2ATask): object { + return { + kind: 'task', + id: task.id, + contextId: task.contextId, + status: task.status, + }; +} + +// --- JSON-RPC helpers --- + +interface JsonRpcRequest { + jsonrpc?: string; + id?: string | number | null; + method?: string; + params?: Record; +} + +function jsonRpcSuccess(id: string | number | null, result: object): object { + return { jsonrpc: '2.0', id, result }; +} + +function jsonRpcError( + id: string | number | null, + code: number, + message: string, +): object { + return { jsonrpc: '2.0', id, error: { code, message } }; +} + +// --- HTTP utilities --- + +function getSessionsDir(): string { + return join(os.homedir(), '.gemini', 'sessions'); +} + +function getPortFilePath(): string { + return join(getSessionsDir(), `interactive-${process.pid}.port`); +} + +function buildAgentCard(port: number): object { + return { + name: 'Gemini CLI Interactive Session', + url: `http://localhost:${port}/`, + protocolVersion: '0.3.0', + provider: { organization: 'Google', url: 'https://google.com' }, + capabilities: { streaming: false, pushNotifications: false }, + defaultInputModes: ['text'], + defaultOutputModes: ['text'], + skills: [ + { + id: 'interactive_session', + name: 'Interactive Session', + description: 'Send messages to the live interactive Gemini CLI session', + }, + ], + }; +} + +interface A2AMessagePart { + kind?: string; + text?: string; +} + +function extractTextFromParts( + parts: A2AMessagePart[] | undefined, +): string | null { + if (!Array.isArray(parts)) { + return null; + } + const texts: string[] = []; + for (const part of parts) { + if (part.kind === 'text' && typeof part.text === 'string') { + texts.push(part.text); + } + } + return texts.length > 0 ? texts.join('\n') : null; +} + +function sendJson( + res: http.ServerResponse, + statusCode: number, + data: object, +): void { + const body = JSON.stringify(data); + res.writeHead(statusCode, { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }); + res.end(body); +} + +function readBody(req: http.IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let size = 0; + const maxSize = 1024 * 1024; // 1MB limit + req.on('data', (chunk: Buffer) => { + size += chunk.length; + if (size > maxSize) { + req.destroy(); + reject(new Error('Request body too large')); + return; + } + chunks.push(chunk); + }); + req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))); + req.on('error', reject); + }); +} + +// --- JSON-RPC request handlers --- + +function handleMessageSend( + rpcId: string | number | null, + params: Record, + res: http.ServerResponse, +): void { + const messageVal = params['message']; + const message = + messageVal && typeof messageVal === 'object' + ? (messageVal as { role?: string; parts?: A2AMessagePart[] }) + : undefined; + const text = extractTextFromParts(message?.parts); + if (!text) { + sendJson( + res, + 200, + jsonRpcError( + rpcId, + -32602, + 'Missing or empty text. Expected: params.message.parts with kind "text".', + ), + ); + return; + } + + const task = createTask(); + + // Inject message into the session + appEvents.emit(AppEvent.ExternalMessage, text); + + // Block until response (standard A2A message/send semantics) + const timer = setTimeout(() => { + const idx = responseWaiters.findIndex((w) => w.taskId === task.id); + if (idx !== -1) { + responseWaiters.splice(idx, 1); + } + task.status = { + state: 'failed', + timestamp: new Date().toISOString(), + }; + scheduleTaskCleanup(task.id); + sendJson(res, 200, jsonRpcError(rpcId, -32000, 'Request timed out')); + }, DEFAULT_BLOCKING_TIMEOUT_MS); + + responseWaiters.push({ + taskId: task.id, + resolve: () => { + clearTimeout(timer); + // Task is already updated in notifyResponse + const updatedTask = tasks.get(task.id); + sendJson( + res, + 200, + jsonRpcSuccess(rpcId, formatTaskResult(updatedTask ?? task)), + ); + }, + }); +} + +function handleResponsesPoll( + rpcId: string | number | null, + res: http.ServerResponse, +): void { + const responses = drainUnsolicitedResponses(); + sendJson(res, 200, jsonRpcSuccess(rpcId, { responses })); +} + +function handleTasksGet( + rpcId: string | number | null, + params: Record, + res: http.ServerResponse, +): void { + const taskId = params['id']; + if (typeof taskId !== 'string') { + sendJson( + res, + 200, + jsonRpcError(rpcId, -32602, 'Missing or invalid params.id'), + ); + return; + } + + const task = tasks.get(taskId); + if (!task) { + sendJson(res, 200, jsonRpcError(rpcId, -32001, 'Task not found')); + return; + } + + sendJson(res, 200, jsonRpcSuccess(rpcId, formatTaskResult(task))); +} + +// --- Server --- + +export interface ExternalListenerResult { + port: number; + cleanup: () => void; +} + +/** + * Start an embedded HTTP server that accepts A2A-format JSON-RPC messages + * and bridges them into the interactive session's message queue. + */ +export function startExternalListener(options?: { + port?: number; +}): Promise { + const port = options?.port ?? 0; + + return new Promise((resolve, reject) => { + const server = http.createServer( + (req: http.IncomingMessage, res: http.ServerResponse) => { + const url = new URL(req.url ?? '/', `http://localhost`); + + // GET /.well-known/agent-card.json + if ( + req.method === 'GET' && + url.pathname === '/.well-known/agent-card.json' + ) { + const address = server.address(); + const actualPort = + typeof address === 'object' && address ? address.port : port; + sendJson(res, 200, buildAgentCard(actualPort)); + return; + } + + // POST / — JSON-RPC 2.0 routing + if (req.method === 'POST' && url.pathname === '/') { + readBody(req) + .then((rawBody) => { + let parsed: JsonRpcRequest; + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + parsed = JSON.parse(rawBody) as JsonRpcRequest; + } catch { + sendJson( + res, + 200, + jsonRpcError(null, -32700, 'Parse error: invalid JSON'), + ); + return; + } + + const rpcId = parsed.id ?? null; + const method = parsed.method; + const params = parsed.params ?? {}; + + switch (method) { + case 'message/send': + handleMessageSend(rpcId, params, res); + break; + case 'tasks/get': + handleTasksGet(rpcId, params, res); + break; + case 'responses/poll': + handleResponsesPoll(rpcId, res); + break; + default: + sendJson( + res, + 200, + jsonRpcError( + rpcId, + -32601, + `Method not found: ${method ?? '(none)'}`, + ), + ); + } + }) + .catch(() => { + sendJson( + res, + 200, + jsonRpcError(null, -32603, 'Failed to read request body'), + ); + }); + return; + } + + // 404 for everything else + sendJson(res, 404, { error: 'Not found' }); + }, + ); + + server.listen(port, '127.0.0.1', () => { + const address = server.address(); + const actualPort = + typeof address === 'object' && address ? address.port : port; + + // Write port file + try { + const sessionsDir = getSessionsDir(); + mkdirSync(sessionsDir, { recursive: true }); + writeFileSync(getPortFilePath(), String(actualPort), 'utf-8'); + } catch { + // Non-fatal: port file is a convenience, not a requirement + } + + const cleanup = () => { + server.close(); + try { + unlinkSync(getPortFilePath()); + } catch { + // Ignore: file may already be deleted + } + }; + + resolve({ port: actualPort, cleanup }); + }); + + server.on('error', (err) => { + reject(err); + }); + }); +} diff --git a/packages/cli/src/gemini.tsx b/packages/cli/src/gemini.tsx index 331ec0c018..067688306c 100644 --- a/packages/cli/src/gemini.tsx +++ b/packages/cli/src/gemini.tsx @@ -26,6 +26,7 @@ import { getStartupWarnings } from './utils/startupWarnings.js'; import { getUserStartupWarnings } from './utils/userStartupWarnings.js'; import { ConsolePatcher } from './ui/utils/ConsolePatcher.js'; import { runNonInteractive } from './nonInteractiveCli.js'; +import { startExternalListener } from './external-listener.js'; import { cleanupCheckpoints, registerCleanup, @@ -322,6 +323,26 @@ export async function startInteractiveUI( registerCleanup(() => instance.unmount()); + // Auto-start A2A HTTP listener in Forever Mode + if (config.getIsForeverMode()) { + const sisyphusMode = config.getSisyphusMode(); + const a2aPort = sisyphusMode.a2aPort ?? 0; + try { + const listener = await startExternalListener({ port: a2aPort }); + registerCleanup(listener.cleanup); + appEvents.emit(AppEvent.A2AListenerStarted, listener.port); + coreEvents.emitFeedback( + 'info', + `A2A endpoint listening on port ${listener.port}`, + ); + } catch (err) { + coreEvents.emitFeedback( + 'warning', + `Failed to start A2A listener: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + registerCleanup(setupTtyCheck()); } diff --git a/packages/cli/src/ui/AppContainer.tsx b/packages/cli/src/ui/AppContainer.tsx index 67f2d5dd84..40a63bf607 100644 --- a/packages/cli/src/ui/AppContainer.tsx +++ b/packages/cli/src/ui/AppContainer.tsx @@ -126,6 +126,7 @@ import { useFolderTrust } from './hooks/useFolderTrust.js'; import { useIdeTrustListener } from './hooks/useIdeTrustListener.js'; import { type IdeIntegrationNudgeResult } from './IdeIntegrationNudge.js'; import { appEvents, AppEvent, TransientMessageType } from '../utils/events.js'; +import { notifyResponse, markTasksWorking } from '../external-listener.js'; import { type UpdateObject } from './utils/updateCheck.js'; import { setUpdateHandler } from '../utils/handleAutoUpdate.js'; import { registerCleanup, runExitCleanup } from '../utils/cleanup.js'; @@ -1203,6 +1204,63 @@ Logging in with Google... Restarting Gemini CLI to continue. isMcpReady, }); + // --- A2A listener integration --- + const [a2aListenerPort, setA2aListenerPort] = useState(null); + + useEffect(() => { + const handler = (port: number) => { + setA2aListenerPort(port); + }; + appEvents.on(AppEvent.A2AListenerStarted, handler); + return () => { + appEvents.off(AppEvent.A2AListenerStarted, handler); + }; + }, []); + + // Bridge external messages from A2A HTTP listener to message queue + useEffect(() => { + const handler = (text: string) => { + addMessage(text); + }; + appEvents.on(AppEvent.ExternalMessage, handler); + return () => { + appEvents.off(AppEvent.ExternalMessage, handler); + }; + }, [addMessage]); + + // Track streaming state transitions for A2A response capture + const prevStreamingStateRef = useRef(streamingState); + + useEffect(() => { + const prev = prevStreamingStateRef.current; + prevStreamingStateRef.current = streamingState; + + // Mark tasks as "working" when streaming starts + if ( + prev === StreamingState.Idle && + streamingState !== StreamingState.Idle + ) { + markTasksWorking(); + } + + // Capture response when streaming ends (for A2A tasks or unsolicited output) + if ( + prev !== StreamingState.Idle && + streamingState === StreamingState.Idle + ) { + const history = historyManager.history; + const parts: string[] = []; + for (let i = history.length - 1; i >= 0; i--) { + const item = history[i]; + if (item.type !== 'gemini' && item.type !== 'gemini_content') break; + if (typeof item.text === 'string' && item.text) { + parts.unshift(item.text); + } + } + notifyResponse(parts.join('\n')); + } + }, [streamingState, historyManager.history]); + cancelHandlerRef.current = useCallback( (shouldRestorePrompt: boolean = true) => { const pendingHistoryItems = [ @@ -2265,6 +2323,7 @@ Logging in with Google... Restarting Gemini CLI to continue. adminSettingsChanged, newAgents, showIsExpandableHint, + a2aListenerPort, hintMode: config.isModelSteeringEnabled() && isToolExecuting([ @@ -2393,6 +2452,7 @@ Logging in with Google... Restarting Gemini CLI to continue. adminSettingsChanged, newAgents, showIsExpandableHint, + a2aListenerPort, ], ); diff --git a/packages/cli/src/ui/components/Composer.test.tsx b/packages/cli/src/ui/components/Composer.test.tsx index b1f804dd42..4eaa899d51 100644 --- a/packages/cli/src/ui/components/Composer.test.tsx +++ b/packages/cli/src/ui/components/Composer.test.tsx @@ -202,6 +202,7 @@ const createMockUIState = (overrides: Partial = {}): UIState => activeHooks: [], isBackgroundShellVisible: false, embeddedShellFocused: false, + a2aListenerPort: null, quota: { userTier: undefined, stats: undefined, diff --git a/packages/cli/src/ui/components/StatusDisplay.test.tsx b/packages/cli/src/ui/components/StatusDisplay.test.tsx index fcb66ea0b2..2a063df688 100644 --- a/packages/cli/src/ui/components/StatusDisplay.test.tsx +++ b/packages/cli/src/ui/components/StatusDisplay.test.tsx @@ -52,6 +52,7 @@ const createMockUIState = (overrides: UIStateOverrides = {}): UIState => geminiMdFileCount: 0, contextFileNames: [], backgroundShellCount: 0, + a2aListenerPort: null, buffer: { text: '' }, history: [{ id: 1, type: 'user', text: 'test' }], ...overrides, @@ -171,4 +172,16 @@ describe('StatusDisplay', () => { expect(lastFrame()).toContain('Shells: 3'); unmount(); }); + + it('renders A2A listener port when a2aListenerPort is set', async () => { + const uiState = createMockUIState({ + a2aListenerPort: 8080, + }); + const { lastFrame, unmount } = await renderStatusDisplay( + { hideContextSummary: false }, + uiState, + ); + expect(lastFrame()).toContain('A2A :8080'); + unmount(); + }); }); diff --git a/packages/cli/src/ui/components/StatusDisplay.tsx b/packages/cli/src/ui/components/StatusDisplay.tsx index 223340c039..7a0baa524a 100644 --- a/packages/cli/src/ui/components/StatusDisplay.tsx +++ b/packages/cli/src/ui/components/StatusDisplay.tsx @@ -28,6 +28,12 @@ export const StatusDisplay: React.FC = ({ return |⌐■_■|; } + if (uiState.a2aListenerPort !== null) { + return ( + ⚡ A2A :{uiState.a2aListenerPort} + ); + } + if ( uiState.activeHooks.length > 0 && settings.merged.hooksConfig.notifications diff --git a/packages/cli/src/ui/contexts/UIStateContext.tsx b/packages/cli/src/ui/contexts/UIStateContext.tsx index ea9025aa6b..9b9e138aae 100644 --- a/packages/cli/src/ui/contexts/UIStateContext.tsx +++ b/packages/cli/src/ui/contexts/UIStateContext.tsx @@ -223,6 +223,7 @@ export interface UIState { showIsExpandableHint: boolean; hintMode: boolean; hintBuffer: string; + a2aListenerPort: number | null; transientMessage: { text: string; type: TransientMessageType; diff --git a/packages/cli/src/utils/events.ts b/packages/cli/src/utils/events.ts index 8291528ac1..42340ee5c1 100644 --- a/packages/cli/src/utils/events.ts +++ b/packages/cli/src/utils/events.ts @@ -23,6 +23,8 @@ export enum AppEvent { PasteTimeout = 'paste-timeout', TerminalBackground = 'terminal-background', TransientMessage = 'transient-message', + ExternalMessage = 'external-message', + A2AListenerStarted = 'a2a-listener-started', } export interface AppEvents { @@ -32,6 +34,8 @@ export interface AppEvents { [AppEvent.PasteTimeout]: never[]; [AppEvent.TerminalBackground]: [string]; [AppEvent.TransientMessage]: [TransientMessagePayload]; + [AppEvent.ExternalMessage]: [string]; + [AppEvent.A2AListenerStarted]: [number]; } export const appEvents = new EventEmitter();