mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-05-24 10:42:37 -07:00
feat: switch chat bridge from HTTP webhook to Pub/Sub
Google Chat publishes to Pub/Sub topic, bridge pulls messages. Eliminates all inbound firewall/networking issues on GCE VM.
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
],
|
||||
"dependencies": {
|
||||
"@a2a-js/sdk": "^0.3.8",
|
||||
"@google-cloud/pubsub": "^4.9.0",
|
||||
"@google-cloud/storage": "^7.16.0",
|
||||
"@google/gemini-cli-core": "file:../core",
|
||||
"express": "^5.1.0",
|
||||
|
||||
@@ -5,27 +5,28 @@
|
||||
*/
|
||||
|
||||
/**
|
||||
* Minimal Google Chat bridge for the Gemini CLI forever mode.
|
||||
* Google Chat bridge for the Gemini CLI forever mode via Cloud Pub/Sub.
|
||||
*
|
||||
* Architecture:
|
||||
* Google Chat webhook -> this bridge (port 8081) -> external listener (port 3100)
|
||||
* Response comes back -> bridge pushes to Google Chat via Chat API
|
||||
* Google Chat → Pub/Sub topic → this bridge (pull subscriber) → agent (localhost:3100)
|
||||
* Response comes back → bridge pushes to Google Chat via Chat API
|
||||
*
|
||||
* One agent per space. Messages are forwarded as-is to the running
|
||||
* One agent per VM. Messages are forwarded as-is to the running
|
||||
* gemini-cli --forever session via its JSON-RPC external listener.
|
||||
*/
|
||||
|
||||
import express from 'express';
|
||||
import { OAuth2Client } from 'google-auth-library';
|
||||
import http from 'node:http';
|
||||
import { PubSub } from '@google-cloud/pubsub';
|
||||
import { ChatApiClient } from './chat-api-client.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
// --- Config from env vars ---
|
||||
|
||||
const BRIDGE_PORT = parseInt(process.env['BRIDGE_PORT'] ?? '8081', 10);
|
||||
const A2A_URL = process.env['A2A_URL'] ?? 'http://127.0.0.1:3100';
|
||||
const CHAT_PROJECT_NUMBER = process.env['CHAT_PROJECT_NUMBER'];
|
||||
const CHAT_ISSUER = 'chat@system.gserviceaccount.com';
|
||||
const GOOGLE_CLOUD_PROJECT = process.env['GOOGLE_CLOUD_PROJECT'] ?? '';
|
||||
const PUBSUB_SUBSCRIPTION =
|
||||
process.env['PUBSUB_SUBSCRIPTION'] ?? 'forever-agent-chat-sub';
|
||||
const HEALTH_PORT = parseInt(process.env['BRIDGE_PORT'] ?? '8081', 10);
|
||||
|
||||
// --- Types ---
|
||||
|
||||
@@ -50,56 +51,6 @@ interface JsonRpcResponse {
|
||||
error?: { code: number; message: string };
|
||||
}
|
||||
|
||||
// --- Auth middleware ---
|
||||
|
||||
function createAuthMiddleware(): (
|
||||
req: express.Request,
|
||||
res: express.Response,
|
||||
next: express.NextFunction,
|
||||
) => void {
|
||||
// On Cloud Run, IAM handles auth
|
||||
if (process.env['K_SERVICE']) {
|
||||
logger.info('[Bridge] Running on Cloud Run — auth delegated to IAM.');
|
||||
return (_req, _res, next) => next();
|
||||
}
|
||||
|
||||
if (!CHAT_PROJECT_NUMBER) {
|
||||
logger.warn(
|
||||
'[Bridge] CHAT_PROJECT_NUMBER not set — JWT verification disabled.',
|
||||
);
|
||||
return (_req, _res, next) => next();
|
||||
}
|
||||
|
||||
const authClient = new OAuth2Client();
|
||||
|
||||
return (req, res, next) => {
|
||||
const authHeader = req.headers['authorization'];
|
||||
if (!authHeader || !authHeader.startsWith('Bearer ')) {
|
||||
res.status(401).json({ error: 'Unauthorized' });
|
||||
return;
|
||||
}
|
||||
|
||||
authClient
|
||||
.verifyIdToken({
|
||||
idToken: authHeader.substring(7),
|
||||
audience: CHAT_PROJECT_NUMBER,
|
||||
})
|
||||
.then((ticket) => {
|
||||
const payload = ticket.getPayload();
|
||||
if (payload?.iss !== CHAT_ISSUER) {
|
||||
res.status(403).json({ error: 'Forbidden: invalid issuer' });
|
||||
return;
|
||||
}
|
||||
next();
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
const msg = err instanceof Error ? err.message : 'Unknown error';
|
||||
logger.warn(`[Bridge] JWT verification failed: ${msg}`);
|
||||
res.status(401).json({ error: 'Unauthorized' });
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
// --- Event normalization ---
|
||||
|
||||
function isObj(v: unknown): v is Record<string, unknown> {
|
||||
@@ -119,7 +70,7 @@ interface NormalizedEvent {
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the essentials from a Google Chat webhook event.
|
||||
* Extract the essentials from a Google Chat event.
|
||||
* Handles both legacy and Workspace Add-ons format.
|
||||
*/
|
||||
function normalizeEvent(raw: Record<string, unknown>): NormalizedEvent | null {
|
||||
@@ -218,70 +169,7 @@ async function sendToAgent(text: string): Promise<string> {
|
||||
return texts.join('\n') || '(no response)';
|
||||
}
|
||||
|
||||
// --- Express app ---
|
||||
|
||||
export function createBridgeApp(): express.Express {
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
|
||||
const chatApi = new ChatApiClient();
|
||||
const auth = createAuthMiddleware();
|
||||
|
||||
// Health check
|
||||
app.get('/health', (_req, res) => {
|
||||
res.json({ status: 'ok', a2aUrl: A2A_URL });
|
||||
});
|
||||
|
||||
// Google Chat webhook
|
||||
app.post('/chat/webhook', auth, (req, res) => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const raw = req.body as Record<string, unknown>;
|
||||
const event = normalizeEvent(raw);
|
||||
|
||||
if (!event) {
|
||||
logger.warn(
|
||||
`[Bridge] Unknown event format: ${Object.keys(raw).join(',')}`,
|
||||
);
|
||||
res.json({});
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[Bridge] ${event.type}: space=${event.spaceName} text="${event.text.substring(0, 100)}"`,
|
||||
);
|
||||
|
||||
// Handle non-message events
|
||||
if (event.type === 'ADDED_TO_SPACE') {
|
||||
res.json({
|
||||
hostAppDataAction: {
|
||||
chatDataAction: {
|
||||
createMessageAction: {
|
||||
message: {
|
||||
text: 'Gemini CLI forever agent connected. Send me a task!',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type !== 'MESSAGE' || !event.text) {
|
||||
res.json({});
|
||||
return;
|
||||
}
|
||||
|
||||
// Immediately ack the webhook (30s timeout)
|
||||
res.json({});
|
||||
|
||||
// Process async — send to agent, push response back via Chat API
|
||||
processMessageAsync(chatApi, event).catch((err) => {
|
||||
logger.error(`[Bridge] Async processing failed: ${err}`);
|
||||
});
|
||||
});
|
||||
|
||||
return app;
|
||||
}
|
||||
// --- Async message processing ---
|
||||
|
||||
async function processMessageAsync(
|
||||
chatApi: ChatApiClient,
|
||||
@@ -307,15 +195,96 @@ async function processMessageAsync(
|
||||
}
|
||||
}
|
||||
|
||||
// --- Pub/Sub subscriber ---
|
||||
|
||||
function startSubscriber(): void {
|
||||
if (!GOOGLE_CLOUD_PROJECT) {
|
||||
logger.error(
|
||||
'[Bridge] GOOGLE_CLOUD_PROJECT not set — cannot start Pub/Sub subscriber',
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const pubsub = new PubSub({ projectId: GOOGLE_CLOUD_PROJECT });
|
||||
const subscription = pubsub.subscription(PUBSUB_SUBSCRIPTION);
|
||||
const chatApi = new ChatApiClient();
|
||||
|
||||
logger.info(
|
||||
`[Bridge] Subscribing to ${PUBSUB_SUBSCRIPTION} in project ${GOOGLE_CLOUD_PROJECT}`,
|
||||
);
|
||||
logger.info(`[Bridge] Forwarding to agent at ${A2A_URL}`);
|
||||
|
||||
subscription.on('message', (message) => {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const raw = JSON.parse(message.data.toString()) as Record<
|
||||
string,
|
||||
unknown
|
||||
>;
|
||||
message.ack();
|
||||
|
||||
const event = normalizeEvent(raw);
|
||||
if (!event) {
|
||||
logger.warn(
|
||||
`[Bridge] Unknown event format: ${Object.keys(raw).join(',')}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[Bridge] ${event.type}: space=${event.spaceName} text="${event.text.substring(0, 100)}"`,
|
||||
);
|
||||
|
||||
if (event.type === 'ADDED_TO_SPACE') {
|
||||
chatApi
|
||||
.sendMessage(event.spaceName, '', {
|
||||
text: 'Gemini CLI forever agent connected. Send me a task!',
|
||||
})
|
||||
.catch((err) =>
|
||||
logger.error(`[Bridge] Welcome message failed: ${err}`),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type !== 'MESSAGE' || !event.text) return;
|
||||
|
||||
processMessageAsync(chatApi, event).catch((err) => {
|
||||
logger.error(`[Bridge] Async processing failed: ${err}`);
|
||||
});
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
logger.error(`[Bridge] Failed to parse Pub/Sub message: ${msg}`);
|
||||
message.ack(); // ack to avoid redelivery of bad messages
|
||||
}
|
||||
});
|
||||
|
||||
subscription.on('error', (err) => {
|
||||
logger.error(`[Bridge] Pub/Sub subscription error: ${err.message}`);
|
||||
});
|
||||
|
||||
// Health check for systemd
|
||||
http
|
||||
.createServer((_req, res) => {
|
||||
res.writeHead(200, { 'Content-Type': 'application/json' });
|
||||
res.end(
|
||||
JSON.stringify({
|
||||
status: 'ok',
|
||||
mode: 'pubsub',
|
||||
subscription: PUBSUB_SUBSCRIPTION,
|
||||
a2aUrl: A2A_URL,
|
||||
}),
|
||||
);
|
||||
})
|
||||
.listen(HEALTH_PORT, '127.0.0.1', () => {
|
||||
logger.info(`[Bridge] Health check on port ${HEALTH_PORT}`);
|
||||
});
|
||||
}
|
||||
|
||||
// --- Standalone entrypoint ---
|
||||
|
||||
if (
|
||||
process.argv[1]?.endsWith('bridge.js') ||
|
||||
process.argv[1]?.endsWith('bridge.ts')
|
||||
) {
|
||||
const app = createBridgeApp();
|
||||
app.listen(BRIDGE_PORT, '0.0.0.0', () => {
|
||||
logger.info(`[Bridge] Google Chat bridge listening on port ${BRIDGE_PORT}`);
|
||||
logger.info(`[Bridge] Forwarding to agent at ${A2A_URL}`);
|
||||
});
|
||||
startSubscriber();
|
||||
}
|
||||
|
||||
@@ -93,19 +93,17 @@ You are a forever-running autonomous agent accessible via Google Chat.
|
||||
Process incoming tasks, answer questions, and proactively work on improvements.
|
||||
GEMINIEOF
|
||||
|
||||
# Create systemd service for the chat bridge
|
||||
# Create systemd service for the chat bridge (Pub/Sub mode)
|
||||
cat > /etc/systemd/system/chat-bridge.service << EOF
|
||||
[Unit]
|
||||
Description=Google Chat Bridge
|
||||
Description=Google Chat Bridge (Pub/Sub)
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
Environment=GOOGLE_API_KEY=${GEMINI_API_KEY}
|
||||
Environment=CHAT_PROJECT_NUMBER=${CHAT_PROJECT_NUMBER}
|
||||
Environment=A2A_PORT=3100
|
||||
Environment=BRIDGE_PORT=8081
|
||||
Environment=A2A_URL=http://127.0.0.1:3100
|
||||
Environment=GOOGLE_CLOUD_PROJECT=adamfweidman-test
|
||||
Environment=PUBSUB_SUBSCRIPTION=forever-agent-chat-sub
|
||||
Environment=GIT_TERMINAL_PROMPT=0
|
||||
WorkingDirectory=${REPO_DIR}
|
||||
ExecStart=/usr/bin/node ${REPO_DIR}/packages/a2a-server/dist/src/chat-bridge/bridge.js
|
||||
|
||||
Reference in New Issue
Block a user