mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-06-10 19:37:17 -07:00
listen
This commit is contained in:
@@ -668,11 +668,20 @@ describe('parseArguments', () => {
|
||||
expect(argv.isCommand).toBe(true);
|
||||
});
|
||||
|
||||
it('should correctly parse the --forever flag', async () => {
|
||||
it('should correctly parse the --forever flag and set default a2aPort to 0', async () => {
|
||||
process.argv = ['node', 'script.js', '--forever'];
|
||||
const settings = createTestMergedSettings({});
|
||||
const argv = await parseArguments(settings);
|
||||
expect(argv.forever).toBe(true);
|
||||
expect(argv.a2aPort).toBe(0);
|
||||
});
|
||||
|
||||
it('should not override explicit a2aPort when --forever is specified', async () => {
|
||||
process.argv = ['node', 'script.js', '--forever', '--a2a-port', '8080'];
|
||||
const settings = createTestMergedSettings({});
|
||||
const argv = await parseArguments(settings);
|
||||
expect(argv.forever).toBe(true);
|
||||
expect(argv.a2aPort).toBe(8080);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -103,6 +103,7 @@ export interface CliArgs {
|
||||
rawOutput: boolean | undefined;
|
||||
acceptRawOutputRisk: boolean | undefined;
|
||||
isCommand: boolean | undefined;
|
||||
a2aPort: number | undefined;
|
||||
}
|
||||
|
||||
export async function parseArguments(
|
||||
@@ -299,6 +300,12 @@ export async function parseArguments(
|
||||
.option('accept-raw-output-risk', {
|
||||
type: 'boolean',
|
||||
description: 'Suppress the security warning when using --raw-output.',
|
||||
})
|
||||
.option('a2a-port', {
|
||||
type: 'number',
|
||||
nargs: 1,
|
||||
description:
|
||||
'Enable the embedded A2A HTTP listener on the specified port (0 for random). Implies --a2a enabled.',
|
||||
}),
|
||||
)
|
||||
// Register MCP subcommands
|
||||
@@ -400,8 +407,15 @@ export async function parseArguments(
|
||||
(result as Record<string, unknown>)['query'] = q || undefined;
|
||||
(result as Record<string, unknown>)['startupMessages'] = startupMessages;
|
||||
|
||||
// The import format is now only controlled by settings.memoryImportFormat
|
||||
// We no longer accept it as a CLI argument
|
||||
// Enable A2A listener by default in Forever Mode
|
||||
if (
|
||||
result['forever'] &&
|
||||
result['a2aPort'] === undefined &&
|
||||
result['a2a-port'] === undefined
|
||||
) {
|
||||
(result as Record<string, unknown>)['a2aPort'] = 0;
|
||||
}
|
||||
// The import format is now only controlled by settings.memoryImportFormat // We no longer accept it as a CLI argument
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
return result as unknown as CliArgs;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,426 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2025 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import http from 'node:http';
|
||||
import { writeFileSync, mkdirSync, unlinkSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import os from 'node:os';
|
||||
import crypto from 'node:crypto';
|
||||
import { appEvents, AppEvent } from './utils/events.js';
|
||||
|
||||
// --- A2A Task management ---
|
||||
|
||||
interface A2AResponseMessage {
|
||||
kind: 'message';
|
||||
role: 'agent';
|
||||
parts: Array<{ kind: 'text'; text: string }>;
|
||||
messageId: string;
|
||||
}
|
||||
|
||||
interface A2ATask {
|
||||
id: string;
|
||||
contextId: string;
|
||||
status: {
|
||||
state: 'submitted' | 'working' | 'completed' | 'failed';
|
||||
timestamp: string;
|
||||
message?: A2AResponseMessage;
|
||||
};
|
||||
}
|
||||
|
||||
const tasks = new Map<string, A2ATask>();
|
||||
|
||||
const TASK_CLEANUP_DELAY_MS = 10 * 60 * 1000; // 10 minutes
|
||||
const DEFAULT_BLOCKING_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
interface ResponseWaiter {
|
||||
taskId: string;
|
||||
resolve: (text: string) => void;
|
||||
}
|
||||
|
||||
const responseWaiters: ResponseWaiter[] = [];
|
||||
|
||||
/**
|
||||
* Called by AppContainer when streaming transitions from non-Idle to Idle.
|
||||
* Resolves the oldest blocking waiter (FIFO) and completes its task.
|
||||
*/
|
||||
export function notifyResponse(responseText: string): void {
|
||||
const waiter = responseWaiters.shift();
|
||||
if (!waiter) return;
|
||||
|
||||
const task = tasks.get(waiter.taskId);
|
||||
if (task) {
|
||||
task.status = {
|
||||
state: 'completed',
|
||||
timestamp: new Date().toISOString(),
|
||||
message: {
|
||||
kind: 'message',
|
||||
role: 'agent',
|
||||
parts: [{ kind: 'text', text: responseText }],
|
||||
messageId: crypto.randomUUID(),
|
||||
},
|
||||
};
|
||||
scheduleTaskCleanup(task.id);
|
||||
}
|
||||
|
||||
waiter.resolve(responseText);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if there are any in-flight tasks waiting for a response.
|
||||
*/
|
||||
export function hasPendingTasks(): boolean {
|
||||
return responseWaiters.length > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when streaming starts (Idle -> non-Idle) to mark the oldest
|
||||
* submitted task as "working".
|
||||
*/
|
||||
export function markTasksWorking(): void {
|
||||
const waiter = responseWaiters[0];
|
||||
if (!waiter) return;
|
||||
const task = tasks.get(waiter.taskId);
|
||||
if (task && task.status.state === 'submitted') {
|
||||
task.status = {
|
||||
state: 'working',
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
function scheduleTaskCleanup(taskId: string): void {
|
||||
setTimeout(() => {
|
||||
tasks.delete(taskId);
|
||||
}, TASK_CLEANUP_DELAY_MS);
|
||||
}
|
||||
|
||||
function createTask(): A2ATask {
|
||||
const task: A2ATask = {
|
||||
id: crypto.randomUUID(),
|
||||
contextId: `session-${process.pid}`,
|
||||
status: {
|
||||
state: 'submitted',
|
||||
timestamp: new Date().toISOString(),
|
||||
},
|
||||
};
|
||||
tasks.set(task.id, task);
|
||||
return task;
|
||||
}
|
||||
|
||||
function formatTaskResult(task: A2ATask): object {
|
||||
return {
|
||||
kind: 'task',
|
||||
id: task.id,
|
||||
contextId: task.contextId,
|
||||
status: task.status,
|
||||
};
|
||||
}
|
||||
|
||||
// --- JSON-RPC helpers ---
|
||||
|
||||
interface JsonRpcRequest {
|
||||
jsonrpc?: string;
|
||||
id?: string | number | null;
|
||||
method?: string;
|
||||
params?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
function jsonRpcSuccess(id: string | number | null, result: object): object {
|
||||
return { jsonrpc: '2.0', id, result };
|
||||
}
|
||||
|
||||
function jsonRpcError(
|
||||
id: string | number | null,
|
||||
code: number,
|
||||
message: string,
|
||||
): object {
|
||||
return { jsonrpc: '2.0', id, error: { code, message } };
|
||||
}
|
||||
|
||||
// --- HTTP utilities ---
|
||||
|
||||
function getSessionsDir(): string {
|
||||
return join(os.homedir(), '.gemini', 'sessions');
|
||||
}
|
||||
|
||||
function getPortFilePath(): string {
|
||||
return join(getSessionsDir(), `interactive-${process.pid}.port`);
|
||||
}
|
||||
|
||||
function buildAgentCard(port: number): object {
|
||||
return {
|
||||
name: 'Gemini CLI Interactive Session',
|
||||
url: `http://localhost:${port}/`,
|
||||
protocolVersion: '0.3.0',
|
||||
provider: { organization: 'Google', url: 'https://google.com' },
|
||||
capabilities: { streaming: false, pushNotifications: false },
|
||||
defaultInputModes: ['text'],
|
||||
defaultOutputModes: ['text'],
|
||||
skills: [
|
||||
{
|
||||
id: 'interactive_session',
|
||||
name: 'Interactive Session',
|
||||
description: 'Send messages to the live interactive Gemini CLI session',
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
interface A2AMessagePart {
|
||||
kind?: string;
|
||||
text?: string;
|
||||
}
|
||||
|
||||
function extractTextFromParts(
|
||||
parts: A2AMessagePart[] | undefined,
|
||||
): string | null {
|
||||
if (!Array.isArray(parts)) {
|
||||
return null;
|
||||
}
|
||||
const texts: string[] = [];
|
||||
for (const part of parts) {
|
||||
if (part.kind === 'text' && typeof part.text === 'string') {
|
||||
texts.push(part.text);
|
||||
}
|
||||
}
|
||||
return texts.length > 0 ? texts.join('\n') : null;
|
||||
}
|
||||
|
||||
function sendJson(
|
||||
res: http.ServerResponse,
|
||||
statusCode: number,
|
||||
data: object,
|
||||
): void {
|
||||
const body = JSON.stringify(data);
|
||||
res.writeHead(statusCode, {
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': Buffer.byteLength(body),
|
||||
});
|
||||
res.end(body);
|
||||
}
|
||||
|
||||
function readBody(req: http.IncomingMessage): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
let size = 0;
|
||||
const maxSize = 1024 * 1024; // 1MB limit
|
||||
req.on('data', (chunk: Buffer) => {
|
||||
size += chunk.length;
|
||||
if (size > maxSize) {
|
||||
req.destroy();
|
||||
reject(new Error('Request body too large'));
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')));
|
||||
req.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
// --- JSON-RPC request handlers ---
|
||||
|
||||
function handleMessageSend(
|
||||
rpcId: string | number | null,
|
||||
params: Record<string, unknown>,
|
||||
res: http.ServerResponse,
|
||||
): void {
|
||||
const messageVal = params['message'];
|
||||
const message =
|
||||
messageVal && typeof messageVal === 'object'
|
||||
? (messageVal as { role?: string; parts?: A2AMessagePart[] })
|
||||
: undefined;
|
||||
const text = extractTextFromParts(message?.parts);
|
||||
if (!text) {
|
||||
sendJson(
|
||||
res,
|
||||
200,
|
||||
jsonRpcError(
|
||||
rpcId,
|
||||
-32602,
|
||||
'Missing or empty text. Expected: params.message.parts with kind "text".',
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const task = createTask();
|
||||
|
||||
// Inject message into the session
|
||||
appEvents.emit(AppEvent.ExternalMessage, text);
|
||||
|
||||
// Block until response (standard A2A message/send semantics)
|
||||
const timer = setTimeout(() => {
|
||||
const idx = responseWaiters.findIndex((w) => w.taskId === task.id);
|
||||
if (idx !== -1) {
|
||||
responseWaiters.splice(idx, 1);
|
||||
}
|
||||
task.status = {
|
||||
state: 'failed',
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
scheduleTaskCleanup(task.id);
|
||||
sendJson(res, 200, jsonRpcError(rpcId, -32000, 'Request timed out'));
|
||||
}, DEFAULT_BLOCKING_TIMEOUT_MS);
|
||||
|
||||
responseWaiters.push({
|
||||
taskId: task.id,
|
||||
resolve: () => {
|
||||
clearTimeout(timer);
|
||||
// Task is already updated in notifyResponse
|
||||
const updatedTask = tasks.get(task.id);
|
||||
sendJson(
|
||||
res,
|
||||
200,
|
||||
jsonRpcSuccess(rpcId, formatTaskResult(updatedTask ?? task)),
|
||||
);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function handleTasksGet(
|
||||
rpcId: string | number | null,
|
||||
params: Record<string, unknown>,
|
||||
res: http.ServerResponse,
|
||||
): void {
|
||||
const taskId = params['id'];
|
||||
if (typeof taskId !== 'string') {
|
||||
sendJson(
|
||||
res,
|
||||
200,
|
||||
jsonRpcError(rpcId, -32602, 'Missing or invalid params.id'),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const task = tasks.get(taskId);
|
||||
if (!task) {
|
||||
sendJson(res, 200, jsonRpcError(rpcId, -32001, 'Task not found'));
|
||||
return;
|
||||
}
|
||||
|
||||
sendJson(res, 200, jsonRpcSuccess(rpcId, formatTaskResult(task)));
|
||||
}
|
||||
|
||||
// --- Server ---
|
||||
|
||||
export interface ExternalListenerResult {
|
||||
port: number;
|
||||
cleanup: () => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start an embedded HTTP server that accepts A2A-format JSON-RPC messages
|
||||
* and bridges them into the interactive session's message queue.
|
||||
*/
|
||||
export function startExternalListener(options?: {
|
||||
port?: number;
|
||||
}): Promise<ExternalListenerResult> {
|
||||
const port = options?.port ?? 0;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const server = http.createServer(
|
||||
(req: http.IncomingMessage, res: http.ServerResponse) => {
|
||||
const url = new URL(req.url ?? '/', `http://localhost`);
|
||||
|
||||
// GET /.well-known/agent-card.json
|
||||
if (
|
||||
req.method === 'GET' &&
|
||||
url.pathname === '/.well-known/agent-card.json'
|
||||
) {
|
||||
const address = server.address();
|
||||
const actualPort =
|
||||
typeof address === 'object' && address ? address.port : port;
|
||||
sendJson(res, 200, buildAgentCard(actualPort));
|
||||
return;
|
||||
}
|
||||
|
||||
// POST / — JSON-RPC 2.0 routing
|
||||
if (req.method === 'POST' && url.pathname === '/') {
|
||||
readBody(req)
|
||||
.then((rawBody) => {
|
||||
let parsed: JsonRpcRequest;
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
parsed = JSON.parse(rawBody) as JsonRpcRequest;
|
||||
} catch {
|
||||
sendJson(
|
||||
res,
|
||||
200,
|
||||
jsonRpcError(null, -32700, 'Parse error: invalid JSON'),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const rpcId = parsed.id ?? null;
|
||||
const method = parsed.method;
|
||||
const params = parsed.params ?? {};
|
||||
|
||||
switch (method) {
|
||||
case 'message/send':
|
||||
handleMessageSend(rpcId, params, res);
|
||||
break;
|
||||
case 'tasks/get':
|
||||
handleTasksGet(rpcId, params, res);
|
||||
break;
|
||||
default:
|
||||
sendJson(
|
||||
res,
|
||||
200,
|
||||
jsonRpcError(
|
||||
rpcId,
|
||||
-32601,
|
||||
`Method not found: ${method ?? '(none)'}`,
|
||||
),
|
||||
);
|
||||
}
|
||||
})
|
||||
.catch(() => {
|
||||
sendJson(
|
||||
res,
|
||||
200,
|
||||
jsonRpcError(null, -32603, 'Failed to read request body'),
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// 404 for everything else
|
||||
sendJson(res, 404, { error: 'Not found' });
|
||||
},
|
||||
);
|
||||
|
||||
server.listen(port, '127.0.0.1', () => {
|
||||
const address = server.address();
|
||||
const actualPort =
|
||||
typeof address === 'object' && address ? address.port : port;
|
||||
|
||||
// Write port file
|
||||
try {
|
||||
const sessionsDir = getSessionsDir();
|
||||
mkdirSync(sessionsDir, { recursive: true });
|
||||
writeFileSync(getPortFilePath(), String(actualPort), 'utf-8');
|
||||
} catch {
|
||||
// Non-fatal: port file is a convenience, not a requirement
|
||||
}
|
||||
|
||||
const cleanup = () => {
|
||||
server.close();
|
||||
try {
|
||||
unlinkSync(getPortFilePath());
|
||||
} catch {
|
||||
// Ignore: file may already be deleted
|
||||
}
|
||||
};
|
||||
|
||||
resolve({ port: actualPort, cleanup });
|
||||
});
|
||||
|
||||
server.on('error', (err) => {
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -499,6 +499,7 @@ describe('gemini.tsx main function kitty protocol', () => {
|
||||
rawOutput: undefined,
|
||||
acceptRawOutputRisk: undefined,
|
||||
isCommand: undefined,
|
||||
a2aPort: undefined,
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
|
||||
@@ -82,6 +82,7 @@ import { validateNonInteractiveAuth } from './validateNonInterActiveAuth.js';
|
||||
import { checkForUpdates } from './ui/utils/updateCheck.js';
|
||||
import { handleAutoUpdate } from './utils/handleAutoUpdate.js';
|
||||
import { appEvents, AppEvent } from './utils/events.js';
|
||||
import { startExternalListener } from './external-listener.js';
|
||||
import { SessionSelector } from './utils/sessionUtils.js';
|
||||
import { SettingsContext } from './ui/contexts/SettingsContext.js';
|
||||
import { MouseProvider } from './ui/contexts/MouseContext.js';
|
||||
@@ -188,6 +189,7 @@ export async function startInteractiveUI(
|
||||
workspaceRoot: string = process.cwd(),
|
||||
resumedSessionData: ResumedSessionData | undefined,
|
||||
initializationResult: InitializationResult,
|
||||
a2aPort?: number,
|
||||
) {
|
||||
// Never enter Ink alternate buffer mode when screen reader mode is enabled
|
||||
// as there is no benefit of alternate buffer mode when using a screen reader
|
||||
@@ -319,6 +321,23 @@ export async function startInteractiveUI(
|
||||
});
|
||||
|
||||
registerCleanup(() => instance.unmount());
|
||||
|
||||
// Start embedded A2A HTTP listener if enabled via --a2a-port
|
||||
if (a2aPort !== undefined) {
|
||||
try {
|
||||
const listener = await startExternalListener({ port: a2aPort });
|
||||
registerCleanup(listener.cleanup);
|
||||
coreEvents.emitFeedback(
|
||||
'info',
|
||||
`A2A endpoint listening on port ${listener.port}`,
|
||||
);
|
||||
} catch (err) {
|
||||
coreEvents.emitFeedback(
|
||||
'warning',
|
||||
`Failed to start A2A listener: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function main() {
|
||||
@@ -722,6 +741,7 @@ export async function main() {
|
||||
process.cwd(),
|
||||
resumedSessionData,
|
||||
initializationResult,
|
||||
argv.a2aPort,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -123,6 +123,11 @@ import { useFolderTrust } from './hooks/useFolderTrust.js';
|
||||
import { useIdeTrustListener } from './hooks/useIdeTrustListener.js';
|
||||
import { type IdeIntegrationNudgeResult } from './IdeIntegrationNudge.js';
|
||||
import { appEvents, AppEvent, TransientMessageType } from '../utils/events.js';
|
||||
import {
|
||||
notifyResponse,
|
||||
hasPendingTasks,
|
||||
markTasksWorking,
|
||||
} from '../external-listener.js';
|
||||
import { type UpdateObject } from './utils/updateCheck.js';
|
||||
import { setUpdateHandler } from '../utils/handleAutoUpdate.js';
|
||||
import { registerCleanup, runExitCleanup } from '../utils/cleanup.js';
|
||||
@@ -1199,6 +1204,49 @@ Logging in with Google... Restarting Gemini CLI to continue.
|
||||
isMcpReady,
|
||||
});
|
||||
|
||||
// Bridge external messages from A2A HTTP listener to message queue
|
||||
useEffect(() => {
|
||||
const handler = (text: string) => {
|
||||
addMessage(text);
|
||||
};
|
||||
appEvents.on(AppEvent.ExternalMessage, handler);
|
||||
return () => {
|
||||
appEvents.off(AppEvent.ExternalMessage, handler);
|
||||
};
|
||||
}, [addMessage]);
|
||||
|
||||
// Track streaming state transitions for A2A response capture
|
||||
const prevStreamingStateRef = useRef(streamingState);
|
||||
|
||||
useEffect(() => {
|
||||
const prev = prevStreamingStateRef.current;
|
||||
prevStreamingStateRef.current = streamingState;
|
||||
|
||||
// Mark tasks as "working" when streaming starts
|
||||
if (
|
||||
prev === StreamingState.Idle &&
|
||||
streamingState !== StreamingState.Idle
|
||||
) {
|
||||
markTasksWorking();
|
||||
}
|
||||
|
||||
// Capture response when streaming ends
|
||||
if (
|
||||
prev !== StreamingState.Idle &&
|
||||
streamingState === StreamingState.Idle &&
|
||||
hasPendingTasks()
|
||||
) {
|
||||
const lastResponse = historyManager.history
|
||||
.slice()
|
||||
.reverse()
|
||||
.find((item) => item.type === 'gemini');
|
||||
notifyResponse(
|
||||
typeof lastResponse?.text === 'string' ? lastResponse.text : '',
|
||||
);
|
||||
}
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [streamingState]);
|
||||
|
||||
cancelHandlerRef.current = useCallback(
|
||||
(shouldRestorePrompt: boolean = true) => {
|
||||
const pendingHistoryItems = [
|
||||
|
||||
@@ -23,6 +23,7 @@ export enum AppEvent {
|
||||
PasteTimeout = 'paste-timeout',
|
||||
TerminalBackground = 'terminal-background',
|
||||
TransientMessage = 'transient-message',
|
||||
ExternalMessage = 'external-message',
|
||||
}
|
||||
|
||||
export interface AppEvents {
|
||||
@@ -32,6 +33,7 @@ export interface AppEvents {
|
||||
[AppEvent.PasteTimeout]: never[];
|
||||
[AppEvent.TerminalBackground]: [string];
|
||||
[AppEvent.TransientMessage]: [TransientMessagePayload];
|
||||
[AppEvent.ExternalMessage]: [string];
|
||||
}
|
||||
|
||||
export const appEvents = new EventEmitter<AppEvents>();
|
||||
|
||||
Reference in New Issue
Block a user