diff --git a/packages/a2a-server/package.json b/packages/a2a-server/package.json index 2a0d740a38..fe0a551338 100644 --- a/packages/a2a-server/package.json +++ b/packages/a2a-server/package.json @@ -26,6 +26,7 @@ ], "dependencies": { "@a2a-js/sdk": "^0.3.8", + "@google-cloud/pubsub": "^4.9.0", "@google-cloud/storage": "^7.16.0", "@google/gemini-cli-core": "file:../core", "express": "^5.1.0", diff --git a/packages/a2a-server/src/chat-bridge/bridge.ts b/packages/a2a-server/src/chat-bridge/bridge.ts index 855bd84861..2c812bf300 100644 --- a/packages/a2a-server/src/chat-bridge/bridge.ts +++ b/packages/a2a-server/src/chat-bridge/bridge.ts @@ -5,27 +5,28 @@ */ /** - * Minimal Google Chat bridge for the Gemini CLI forever mode. + * Google Chat bridge for the Gemini CLI forever mode via Cloud Pub/Sub. * * Architecture: - * Google Chat webhook -> this bridge (port 8081) -> external listener (port 3100) - * Response comes back -> bridge pushes to Google Chat via Chat API + * Google Chat → Pub/Sub topic → this bridge (pull subscriber) → agent (localhost:3100) + * Response comes back → bridge pushes to Google Chat via Chat API * - * One agent per space. Messages are forwarded as-is to the running + * One agent per VM. Messages are forwarded as-is to the running * gemini-cli --forever session via its JSON-RPC external listener. */ -import express from 'express'; -import { OAuth2Client } from 'google-auth-library'; +import http from 'node:http'; +import { PubSub } from '@google-cloud/pubsub'; import { ChatApiClient } from './chat-api-client.js'; import { logger } from '../utils/logger.js'; // --- Config from env vars --- -const BRIDGE_PORT = parseInt(process.env['BRIDGE_PORT'] ?? '8081', 10); const A2A_URL = process.env['A2A_URL'] ?? 'http://127.0.0.1:3100'; -const CHAT_PROJECT_NUMBER = process.env['CHAT_PROJECT_NUMBER']; -const CHAT_ISSUER = 'chat@system.gserviceaccount.com'; +const GOOGLE_CLOUD_PROJECT = process.env['GOOGLE_CLOUD_PROJECT'] ?? ''; +const PUBSUB_SUBSCRIPTION = + process.env['PUBSUB_SUBSCRIPTION'] ?? 'forever-agent-chat-sub'; +const HEALTH_PORT = parseInt(process.env['BRIDGE_PORT'] ?? '8081', 10); // --- Types --- @@ -50,56 +51,6 @@ interface JsonRpcResponse { error?: { code: number; message: string }; } -// --- Auth middleware --- - -function createAuthMiddleware(): ( - req: express.Request, - res: express.Response, - next: express.NextFunction, -) => void { - // On Cloud Run, IAM handles auth - if (process.env['K_SERVICE']) { - logger.info('[Bridge] Running on Cloud Run — auth delegated to IAM.'); - return (_req, _res, next) => next(); - } - - if (!CHAT_PROJECT_NUMBER) { - logger.warn( - '[Bridge] CHAT_PROJECT_NUMBER not set — JWT verification disabled.', - ); - return (_req, _res, next) => next(); - } - - const authClient = new OAuth2Client(); - - return (req, res, next) => { - const authHeader = req.headers['authorization']; - if (!authHeader || !authHeader.startsWith('Bearer ')) { - res.status(401).json({ error: 'Unauthorized' }); - return; - } - - authClient - .verifyIdToken({ - idToken: authHeader.substring(7), - audience: CHAT_PROJECT_NUMBER, - }) - .then((ticket) => { - const payload = ticket.getPayload(); - if (payload?.iss !== CHAT_ISSUER) { - res.status(403).json({ error: 'Forbidden: invalid issuer' }); - return; - } - next(); - }) - .catch((err: unknown) => { - const msg = err instanceof Error ? err.message : 'Unknown error'; - logger.warn(`[Bridge] JWT verification failed: ${msg}`); - res.status(401).json({ error: 'Unauthorized' }); - }); - }; -} - // --- Event normalization --- function isObj(v: unknown): v is Record { @@ -119,7 +70,7 @@ interface NormalizedEvent { } /** - * Extract the essentials from a Google Chat webhook event. + * Extract the essentials from a Google Chat event. * Handles both legacy and Workspace Add-ons format. */ function normalizeEvent(raw: Record): NormalizedEvent | null { @@ -218,70 +169,7 @@ async function sendToAgent(text: string): Promise { return texts.join('\n') || '(no response)'; } -// --- Express app --- - -export function createBridgeApp(): express.Express { - const app = express(); - app.use(express.json()); - - const chatApi = new ChatApiClient(); - const auth = createAuthMiddleware(); - - // Health check - app.get('/health', (_req, res) => { - res.json({ status: 'ok', a2aUrl: A2A_URL }); - }); - - // Google Chat webhook - app.post('/chat/webhook', auth, (req, res) => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const raw = req.body as Record; - const event = normalizeEvent(raw); - - if (!event) { - logger.warn( - `[Bridge] Unknown event format: ${Object.keys(raw).join(',')}`, - ); - res.json({}); - return; - } - - logger.info( - `[Bridge] ${event.type}: space=${event.spaceName} text="${event.text.substring(0, 100)}"`, - ); - - // Handle non-message events - if (event.type === 'ADDED_TO_SPACE') { - res.json({ - hostAppDataAction: { - chatDataAction: { - createMessageAction: { - message: { - text: 'Gemini CLI forever agent connected. Send me a task!', - }, - }, - }, - }, - }); - return; - } - - if (event.type !== 'MESSAGE' || !event.text) { - res.json({}); - return; - } - - // Immediately ack the webhook (30s timeout) - res.json({}); - - // Process async — send to agent, push response back via Chat API - processMessageAsync(chatApi, event).catch((err) => { - logger.error(`[Bridge] Async processing failed: ${err}`); - }); - }); - - return app; -} +// --- Async message processing --- async function processMessageAsync( chatApi: ChatApiClient, @@ -307,15 +195,96 @@ async function processMessageAsync( } } +// --- Pub/Sub subscriber --- + +function startSubscriber(): void { + if (!GOOGLE_CLOUD_PROJECT) { + logger.error( + '[Bridge] GOOGLE_CLOUD_PROJECT not set — cannot start Pub/Sub subscriber', + ); + process.exit(1); + } + + const pubsub = new PubSub({ projectId: GOOGLE_CLOUD_PROJECT }); + const subscription = pubsub.subscription(PUBSUB_SUBSCRIPTION); + const chatApi = new ChatApiClient(); + + logger.info( + `[Bridge] Subscribing to ${PUBSUB_SUBSCRIPTION} in project ${GOOGLE_CLOUD_PROJECT}`, + ); + logger.info(`[Bridge] Forwarding to agent at ${A2A_URL}`); + + subscription.on('message', (message) => { + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const raw = JSON.parse(message.data.toString()) as Record< + string, + unknown + >; + message.ack(); + + const event = normalizeEvent(raw); + if (!event) { + logger.warn( + `[Bridge] Unknown event format: ${Object.keys(raw).join(',')}`, + ); + return; + } + + logger.info( + `[Bridge] ${event.type}: space=${event.spaceName} text="${event.text.substring(0, 100)}"`, + ); + + if (event.type === 'ADDED_TO_SPACE') { + chatApi + .sendMessage(event.spaceName, '', { + text: 'Gemini CLI forever agent connected. Send me a task!', + }) + .catch((err) => + logger.error(`[Bridge] Welcome message failed: ${err}`), + ); + return; + } + + if (event.type !== 'MESSAGE' || !event.text) return; + + processMessageAsync(chatApi, event).catch((err) => { + logger.error(`[Bridge] Async processing failed: ${err}`); + }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + logger.error(`[Bridge] Failed to parse Pub/Sub message: ${msg}`); + message.ack(); // ack to avoid redelivery of bad messages + } + }); + + subscription.on('error', (err) => { + logger.error(`[Bridge] Pub/Sub subscription error: ${err.message}`); + }); + + // Health check for systemd + http + .createServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end( + JSON.stringify({ + status: 'ok', + mode: 'pubsub', + subscription: PUBSUB_SUBSCRIPTION, + a2aUrl: A2A_URL, + }), + ); + }) + .listen(HEALTH_PORT, '127.0.0.1', () => { + logger.info(`[Bridge] Health check on port ${HEALTH_PORT}`); + }); +} + // --- Standalone entrypoint --- if ( process.argv[1]?.endsWith('bridge.js') || process.argv[1]?.endsWith('bridge.ts') ) { - const app = createBridgeApp(); - app.listen(BRIDGE_PORT, '0.0.0.0', () => { - logger.info(`[Bridge] Google Chat bridge listening on port ${BRIDGE_PORT}`); - logger.info(`[Bridge] Forwarding to agent at ${A2A_URL}`); - }); + startSubscriber(); } diff --git a/scripts/gce-startup.sh b/scripts/gce-startup.sh index 309a4099a0..6d29190a62 100644 --- a/scripts/gce-startup.sh +++ b/scripts/gce-startup.sh @@ -93,19 +93,17 @@ You are a forever-running autonomous agent accessible via Google Chat. Process incoming tasks, answer questions, and proactively work on improvements. GEMINIEOF -# Create systemd service for the chat bridge +# Create systemd service for the chat bridge (Pub/Sub mode) cat > /etc/systemd/system/chat-bridge.service << EOF [Unit] -Description=Google Chat Bridge +Description=Google Chat Bridge (Pub/Sub) After=network.target [Service] Type=simple -Environment=GOOGLE_API_KEY=${GEMINI_API_KEY} -Environment=CHAT_PROJECT_NUMBER=${CHAT_PROJECT_NUMBER} -Environment=A2A_PORT=3100 -Environment=BRIDGE_PORT=8081 Environment=A2A_URL=http://127.0.0.1:3100 +Environment=GOOGLE_CLOUD_PROJECT=adamfweidman-test +Environment=PUBSUB_SUBSCRIPTION=forever-agent-chat-sub Environment=GIT_TERMINAL_PROMPT=0 WorkingDirectory=${REPO_DIR} ExecStart=/usr/bin/node ${REPO_DIR}/packages/a2a-server/dist/src/chat-bridge/bridge.js