From 78ec69035c8440c866135c199cc6ea7c7ee01fc5 Mon Sep 17 00:00:00 2001 From: Sandy Tao Date: Tue, 3 Mar 2026 11:33:29 -0800 Subject: [PATCH] feat: integrate A2A listener into Forever Mode - Port external-listener.ts from st/a2a-listen branch - Auto-start A2A HTTP server when Forever Mode + Sisyphus enabled - Add a2aPort config to SisyphusModeSettings (GEMINI.md frontmatter) - Bridge external messages into session and capture responses - Display A2A port in status bar alongside Sisyphus timer - Add ExternalMessage and A2AListenerStarted app events --- packages/cli/src/config/config.ts | 4 + packages/cli/src/external-listener.ts | 426 ++++++++++++++++++ packages/cli/src/gemini.tsx | 21 + packages/cli/src/ui/AppContainer.tsx | 63 +++ .../cli/src/ui/components/Composer.test.tsx | 1 + .../src/ui/components/StatusDisplay.test.tsx | 27 ++ .../cli/src/ui/components/StatusDisplay.tsx | 6 + .../cli/src/ui/contexts/UIStateContext.tsx | 1 + packages/cli/src/utils/events.ts | 4 + packages/core/src/config/config.ts | 1 + 10 files changed, 554 insertions(+) create mode 100644 packages/cli/src/external-listener.ts diff --git a/packages/cli/src/config/config.ts b/packages/cli/src/config/config.ts index dcdae8f449..5a2d100401 100755 --- a/packages/cli/src/config/config.ts +++ b/packages/cli/src/config/config.ts @@ -575,6 +575,10 @@ export async function loadCliConfig( typeof sisyphusSettings['prompt'] === 'string' ? sisyphusSettings['prompt'] : undefined, + a2aPort: + typeof sisyphusSettings['a2aPort'] === 'number' + ? sisyphusSettings['a2aPort'] + : undefined, }; } } diff --git a/packages/cli/src/external-listener.ts b/packages/cli/src/external-listener.ts new file mode 100644 index 0000000000..5262d78970 --- /dev/null +++ b/packages/cli/src/external-listener.ts @@ -0,0 +1,426 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import http from 'node:http'; +import { writeFileSync, mkdirSync, unlinkSync } from 'node:fs'; +import { join } from 'node:path'; +import os from 'node:os'; +import crypto from 'node:crypto'; +import { appEvents, AppEvent } from './utils/events.js'; + +// --- A2A Task management --- + +interface A2AResponseMessage { + kind: 'message'; + role: 'agent'; + parts: Array<{ kind: 'text'; text: string }>; + messageId: string; +} + +interface A2ATask { + id: string; + contextId: string; + status: { + state: 'submitted' | 'working' | 'completed' | 'failed'; + timestamp: string; + message?: A2AResponseMessage; + }; +} + +const tasks = new Map(); + +const TASK_CLEANUP_DELAY_MS = 10 * 60 * 1000; // 10 minutes +const DEFAULT_BLOCKING_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes + +interface ResponseWaiter { + taskId: string; + resolve: (text: string) => void; +} + +const responseWaiters: ResponseWaiter[] = []; + +/** + * Called by AppContainer when streaming transitions from non-Idle to Idle. + * Resolves the oldest blocking waiter (FIFO) and completes its task. + */ +export function notifyResponse(responseText: string): void { + const waiter = responseWaiters.shift(); + if (!waiter) return; + + const task = tasks.get(waiter.taskId); + if (task) { + task.status = { + state: 'completed', + timestamp: new Date().toISOString(), + message: { + kind: 'message', + role: 'agent', + parts: [{ kind: 'text', text: responseText }], + messageId: crypto.randomUUID(), + }, + }; + scheduleTaskCleanup(task.id); + } + + waiter.resolve(responseText); +} + +/** + * Returns true if there are any in-flight tasks waiting for a response. + */ +export function hasPendingTasks(): boolean { + return responseWaiters.length > 0; +} + +/** + * Called when streaming starts (Idle -> non-Idle) to mark the oldest + * submitted task as "working". + */ +export function markTasksWorking(): void { + const waiter = responseWaiters[0]; + if (!waiter) return; + const task = tasks.get(waiter.taskId); + if (task && task.status.state === 'submitted') { + task.status = { + state: 'working', + timestamp: new Date().toISOString(), + }; + } +} + +function scheduleTaskCleanup(taskId: string): void { + setTimeout(() => { + tasks.delete(taskId); + }, TASK_CLEANUP_DELAY_MS); +} + +function createTask(): A2ATask { + const task: A2ATask = { + id: crypto.randomUUID(), + contextId: `session-${process.pid}`, + status: { + state: 'submitted', + timestamp: new Date().toISOString(), + }, + }; + tasks.set(task.id, task); + return task; +} + +function formatTaskResult(task: A2ATask): object { + return { + kind: 'task', + id: task.id, + contextId: task.contextId, + status: task.status, + }; +} + +// --- JSON-RPC helpers --- + +interface JsonRpcRequest { + jsonrpc?: string; + id?: string | number | null; + method?: string; + params?: Record; +} + +function jsonRpcSuccess(id: string | number | null, result: object): object { + return { jsonrpc: '2.0', id, result }; +} + +function jsonRpcError( + id: string | number | null, + code: number, + message: string, +): object { + return { jsonrpc: '2.0', id, error: { code, message } }; +} + +// --- HTTP utilities --- + +function getSessionsDir(): string { + return join(os.homedir(), '.gemini', 'sessions'); +} + +function getPortFilePath(): string { + return join(getSessionsDir(), `interactive-${process.pid}.port`); +} + +function buildAgentCard(port: number): object { + return { + name: 'Gemini CLI Interactive Session', + url: `http://localhost:${port}/`, + protocolVersion: '0.3.0', + provider: { organization: 'Google', url: 'https://google.com' }, + capabilities: { streaming: false, pushNotifications: false }, + defaultInputModes: ['text'], + defaultOutputModes: ['text'], + skills: [ + { + id: 'interactive_session', + name: 'Interactive Session', + description: 'Send messages to the live interactive Gemini CLI session', + }, + ], + }; +} + +interface A2AMessagePart { + kind?: string; + text?: string; +} + +function extractTextFromParts( + parts: A2AMessagePart[] | undefined, +): string | null { + if (!Array.isArray(parts)) { + return null; + } + const texts: string[] = []; + for (const part of parts) { + if (part.kind === 'text' && typeof part.text === 'string') { + texts.push(part.text); + } + } + return texts.length > 0 ? texts.join('\n') : null; +} + +function sendJson( + res: http.ServerResponse, + statusCode: number, + data: object, +): void { + const body = JSON.stringify(data); + res.writeHead(statusCode, { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }); + res.end(body); +} + +function readBody(req: http.IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let size = 0; + const maxSize = 1024 * 1024; // 1MB limit + req.on('data', (chunk: Buffer) => { + size += chunk.length; + if (size > maxSize) { + req.destroy(); + reject(new Error('Request body too large')); + return; + } + chunks.push(chunk); + }); + req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))); + req.on('error', reject); + }); +} + +// --- JSON-RPC request handlers --- + +function handleMessageSend( + rpcId: string | number | null, + params: Record, + res: http.ServerResponse, +): void { + const messageVal = params['message']; + const message = + messageVal && typeof messageVal === 'object' + ? (messageVal as { role?: string; parts?: A2AMessagePart[] }) + : undefined; + const text = extractTextFromParts(message?.parts); + if (!text) { + sendJson( + res, + 200, + jsonRpcError( + rpcId, + -32602, + 'Missing or empty text. Expected: params.message.parts with kind "text".', + ), + ); + return; + } + + const task = createTask(); + + // Inject message into the session + appEvents.emit(AppEvent.ExternalMessage, text); + + // Block until response (standard A2A message/send semantics) + const timer = setTimeout(() => { + const idx = responseWaiters.findIndex((w) => w.taskId === task.id); + if (idx !== -1) { + responseWaiters.splice(idx, 1); + } + task.status = { + state: 'failed', + timestamp: new Date().toISOString(), + }; + scheduleTaskCleanup(task.id); + sendJson(res, 200, jsonRpcError(rpcId, -32000, 'Request timed out')); + }, DEFAULT_BLOCKING_TIMEOUT_MS); + + responseWaiters.push({ + taskId: task.id, + resolve: () => { + clearTimeout(timer); + // Task is already updated in notifyResponse + const updatedTask = tasks.get(task.id); + sendJson( + res, + 200, + jsonRpcSuccess(rpcId, formatTaskResult(updatedTask ?? task)), + ); + }, + }); +} + +function handleTasksGet( + rpcId: string | number | null, + params: Record, + res: http.ServerResponse, +): void { + const taskId = params['id']; + if (typeof taskId !== 'string') { + sendJson( + res, + 200, + jsonRpcError(rpcId, -32602, 'Missing or invalid params.id'), + ); + return; + } + + const task = tasks.get(taskId); + if (!task) { + sendJson(res, 200, jsonRpcError(rpcId, -32001, 'Task not found')); + return; + } + + sendJson(res, 200, jsonRpcSuccess(rpcId, formatTaskResult(task))); +} + +// --- Server --- + +export interface ExternalListenerResult { + port: number; + cleanup: () => void; +} + +/** + * Start an embedded HTTP server that accepts A2A-format JSON-RPC messages + * and bridges them into the interactive session's message queue. + */ +export function startExternalListener(options?: { + port?: number; +}): Promise { + const port = options?.port ?? 0; + + return new Promise((resolve, reject) => { + const server = http.createServer( + (req: http.IncomingMessage, res: http.ServerResponse) => { + const url = new URL(req.url ?? '/', `http://localhost`); + + // GET /.well-known/agent-card.json + if ( + req.method === 'GET' && + url.pathname === '/.well-known/agent-card.json' + ) { + const address = server.address(); + const actualPort = + typeof address === 'object' && address ? address.port : port; + sendJson(res, 200, buildAgentCard(actualPort)); + return; + } + + // POST / — JSON-RPC 2.0 routing + if (req.method === 'POST' && url.pathname === '/') { + readBody(req) + .then((rawBody) => { + let parsed: JsonRpcRequest; + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + parsed = JSON.parse(rawBody) as JsonRpcRequest; + } catch { + sendJson( + res, + 200, + jsonRpcError(null, -32700, 'Parse error: invalid JSON'), + ); + return; + } + + const rpcId = parsed.id ?? null; + const method = parsed.method; + const params = parsed.params ?? {}; + + switch (method) { + case 'message/send': + handleMessageSend(rpcId, params, res); + break; + case 'tasks/get': + handleTasksGet(rpcId, params, res); + break; + default: + sendJson( + res, + 200, + jsonRpcError( + rpcId, + -32601, + `Method not found: ${method ?? '(none)'}`, + ), + ); + } + }) + .catch(() => { + sendJson( + res, + 200, + jsonRpcError(null, -32603, 'Failed to read request body'), + ); + }); + return; + } + + // 404 for everything else + sendJson(res, 404, { error: 'Not found' }); + }, + ); + + server.listen(port, '127.0.0.1', () => { + const address = server.address(); + const actualPort = + typeof address === 'object' && address ? address.port : port; + + // Write port file + try { + const sessionsDir = getSessionsDir(); + mkdirSync(sessionsDir, { recursive: true }); + writeFileSync(getPortFilePath(), String(actualPort), 'utf-8'); + } catch { + // Non-fatal: port file is a convenience, not a requirement + } + + const cleanup = () => { + server.close(); + try { + unlinkSync(getPortFilePath()); + } catch { + // Ignore: file may already be deleted + } + }; + + resolve({ port: actualPort, cleanup }); + }); + + server.on('error', (err) => { + reject(err); + }); + }); +} diff --git a/packages/cli/src/gemini.tsx b/packages/cli/src/gemini.tsx index 88f9f404cd..85a236cbc8 100644 --- a/packages/cli/src/gemini.tsx +++ b/packages/cli/src/gemini.tsx @@ -84,6 +84,7 @@ import { validateNonInteractiveAuth } from './validateNonInterActiveAuth.js'; import { checkForUpdates } from './ui/utils/updateCheck.js'; import { handleAutoUpdate } from './utils/handleAutoUpdate.js'; import { appEvents, AppEvent } from './utils/events.js'; +import { startExternalListener } from './external-listener.js'; import { SessionSelector } from './utils/sessionUtils.js'; import { SettingsContext } from './ui/contexts/SettingsContext.js'; import { MouseProvider } from './ui/contexts/MouseContext.js'; @@ -323,6 +324,26 @@ export async function startInteractiveUI( registerCleanup(() => instance.unmount()); registerCleanup(setupTtyCheck()); + + // Auto-start A2A HTTP listener in Forever Mode + const sisyphusMode = config.getSisyphusMode(); + if (config.getIsForeverMode() && sisyphusMode.enabled) { + const a2aPort = sisyphusMode.a2aPort ?? 0; + try { + const listener = await startExternalListener({ port: a2aPort }); + registerCleanup(listener.cleanup); + appEvents.emit(AppEvent.A2AListenerStarted, listener.port); + coreEvents.emitFeedback( + 'info', + `A2A endpoint listening on port ${listener.port}`, + ); + } catch (err) { + coreEvents.emitFeedback( + 'warning', + `Failed to start A2A listener: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } } export async function main() { diff --git a/packages/cli/src/ui/AppContainer.tsx b/packages/cli/src/ui/AppContainer.tsx index 9fd0325bbd..a78cb16d0f 100644 --- a/packages/cli/src/ui/AppContainer.tsx +++ b/packages/cli/src/ui/AppContainer.tsx @@ -126,6 +126,11 @@ import { useFolderTrust } from './hooks/useFolderTrust.js'; import { useIdeTrustListener } from './hooks/useIdeTrustListener.js'; import { type IdeIntegrationNudgeResult } from './IdeIntegrationNudge.js'; import { appEvents, AppEvent, TransientMessageType } from '../utils/events.js'; +import { + notifyResponse, + hasPendingTasks, + markTasksWorking, +} from '../external-listener.js'; import { type UpdateObject } from './utils/updateCheck.js'; import { setUpdateHandler } from '../utils/handleAutoUpdate.js'; import { registerCleanup, runExitCleanup } from '../utils/cleanup.js'; @@ -235,6 +240,19 @@ export const AppContainer = (props: AppContainerProps) => { const [isOnboardingForeverMode, setIsOnboardingForeverMode] = useState( () => config.getIsForeverMode() && !config.getIsForeverModeConfigured(), ); + const [a2aListenerPort, setA2aListenerPort] = useState(null); + + // Listen for A2A listener startup to display port in status bar + useEffect(() => { + const handler = (port: number) => { + setA2aListenerPort(port); + }; + appEvents.on(AppEvent.A2AListenerStarted, handler); + return () => { + appEvents.off(AppEvent.A2AListenerStarted, handler); + }; + }, []); + const [forceRerenderKey, setForceRerenderKey] = useState(0); const [debugMessage, setDebugMessage] = useState(''); const [quittingMessages, setQuittingMessages] = useState< @@ -1204,6 +1222,49 @@ Logging in with Google... Restarting Gemini CLI to continue. isMcpReady, }); + // Bridge external messages from A2A HTTP listener to message queue + useEffect(() => { + const handler = (text: string) => { + addMessage(text); + }; + appEvents.on(AppEvent.ExternalMessage, handler); + return () => { + appEvents.off(AppEvent.ExternalMessage, handler); + }; + }, [addMessage]); + + // Track streaming state transitions for A2A response capture + const prevStreamingStateRef = useRef(streamingState); + + useEffect(() => { + const prev = prevStreamingStateRef.current; + prevStreamingStateRef.current = streamingState; + + // Mark tasks as "working" when streaming starts + if ( + prev === StreamingState.Idle && + streamingState !== StreamingState.Idle + ) { + markTasksWorking(); + } + + // Capture response when streaming ends + if ( + prev !== StreamingState.Idle && + streamingState === StreamingState.Idle && + hasPendingTasks() + ) { + const lastResponse = historyManager.history + .slice() + .reverse() + .find((item) => item.type === 'gemini'); + notifyResponse( + typeof lastResponse?.text === 'string' ? lastResponse.text : '', + ); + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [streamingState]); + cancelHandlerRef.current = useCallback( (shouldRestorePrompt: boolean = true) => { const pendingHistoryItems = [ @@ -2317,6 +2378,7 @@ Logging in with Google... Restarting Gemini CLI to continue. ]), hintBuffer: '', sisyphusSecondsRemaining, + a2aListenerPort, }), [ isThemeDialogOpen, @@ -2441,6 +2503,7 @@ Logging in with Google... Restarting Gemini CLI to continue. newAgents, showIsExpandableHint, sisyphusSecondsRemaining, + a2aListenerPort, isOnboardingForeverMode, ], ); diff --git a/packages/cli/src/ui/components/Composer.test.tsx b/packages/cli/src/ui/components/Composer.test.tsx index c3d0db67e9..646fd0e583 100644 --- a/packages/cli/src/ui/components/Composer.test.tsx +++ b/packages/cli/src/ui/components/Composer.test.tsx @@ -209,6 +209,7 @@ const createMockUIState = (overrides: Partial = {}): UIState => validationRequest: null, }, sisyphusSecondsRemaining: null, + a2aListenerPort: null, ...overrides, }) as UIState; diff --git a/packages/cli/src/ui/components/StatusDisplay.test.tsx b/packages/cli/src/ui/components/StatusDisplay.test.tsx index 90f28b7436..b955a887fd 100644 --- a/packages/cli/src/ui/components/StatusDisplay.test.tsx +++ b/packages/cli/src/ui/components/StatusDisplay.test.tsx @@ -55,6 +55,7 @@ const createMockUIState = (overrides: UIStateOverrides = {}): UIState => buffer: { text: '' }, history: [{ id: 1, type: 'user', text: 'test' }], sisyphusSecondsRemaining: null, + a2aListenerPort: null, ...overrides, }) as UIState; @@ -183,4 +184,30 @@ describe('StatusDisplay', () => { expect(lastFrame()).toContain('✦ Resuming work in 01:05'); unmount(); }); + + it('renders A2A listener port when active', async () => { + const uiState = createMockUIState({ + a2aListenerPort: 8080, + }); + const { lastFrame, unmount } = await renderStatusDisplay( + { hideContextSummary: false }, + uiState, + ); + expect(lastFrame()).toContain('⚡ A2A :8080'); + unmount(); + }); + + it('renders both A2A port and Sisyphus timer together', async () => { + const uiState = createMockUIState({ + a2aListenerPort: 3000, + sisyphusSecondsRemaining: 120, // 02:00 + }); + const { lastFrame, unmount } = await renderStatusDisplay( + { hideContextSummary: false }, + uiState, + ); + expect(lastFrame()).toContain('⚡ A2A :3000'); + expect(lastFrame()).toContain('✦ Resuming work in 02:00'); + unmount(); + }); }); diff --git a/packages/cli/src/ui/components/StatusDisplay.tsx b/packages/cli/src/ui/components/StatusDisplay.tsx index ab44fccf6e..da777d3895 100644 --- a/packages/cli/src/ui/components/StatusDisplay.tsx +++ b/packages/cli/src/ui/components/StatusDisplay.tsx @@ -37,6 +37,12 @@ export const StatusDisplay: React.FC = ({ items.push(); } + if (uiState.a2aListenerPort !== null) { + items.push( + ⚡ A2A :{uiState.a2aListenerPort}, + ); + } + if (uiState.sisyphusSecondsRemaining !== null) { const mins = Math.floor(uiState.sisyphusSecondsRemaining / 60); const secs = uiState.sisyphusSecondsRemaining % 60; diff --git a/packages/cli/src/ui/contexts/UIStateContext.tsx b/packages/cli/src/ui/contexts/UIStateContext.tsx index df8a95caf8..537d3a130d 100644 --- a/packages/cli/src/ui/contexts/UIStateContext.tsx +++ b/packages/cli/src/ui/contexts/UIStateContext.tsx @@ -231,6 +231,7 @@ export interface UIState { type: TransientMessageType; } | null; sisyphusSecondsRemaining: number | null; + a2aListenerPort: number | null; } export const UIStateContext = createContext(null); diff --git a/packages/cli/src/utils/events.ts b/packages/cli/src/utils/events.ts index 8291528ac1..42340ee5c1 100644 --- a/packages/cli/src/utils/events.ts +++ b/packages/cli/src/utils/events.ts @@ -23,6 +23,8 @@ export enum AppEvent { PasteTimeout = 'paste-timeout', TerminalBackground = 'terminal-background', TransientMessage = 'transient-message', + ExternalMessage = 'external-message', + A2AListenerStarted = 'a2a-listener-started', } export interface AppEvents { @@ -32,6 +34,8 @@ export interface AppEvents { [AppEvent.PasteTimeout]: never[]; [AppEvent.TerminalBackground]: [string]; [AppEvent.TransientMessage]: [TransientMessagePayload]; + [AppEvent.ExternalMessage]: [string]; + [AppEvent.A2AListenerStarted]: [number]; } export const appEvents = new EventEmitter(); diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 86f9bbaebe..24425635e4 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -239,6 +239,7 @@ export interface SisyphusModeSettings { enabled: boolean; idleTimeout?: number; prompt?: string; + a2aPort?: number; } export interface CustomTheme {