feat: integrate A2A listener into Forever Mode

- Port external-listener.ts from st/a2a-listen branch
- Auto-start A2A HTTP server when Forever Mode + Sisyphus enabled
- Add a2aPort config to SisyphusModeSettings (GEMINI.md frontmatter)
- Bridge external messages into session and capture responses
- Display A2A port in status bar alongside Sisyphus timer
- Add ExternalMessage and A2AListenerStarted app events
This commit is contained in:
Sandy Tao
2026-03-03 11:33:29 -08:00
parent 2ed06d69dd
commit 78ec69035c
10 changed files with 554 additions and 0 deletions

View File

@@ -575,6 +575,10 @@ export async function loadCliConfig(
typeof sisyphusSettings['prompt'] === 'string'
? sisyphusSettings['prompt']
: undefined,
a2aPort:
typeof sisyphusSettings['a2aPort'] === 'number'
? sisyphusSettings['a2aPort']
: undefined,
};
}
}

View File

@@ -0,0 +1,426 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import http from 'node:http';
import { writeFileSync, mkdirSync, unlinkSync } from 'node:fs';
import { join } from 'node:path';
import os from 'node:os';
import crypto from 'node:crypto';
import { appEvents, AppEvent } from './utils/events.js';
// --- A2A Task management ---
interface A2AResponseMessage {
kind: 'message';
role: 'agent';
parts: Array<{ kind: 'text'; text: string }>;
messageId: string;
}
interface A2ATask {
id: string;
contextId: string;
status: {
state: 'submitted' | 'working' | 'completed' | 'failed';
timestamp: string;
message?: A2AResponseMessage;
};
}
const tasks = new Map<string, A2ATask>();
const TASK_CLEANUP_DELAY_MS = 10 * 60 * 1000; // 10 minutes
const DEFAULT_BLOCKING_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
interface ResponseWaiter {
taskId: string;
resolve: (text: string) => void;
}
const responseWaiters: ResponseWaiter[] = [];
/**
* Called by AppContainer when streaming transitions from non-Idle to Idle.
* Resolves the oldest blocking waiter (FIFO) and completes its task.
*/
export function notifyResponse(responseText: string): void {
const waiter = responseWaiters.shift();
if (!waiter) return;
const task = tasks.get(waiter.taskId);
if (task) {
task.status = {
state: 'completed',
timestamp: new Date().toISOString(),
message: {
kind: 'message',
role: 'agent',
parts: [{ kind: 'text', text: responseText }],
messageId: crypto.randomUUID(),
},
};
scheduleTaskCleanup(task.id);
}
waiter.resolve(responseText);
}
/**
* Returns true if there are any in-flight tasks waiting for a response.
*/
export function hasPendingTasks(): boolean {
return responseWaiters.length > 0;
}
/**
* Called when streaming starts (Idle -> non-Idle) to mark the oldest
* submitted task as "working".
*/
export function markTasksWorking(): void {
const waiter = responseWaiters[0];
if (!waiter) return;
const task = tasks.get(waiter.taskId);
if (task && task.status.state === 'submitted') {
task.status = {
state: 'working',
timestamp: new Date().toISOString(),
};
}
}
function scheduleTaskCleanup(taskId: string): void {
setTimeout(() => {
tasks.delete(taskId);
}, TASK_CLEANUP_DELAY_MS);
}
function createTask(): A2ATask {
const task: A2ATask = {
id: crypto.randomUUID(),
contextId: `session-${process.pid}`,
status: {
state: 'submitted',
timestamp: new Date().toISOString(),
},
};
tasks.set(task.id, task);
return task;
}
function formatTaskResult(task: A2ATask): object {
return {
kind: 'task',
id: task.id,
contextId: task.contextId,
status: task.status,
};
}
// --- JSON-RPC helpers ---
interface JsonRpcRequest {
jsonrpc?: string;
id?: string | number | null;
method?: string;
params?: Record<string, unknown>;
}
function jsonRpcSuccess(id: string | number | null, result: object): object {
return { jsonrpc: '2.0', id, result };
}
function jsonRpcError(
id: string | number | null,
code: number,
message: string,
): object {
return { jsonrpc: '2.0', id, error: { code, message } };
}
// --- HTTP utilities ---
function getSessionsDir(): string {
return join(os.homedir(), '.gemini', 'sessions');
}
function getPortFilePath(): string {
return join(getSessionsDir(), `interactive-${process.pid}.port`);
}
function buildAgentCard(port: number): object {
return {
name: 'Gemini CLI Interactive Session',
url: `http://localhost:${port}/`,
protocolVersion: '0.3.0',
provider: { organization: 'Google', url: 'https://google.com' },
capabilities: { streaming: false, pushNotifications: false },
defaultInputModes: ['text'],
defaultOutputModes: ['text'],
skills: [
{
id: 'interactive_session',
name: 'Interactive Session',
description: 'Send messages to the live interactive Gemini CLI session',
},
],
};
}
interface A2AMessagePart {
kind?: string;
text?: string;
}
function extractTextFromParts(
parts: A2AMessagePart[] | undefined,
): string | null {
if (!Array.isArray(parts)) {
return null;
}
const texts: string[] = [];
for (const part of parts) {
if (part.kind === 'text' && typeof part.text === 'string') {
texts.push(part.text);
}
}
return texts.length > 0 ? texts.join('\n') : null;
}
function sendJson(
res: http.ServerResponse,
statusCode: number,
data: object,
): void {
const body = JSON.stringify(data);
res.writeHead(statusCode, {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
});
res.end(body);
}
function readBody(req: http.IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let size = 0;
const maxSize = 1024 * 1024; // 1MB limit
req.on('data', (chunk: Buffer) => {
size += chunk.length;
if (size > maxSize) {
req.destroy();
reject(new Error('Request body too large'));
return;
}
chunks.push(chunk);
});
req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')));
req.on('error', reject);
});
}
// --- JSON-RPC request handlers ---
function handleMessageSend(
rpcId: string | number | null,
params: Record<string, unknown>,
res: http.ServerResponse,
): void {
const messageVal = params['message'];
const message =
messageVal && typeof messageVal === 'object'
? (messageVal as { role?: string; parts?: A2AMessagePart[] })
: undefined;
const text = extractTextFromParts(message?.parts);
if (!text) {
sendJson(
res,
200,
jsonRpcError(
rpcId,
-32602,
'Missing or empty text. Expected: params.message.parts with kind "text".',
),
);
return;
}
const task = createTask();
// Inject message into the session
appEvents.emit(AppEvent.ExternalMessage, text);
// Block until response (standard A2A message/send semantics)
const timer = setTimeout(() => {
const idx = responseWaiters.findIndex((w) => w.taskId === task.id);
if (idx !== -1) {
responseWaiters.splice(idx, 1);
}
task.status = {
state: 'failed',
timestamp: new Date().toISOString(),
};
scheduleTaskCleanup(task.id);
sendJson(res, 200, jsonRpcError(rpcId, -32000, 'Request timed out'));
}, DEFAULT_BLOCKING_TIMEOUT_MS);
responseWaiters.push({
taskId: task.id,
resolve: () => {
clearTimeout(timer);
// Task is already updated in notifyResponse
const updatedTask = tasks.get(task.id);
sendJson(
res,
200,
jsonRpcSuccess(rpcId, formatTaskResult(updatedTask ?? task)),
);
},
});
}
function handleTasksGet(
rpcId: string | number | null,
params: Record<string, unknown>,
res: http.ServerResponse,
): void {
const taskId = params['id'];
if (typeof taskId !== 'string') {
sendJson(
res,
200,
jsonRpcError(rpcId, -32602, 'Missing or invalid params.id'),
);
return;
}
const task = tasks.get(taskId);
if (!task) {
sendJson(res, 200, jsonRpcError(rpcId, -32001, 'Task not found'));
return;
}
sendJson(res, 200, jsonRpcSuccess(rpcId, formatTaskResult(task)));
}
// --- Server ---
export interface ExternalListenerResult {
port: number;
cleanup: () => void;
}
/**
* Start an embedded HTTP server that accepts A2A-format JSON-RPC messages
* and bridges them into the interactive session's message queue.
*/
export function startExternalListener(options?: {
port?: number;
}): Promise<ExternalListenerResult> {
const port = options?.port ?? 0;
return new Promise((resolve, reject) => {
const server = http.createServer(
(req: http.IncomingMessage, res: http.ServerResponse) => {
const url = new URL(req.url ?? '/', `http://localhost`);
// GET /.well-known/agent-card.json
if (
req.method === 'GET' &&
url.pathname === '/.well-known/agent-card.json'
) {
const address = server.address();
const actualPort =
typeof address === 'object' && address ? address.port : port;
sendJson(res, 200, buildAgentCard(actualPort));
return;
}
// POST / — JSON-RPC 2.0 routing
if (req.method === 'POST' && url.pathname === '/') {
readBody(req)
.then((rawBody) => {
let parsed: JsonRpcRequest;
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
parsed = JSON.parse(rawBody) as JsonRpcRequest;
} catch {
sendJson(
res,
200,
jsonRpcError(null, -32700, 'Parse error: invalid JSON'),
);
return;
}
const rpcId = parsed.id ?? null;
const method = parsed.method;
const params = parsed.params ?? {};
switch (method) {
case 'message/send':
handleMessageSend(rpcId, params, res);
break;
case 'tasks/get':
handleTasksGet(rpcId, params, res);
break;
default:
sendJson(
res,
200,
jsonRpcError(
rpcId,
-32601,
`Method not found: ${method ?? '(none)'}`,
),
);
}
})
.catch(() => {
sendJson(
res,
200,
jsonRpcError(null, -32603, 'Failed to read request body'),
);
});
return;
}
// 404 for everything else
sendJson(res, 404, { error: 'Not found' });
},
);
server.listen(port, '127.0.0.1', () => {
const address = server.address();
const actualPort =
typeof address === 'object' && address ? address.port : port;
// Write port file
try {
const sessionsDir = getSessionsDir();
mkdirSync(sessionsDir, { recursive: true });
writeFileSync(getPortFilePath(), String(actualPort), 'utf-8');
} catch {
// Non-fatal: port file is a convenience, not a requirement
}
const cleanup = () => {
server.close();
try {
unlinkSync(getPortFilePath());
} catch {
// Ignore: file may already be deleted
}
};
resolve({ port: actualPort, cleanup });
});
server.on('error', (err) => {
reject(err);
});
});
}

View File

@@ -84,6 +84,7 @@ import { validateNonInteractiveAuth } from './validateNonInterActiveAuth.js';
import { checkForUpdates } from './ui/utils/updateCheck.js';
import { handleAutoUpdate } from './utils/handleAutoUpdate.js';
import { appEvents, AppEvent } from './utils/events.js';
import { startExternalListener } from './external-listener.js';
import { SessionSelector } from './utils/sessionUtils.js';
import { SettingsContext } from './ui/contexts/SettingsContext.js';
import { MouseProvider } from './ui/contexts/MouseContext.js';
@@ -323,6 +324,26 @@ export async function startInteractiveUI(
registerCleanup(() => instance.unmount());
registerCleanup(setupTtyCheck());
// Auto-start A2A HTTP listener in Forever Mode
const sisyphusMode = config.getSisyphusMode();
if (config.getIsForeverMode() && sisyphusMode.enabled) {
const a2aPort = sisyphusMode.a2aPort ?? 0;
try {
const listener = await startExternalListener({ port: a2aPort });
registerCleanup(listener.cleanup);
appEvents.emit(AppEvent.A2AListenerStarted, listener.port);
coreEvents.emitFeedback(
'info',
`A2A endpoint listening on port ${listener.port}`,
);
} catch (err) {
coreEvents.emitFeedback(
'warning',
`Failed to start A2A listener: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
}
export async function main() {

View File

@@ -126,6 +126,11 @@ import { useFolderTrust } from './hooks/useFolderTrust.js';
import { useIdeTrustListener } from './hooks/useIdeTrustListener.js';
import { type IdeIntegrationNudgeResult } from './IdeIntegrationNudge.js';
import { appEvents, AppEvent, TransientMessageType } from '../utils/events.js';
import {
notifyResponse,
hasPendingTasks,
markTasksWorking,
} from '../external-listener.js';
import { type UpdateObject } from './utils/updateCheck.js';
import { setUpdateHandler } from '../utils/handleAutoUpdate.js';
import { registerCleanup, runExitCleanup } from '../utils/cleanup.js';
@@ -235,6 +240,19 @@ export const AppContainer = (props: AppContainerProps) => {
const [isOnboardingForeverMode, setIsOnboardingForeverMode] = useState(
() => config.getIsForeverMode() && !config.getIsForeverModeConfigured(),
);
const [a2aListenerPort, setA2aListenerPort] = useState<number | null>(null);
// Listen for A2A listener startup to display port in status bar
useEffect(() => {
const handler = (port: number) => {
setA2aListenerPort(port);
};
appEvents.on(AppEvent.A2AListenerStarted, handler);
return () => {
appEvents.off(AppEvent.A2AListenerStarted, handler);
};
}, []);
const [forceRerenderKey, setForceRerenderKey] = useState(0);
const [debugMessage, setDebugMessage] = useState<string>('');
const [quittingMessages, setQuittingMessages] = useState<
@@ -1204,6 +1222,49 @@ Logging in with Google... Restarting Gemini CLI to continue.
isMcpReady,
});
// Bridge external messages from A2A HTTP listener to message queue
useEffect(() => {
const handler = (text: string) => {
addMessage(text);
};
appEvents.on(AppEvent.ExternalMessage, handler);
return () => {
appEvents.off(AppEvent.ExternalMessage, handler);
};
}, [addMessage]);
// Track streaming state transitions for A2A response capture
const prevStreamingStateRef = useRef(streamingState);
useEffect(() => {
const prev = prevStreamingStateRef.current;
prevStreamingStateRef.current = streamingState;
// Mark tasks as "working" when streaming starts
if (
prev === StreamingState.Idle &&
streamingState !== StreamingState.Idle
) {
markTasksWorking();
}
// Capture response when streaming ends
if (
prev !== StreamingState.Idle &&
streamingState === StreamingState.Idle &&
hasPendingTasks()
) {
const lastResponse = historyManager.history
.slice()
.reverse()
.find((item) => item.type === 'gemini');
notifyResponse(
typeof lastResponse?.text === 'string' ? lastResponse.text : '',
);
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [streamingState]);
cancelHandlerRef.current = useCallback(
(shouldRestorePrompt: boolean = true) => {
const pendingHistoryItems = [
@@ -2317,6 +2378,7 @@ Logging in with Google... Restarting Gemini CLI to continue.
]),
hintBuffer: '',
sisyphusSecondsRemaining,
a2aListenerPort,
}),
[
isThemeDialogOpen,
@@ -2441,6 +2503,7 @@ Logging in with Google... Restarting Gemini CLI to continue.
newAgents,
showIsExpandableHint,
sisyphusSecondsRemaining,
a2aListenerPort,
isOnboardingForeverMode,
],
);

View File

@@ -209,6 +209,7 @@ const createMockUIState = (overrides: Partial<UIState> = {}): UIState =>
validationRequest: null,
},
sisyphusSecondsRemaining: null,
a2aListenerPort: null,
...overrides,
}) as UIState;

View File

@@ -55,6 +55,7 @@ const createMockUIState = (overrides: UIStateOverrides = {}): UIState =>
buffer: { text: '' },
history: [{ id: 1, type: 'user', text: 'test' }],
sisyphusSecondsRemaining: null,
a2aListenerPort: null,
...overrides,
}) as UIState;
@@ -183,4 +184,30 @@ describe('StatusDisplay', () => {
expect(lastFrame()).toContain('✦ Resuming work in 01:05');
unmount();
});
it('renders A2A listener port when active', async () => {
const uiState = createMockUIState({
a2aListenerPort: 8080,
});
const { lastFrame, unmount } = await renderStatusDisplay(
{ hideContextSummary: false },
uiState,
);
expect(lastFrame()).toContain('⚡ A2A :8080');
unmount();
});
it('renders both A2A port and Sisyphus timer together', async () => {
const uiState = createMockUIState({
a2aListenerPort: 3000,
sisyphusSecondsRemaining: 120, // 02:00
});
const { lastFrame, unmount } = await renderStatusDisplay(
{ hideContextSummary: false },
uiState,
);
expect(lastFrame()).toContain('⚡ A2A :3000');
expect(lastFrame()).toContain('✦ Resuming work in 02:00');
unmount();
});
});

View File

@@ -37,6 +37,12 @@ export const StatusDisplay: React.FC<StatusDisplayProps> = ({
items.push(<HookStatusDisplay activeHooks={uiState.activeHooks} />);
}
if (uiState.a2aListenerPort !== null) {
items.push(
<Text color={theme.text.accent}> A2A :{uiState.a2aListenerPort}</Text>,
);
}
if (uiState.sisyphusSecondsRemaining !== null) {
const mins = Math.floor(uiState.sisyphusSecondsRemaining / 60);
const secs = uiState.sisyphusSecondsRemaining % 60;

View File

@@ -231,6 +231,7 @@ export interface UIState {
type: TransientMessageType;
} | null;
sisyphusSecondsRemaining: number | null;
a2aListenerPort: number | null;
}
export const UIStateContext = createContext<UIState | null>(null);

View File

@@ -23,6 +23,8 @@ export enum AppEvent {
PasteTimeout = 'paste-timeout',
TerminalBackground = 'terminal-background',
TransientMessage = 'transient-message',
ExternalMessage = 'external-message',
A2AListenerStarted = 'a2a-listener-started',
}
export interface AppEvents {
@@ -32,6 +34,8 @@ export interface AppEvents {
[AppEvent.PasteTimeout]: never[];
[AppEvent.TerminalBackground]: [string];
[AppEvent.TransientMessage]: [TransientMessagePayload];
[AppEvent.ExternalMessage]: [string];
[AppEvent.A2AListenerStarted]: [number];
}
export const appEvents = new EventEmitter<AppEvents>();