mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-03-16 00:51:25 -07:00
feat: introduce Forever Mode with A2A listener
- Sisyphus: auto-resume timer with schedule_work tool - Confucius: built-in sub-agent for knowledge consolidation before compression - Hippocampus: in-memory short-term memory via background micro-consolidation - Bicameral Voice: proactive knowledge alignment on user input - Archive compression mode for long-running sessions - Onboarding dialog for first-time Forever Mode setup - Refresh system instruction per turn so hippocampus reaches the model - Auto-start A2A HTTP server when Forever Mode + Sisyphus enabled - Bridge external messages into session and capture responses - Display A2A port in status bar alongside Sisyphus timer
This commit is contained in:
56
scripts/forever-bridge/README.md
Normal file
56
scripts/forever-bridge/README.md
Normal file
@@ -0,0 +1,56 @@
|
||||
# Forever Bridge — Google Chat via Pub/Sub
|
||||
|
||||
Zero-dependency Node.js service that runs Gemini CLI in forever mode on a GCE
|
||||
VM, receiving Google Chat messages via Cloud Pub/Sub and responding via Chat
|
||||
REST API.
|
||||
|
||||
**No `npm install` needed.** Auth uses the GCE metadata server; Pub/Sub and Chat
|
||||
use raw REST APIs. Just `node main.mjs`.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
Google Chat Space
|
||||
↓ Pub/Sub topic
|
||||
Cloud Pub/Sub (pull)
|
||||
↓
|
||||
main.mjs (poll loop)
|
||||
├── process-manager.mjs → npx gemini-cli --forever -r -y -m gemini-3-flash-preview
|
||||
│ └── A2A JSON-RPC on localhost:3100
|
||||
└── Chat REST API → Google Chat Space (response)
|
||||
```
|
||||
|
||||
## Files
|
||||
|
||||
| File | Purpose |
|
||||
| --------------------- | ------------------------------------------------------------------------ |
|
||||
| `main.mjs` | Pub/Sub poller, Chat API client, A2A forwarder, health check, entrypoint |
|
||||
| `process-manager.mjs` | Spawns/restarts Gemini CLI with exponential backoff |
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
# On a GCE VM with appropriate service account:
|
||||
export GOOGLE_API_KEY="your-gemini-api-key"
|
||||
export PUBSUB_SUBSCRIPTION="projects/my-project/subscriptions/chat-sub"
|
||||
|
||||
node main.mjs
|
||||
```
|
||||
|
||||
## Environment Variables
|
||||
|
||||
| Variable | Required | Default | Description |
|
||||
| --------------------- | -------- | ------------------------ | --------------------------------- |
|
||||
| `GOOGLE_API_KEY` | Yes | — | Gemini API key |
|
||||
| `PUBSUB_SUBSCRIPTION` | Yes | — | `projects/PROJ/subscriptions/SUB` |
|
||||
| `A2A_PORT` | No | `3100` | Agent A2A port |
|
||||
| `GEMINI_MODEL` | No | `gemini-3-flash-preview` | Model |
|
||||
| `HEALTH_PORT` | No | `8081` | Health check port |
|
||||
| `POLL_INTERVAL_MS` | No | `1000` | Pub/Sub poll interval |
|
||||
|
||||
## GCP Prerequisites
|
||||
|
||||
1. Chat API + Pub/Sub API enabled
|
||||
2. Service account with `Pub/Sub Subscriber` + `Chat Bot` roles
|
||||
3. Google Chat app configured with **Cloud Pub/Sub** connection type
|
||||
4. Pub/Sub subscription created for the Chat topic
|
||||
631
scripts/forever-bridge/main.mjs
Normal file
631
scripts/forever-bridge/main.mjs
Normal file
@@ -0,0 +1,631 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2026 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
/**
|
||||
* Forever Bridge — Zero-dependency Pub/Sub Google Chat bridge.
|
||||
*
|
||||
* Runs Gemini CLI in forever mode, pulls Chat messages from Pub/Sub,
|
||||
* forwards to the agent via A2A JSON-RPC, sends responses back via Chat API.
|
||||
* Auto-restarts the CLI on crash. Auth via GCE metadata server.
|
||||
*
|
||||
* Just: node main.mjs
|
||||
*/
|
||||
|
||||
import http from 'node:http';
|
||||
import { spawnSync } from 'node:child_process';
|
||||
import { homedir } from 'node:os';
|
||||
import { readdirSync, readFileSync, writeFileSync, mkdirSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Config
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
let A2A_URL = null;
|
||||
const HEALTH_PORT = parseInt(process.env.HEALTH_PORT || '8081', 10);
|
||||
const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '1000', 10);
|
||||
const GEMINI_MODEL = process.env.GEMINI_MODEL || 'gemini-3-flash-preview';
|
||||
const GEMINI_NPX_PACKAGE =
|
||||
process.env.GEMINI_NPX_PACKAGE ||
|
||||
'https://github.com/google-gemini/gemini-cli#forever';
|
||||
|
||||
const PUBSUB_API = 'https://pubsub.googleapis.com/v1';
|
||||
const CHAT_API = 'https://chat.googleapis.com/v1';
|
||||
const METADATA_BASE = 'http://metadata.google.internal/computeMetadata/v1';
|
||||
const METADATA_TOKEN_URL = `${METADATA_BASE}/instance/service-accounts/default/token`;
|
||||
const A2A_TIMEOUT_MS = 5 * 60 * 1000;
|
||||
const MAX_CHAT_TEXT_LENGTH = 4000;
|
||||
|
||||
let PUBSUB_SUBSCRIPTION;
|
||||
|
||||
function log(msg) {
|
||||
console.log(`[${new Date().toISOString()}] [Bridge] ${msg}`);
|
||||
}
|
||||
|
||||
function logError(msg) {
|
||||
console.error(`[${new Date().toISOString()}] [Bridge] ${msg}`);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// GCE metadata — project ID + auth tokens (zero dependencies)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function getProjectId() {
|
||||
const res = await fetch(`${METADATA_BASE}/project/project-id`, {
|
||||
headers: { 'Metadata-Flavor': 'Google' },
|
||||
});
|
||||
if (!res.ok) throw new Error(`Failed to get project ID: ${res.status}`);
|
||||
return (await res.text()).trim();
|
||||
}
|
||||
|
||||
let cachedToken = null;
|
||||
let tokenExpiresAt = 0;
|
||||
|
||||
async function getAccessToken(scope) {
|
||||
const now = Date.now();
|
||||
if (cachedToken && now < tokenExpiresAt - 30_000) {
|
||||
return cachedToken;
|
||||
}
|
||||
|
||||
const url = scope
|
||||
? `${METADATA_TOKEN_URL}?scopes=${encodeURIComponent(scope)}`
|
||||
: METADATA_TOKEN_URL;
|
||||
const res = await fetch(url, {
|
||||
headers: { 'Metadata-Flavor': 'Google' },
|
||||
});
|
||||
|
||||
if (!res.ok) throw new Error(`Metadata token request failed: ${res.status}`);
|
||||
|
||||
const data = await res.json();
|
||||
cachedToken = data.access_token;
|
||||
tokenExpiresAt = now + data.expires_in * 1000;
|
||||
return cachedToken;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Process manager — spawn/restart Gemini CLI forever session
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const PM_MIN_BACKOFF_MS = 1_000;
|
||||
const PM_MAX_BACKOFF_MS = 60_000;
|
||||
const PM_STABLE_UPTIME_MS = 30_000;
|
||||
const PM_READY_POLL_MS = 2_000;
|
||||
const PM_READY_TIMEOUT_MS = 120_000;
|
||||
|
||||
let pmStopping = false;
|
||||
let pmBackoffMs = PM_MIN_BACKOFF_MS;
|
||||
let pmReadyResolve = null;
|
||||
let agentReady = false;
|
||||
|
||||
function hasExistingSession() {
|
||||
const tmpDir = join(homedir(), '.gemini', 'tmp');
|
||||
try {
|
||||
for (const project of readdirSync(tmpDir)) {
|
||||
const chatsDir = join(tmpDir, project, 'chats');
|
||||
try {
|
||||
if (readdirSync(chatsDir).some(f => f.endsWith('.json'))) return true;
|
||||
} catch { /* no chats dir for this project */ }
|
||||
}
|
||||
} catch { /* no tmp dir yet */ }
|
||||
return false;
|
||||
}
|
||||
|
||||
const TMUX_SESSION = 'gemini-forever';
|
||||
|
||||
function buildCommand() {
|
||||
const cliArgs = [
|
||||
GEMINI_NPX_PACKAGE,
|
||||
'--forever', '-y', '-m', GEMINI_MODEL,
|
||||
];
|
||||
|
||||
if (hasExistingSession()) {
|
||||
cliArgs.push('-r');
|
||||
log('Resuming previous session');
|
||||
}
|
||||
|
||||
const npxCmd = `npx -y ${cliArgs.join(' ')}`;
|
||||
|
||||
// Run inside a tmux session so Ink gets a real TTY without overpainting the bridge
|
||||
return {
|
||||
cmd: 'tmux',
|
||||
args: ['new-session', '-d', '-s', TMUX_SESSION, '-x', '200', '-y', '50', npxCmd],
|
||||
};
|
||||
}
|
||||
|
||||
function discoverPort() {
|
||||
const sessionsDir = join(homedir(), '.gemini', 'sessions');
|
||||
try {
|
||||
const files = readdirSync(sessionsDir).filter(f => f.endsWith('.port'));
|
||||
for (const f of files) {
|
||||
const port = parseInt(readFileSync(join(sessionsDir, f), 'utf-8').trim(), 10);
|
||||
if (port > 0) return port;
|
||||
}
|
||||
} catch {
|
||||
// directory may not exist yet
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
async function waitForA2AReady() {
|
||||
const deadline = Date.now() + PM_READY_TIMEOUT_MS;
|
||||
|
||||
while (Date.now() < deadline) {
|
||||
const port = discoverPort();
|
||||
if (port) {
|
||||
const url = `http://127.0.0.1:${port}`;
|
||||
try {
|
||||
const res = await fetch(`${url}/.well-known/agent-card.json`);
|
||||
if (res.ok) {
|
||||
A2A_URL = url;
|
||||
log(`Agent ready at ${A2A_URL} (port ${port} from file)`);
|
||||
return true;
|
||||
}
|
||||
} catch {
|
||||
// port file exists but agent not responding yet
|
||||
}
|
||||
}
|
||||
await new Promise(r => setTimeout(r, PM_READY_POLL_MS));
|
||||
}
|
||||
|
||||
logError(`Agent did not become ready within ${PM_READY_TIMEOUT_MS / 1000}s`);
|
||||
return false;
|
||||
}
|
||||
|
||||
function scheduleRestart() {
|
||||
if (pmStopping) return;
|
||||
log(`Restarting in ${pmBackoffMs}ms...`);
|
||||
setTimeout(() => {
|
||||
pmBackoffMs = Math.min(pmBackoffMs * 2, PM_MAX_BACKOFF_MS);
|
||||
spawnAgent();
|
||||
}, pmBackoffMs);
|
||||
}
|
||||
|
||||
function isTmuxSessionAlive() {
|
||||
try {
|
||||
const result = spawnSync('tmux', ['has-session', '-t', TMUX_SESSION], { stdio: 'ignore' });
|
||||
return result.status === 0;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function spawnAgent() {
|
||||
if (pmStopping) return;
|
||||
|
||||
// Kill stale tmux session if it exists
|
||||
spawnSync('tmux', ['kill-session', '-t', TMUX_SESSION], { stdio: 'ignore' });
|
||||
|
||||
const { cmd, args } = buildCommand();
|
||||
log(`Spawning: ${cmd} ${args.join(' ')}`);
|
||||
|
||||
const startTime = Date.now();
|
||||
agentReady = false;
|
||||
|
||||
const result = spawnSync(cmd, args, {
|
||||
env: { ...process.env },
|
||||
stdio: 'ignore',
|
||||
});
|
||||
|
||||
if (result.status !== 0) {
|
||||
logError(`Failed to start tmux session (exit ${result.status})`);
|
||||
scheduleRestart();
|
||||
return;
|
||||
}
|
||||
|
||||
log(`tmux session '${TMUX_SESSION}' started`);
|
||||
|
||||
// Monitor the tmux session
|
||||
const monitor = setInterval(() => {
|
||||
if (pmStopping) { clearInterval(monitor); return; }
|
||||
|
||||
if (!isTmuxSessionAlive()) {
|
||||
clearInterval(monitor);
|
||||
const uptime = Date.now() - startTime;
|
||||
log(`Agent tmux session died: uptime=${uptime}ms`);
|
||||
if (uptime > PM_STABLE_UPTIME_MS) pmBackoffMs = PM_MIN_BACKOFF_MS;
|
||||
agentReady = false;
|
||||
if (!pmStopping) scheduleRestart();
|
||||
}
|
||||
}, 5000);
|
||||
|
||||
waitForA2AReady().then(ready => {
|
||||
if (ready) {
|
||||
agentReady = true;
|
||||
pmBackoffMs = PM_MIN_BACKOFF_MS;
|
||||
if (pmReadyResolve) { pmReadyResolve(); pmReadyResolve = null; }
|
||||
} else if (!pmStopping) {
|
||||
logError('Killing agent (A2A port never became ready)');
|
||||
spawnSync('tmux', ['kill-session', '-t', TMUX_SESSION], { stdio: 'ignore' });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function startAgent() {
|
||||
pmStopping = false;
|
||||
const p = new Promise(resolve => { pmReadyResolve = resolve; });
|
||||
spawnAgent();
|
||||
return p;
|
||||
}
|
||||
|
||||
function stopAgent() {
|
||||
pmStopping = true;
|
||||
log('Killing tmux session');
|
||||
spawnSync('tmux', ['kill-session', '-t', TMUX_SESSION], { stdio: 'ignore' });
|
||||
}
|
||||
|
||||
function isReady() {
|
||||
return agentReady;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Pub/Sub pull (REST API)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
let polling = true;
|
||||
|
||||
async function pullMessages() {
|
||||
const token = await getAccessToken();
|
||||
const res = await fetch(`${PUBSUB_API}/${PUBSUB_SUBSCRIPTION}:pull`, {
|
||||
method: 'POST',
|
||||
headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ maxMessages: 10 }),
|
||||
});
|
||||
|
||||
if (!res.ok) throw new Error(`Pub/Sub pull failed: ${res.status} ${await res.text()}`);
|
||||
const data = await res.json();
|
||||
return data.receivedMessages || [];
|
||||
}
|
||||
|
||||
async function ackMessages(ackIds) {
|
||||
if (ackIds.length === 0) return;
|
||||
const token = await getAccessToken();
|
||||
await fetch(`${PUBSUB_API}/${PUBSUB_SUBSCRIPTION}:acknowledge`, {
|
||||
method: 'POST',
|
||||
headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ ackIds }),
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Google Chat API
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function sendChatMessage(spaceName, threadName, text, isDm = false) {
|
||||
const chunks = splitText(text || '(no response)');
|
||||
|
||||
for (const chunk of chunks) {
|
||||
const message = { text: chunk };
|
||||
if (!isDm && threadName) message.thread = { name: threadName };
|
||||
|
||||
const queryParam = isDm ? '' : '?messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD';
|
||||
const url = `${CHAT_API}/${spaceName}/messages${queryParam}`;
|
||||
|
||||
try {
|
||||
const token = await getAccessToken('https://www.googleapis.com/auth/chat.bot');
|
||||
const res = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify(message),
|
||||
});
|
||||
|
||||
if (!res.ok) logError(`Chat API ${res.status}: ${await res.text()}`);
|
||||
else log(`Sent to ${spaceName} (${chunk.length} chars)`);
|
||||
} catch (err) {
|
||||
logError(`Chat API error: ${err.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function splitText(text) {
|
||||
if (text.length <= MAX_CHAT_TEXT_LENGTH) return [text];
|
||||
const chunks = [];
|
||||
let remaining = text;
|
||||
while (remaining.length > MAX_CHAT_TEXT_LENGTH) {
|
||||
let splitAt = remaining.lastIndexOf('\n\n', MAX_CHAT_TEXT_LENGTH);
|
||||
if (splitAt < MAX_CHAT_TEXT_LENGTH * 0.3) splitAt = remaining.lastIndexOf('\n', MAX_CHAT_TEXT_LENGTH);
|
||||
if (splitAt < MAX_CHAT_TEXT_LENGTH * 0.3) splitAt = MAX_CHAT_TEXT_LENGTH;
|
||||
chunks.push(remaining.substring(0, splitAt));
|
||||
remaining = remaining.substring(splitAt);
|
||||
}
|
||||
if (remaining) chunks.push(remaining);
|
||||
return chunks;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Event normalization
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function isObj(v) {
|
||||
return typeof v === 'object' && v !== null && !Array.isArray(v);
|
||||
}
|
||||
|
||||
function str(obj, key) {
|
||||
const v = obj[key];
|
||||
return typeof v === 'string' ? v : '';
|
||||
}
|
||||
|
||||
function getSpaceType(space) {
|
||||
// Google Chat uses 'type' or 'spaceType' depending on API version
|
||||
return str(space, 'type') || str(space, 'spaceType');
|
||||
}
|
||||
|
||||
function normalizeEvent(raw) {
|
||||
if (typeof raw.type === 'string') {
|
||||
const message = isObj(raw.message) ? raw.message : {};
|
||||
const space = isObj(raw.space) ? raw.space : isObj(message.space) ? message.space : {};
|
||||
const thread = isObj(message.thread) ? message.thread : {};
|
||||
return { type: raw.type, text: str(message, 'text'), spaceName: str(space, 'name'), threadName: str(thread, 'name'), spaceType: getSpaceType(space) };
|
||||
}
|
||||
|
||||
const chat = raw.chat;
|
||||
if (!isObj(chat)) return null;
|
||||
|
||||
if (isObj(chat.messagePayload)) {
|
||||
const payload = chat.messagePayload;
|
||||
const message = isObj(payload.message) ? payload.message : {};
|
||||
const space = isObj(payload.space) ? payload.space : isObj(message.space) ? message.space : {};
|
||||
const thread = isObj(message.thread) ? message.thread : {};
|
||||
return { type: 'MESSAGE', text: str(message, 'text'), spaceName: str(space, 'name'), threadName: str(thread, 'name'), spaceType: getSpaceType(space) };
|
||||
}
|
||||
|
||||
if (isObj(chat.addedToSpacePayload)) {
|
||||
const space = isObj(chat.addedToSpacePayload.space) ? chat.addedToSpacePayload.space : {};
|
||||
return { type: 'ADDED_TO_SPACE', text: '', spaceName: str(space, 'name'), threadName: '', spaceType: getSpaceType(space) };
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// A2A JSON-RPC
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function sendToAgent(text) {
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), A2A_TIMEOUT_MS);
|
||||
|
||||
try {
|
||||
const res = await fetch(A2A_URL, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
id: Date.now(),
|
||||
method: 'message/send',
|
||||
params: { message: { role: 'user', parts: [{ kind: 'text', text }] } },
|
||||
}),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!res.ok) throw new Error(`Agent ${res.status}: ${await res.text()}`);
|
||||
const result = await res.json();
|
||||
if (result.error) throw new Error(`Agent error: ${result.error.message}`);
|
||||
|
||||
const parts = result.result?.status?.message?.parts ?? [];
|
||||
return parts.filter(p => p.kind === 'text' && p.text).map(p => p.text).join('\n') || '(no response)';
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Message handler
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Track last active Chat space for unsolicited response forwarding
|
||||
let lastChatSpace = null;
|
||||
let lastChatThread = null;
|
||||
let lastChatIsDm = false;
|
||||
|
||||
async function handleMessage(data) {
|
||||
let raw;
|
||||
try {
|
||||
raw = JSON.parse(Buffer.from(data, 'base64').toString('utf-8'));
|
||||
} catch (err) {
|
||||
logError(`Bad message data: ${err.message}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const event = normalizeEvent(raw);
|
||||
if (!event) return;
|
||||
|
||||
const isDm = event.spaceType === 'DM';
|
||||
log(`${event.type}: space=${event.spaceName} type=${event.spaceType || 'unknown'} text="${event.text.substring(0, 100)}"`);
|
||||
|
||||
if (event.type === 'ADDED_TO_SPACE') {
|
||||
await sendChatMessage(event.spaceName, '', 'Gemini CLI forever agent connected. Send me a task!', isDm);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type !== 'MESSAGE' || !event.text) return;
|
||||
|
||||
// Track last active space for unsolicited response forwarding
|
||||
lastChatSpace = event.spaceName;
|
||||
lastChatThread = event.threadName;
|
||||
lastChatIsDm = isDm;
|
||||
|
||||
if (!isReady()) {
|
||||
await sendChatMessage(event.spaceName, event.threadName, '⏳ Agent is starting up, please try again in a moment.', isDm);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
log(`→ Agent: "${event.text.substring(0, 100)}"`);
|
||||
const responseText = await sendToAgent(event.text);
|
||||
log(`← Agent: ${responseText.length} chars`);
|
||||
await sendChatMessage(event.spaceName, event.threadName, responseText, isDm);
|
||||
} catch (err) {
|
||||
logError(`Error: ${err.message}`);
|
||||
await sendChatMessage(event.spaceName, event.threadName, `❌ Error: ${err.message}`, isDm);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Poll unsolicited responses (Sisyphus auto-resume output, etc.)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const UNSOLICITED_POLL_MS = 5000;
|
||||
|
||||
async function pollUnsolicitedResponses() {
|
||||
while (polling) {
|
||||
if (isReady() && A2A_URL && lastChatSpace) {
|
||||
try {
|
||||
const res = await fetch(A2A_URL, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
id: Date.now(),
|
||||
method: 'responses/poll',
|
||||
params: {},
|
||||
}),
|
||||
});
|
||||
|
||||
if (res.ok) {
|
||||
const data = await res.json();
|
||||
const responses = data.result?.responses ?? [];
|
||||
for (const text of responses) {
|
||||
if (text) {
|
||||
log(`← Unsolicited (${text.length} chars) → ${lastChatSpace}`);
|
||||
await sendChatMessage(lastChatSpace, lastChatThread, text, lastChatIsDm);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// agent may be restarting
|
||||
}
|
||||
}
|
||||
await new Promise(r => setTimeout(r, UNSOLICITED_POLL_MS));
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Poll loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function pollLoop() {
|
||||
while (polling) {
|
||||
try {
|
||||
const messages = await pullMessages();
|
||||
|
||||
if (messages.length > 0) {
|
||||
await ackMessages(messages.map(m => m.ackId));
|
||||
|
||||
for (const msg of messages) {
|
||||
await handleMessage(msg.message.data);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logError(`Poll error: ${err.message}`);
|
||||
}
|
||||
|
||||
await new Promise(r => setTimeout(r, POLL_INTERVAL_MS));
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Health check
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function startHealthServer() {
|
||||
const server = http.createServer((req, res) => {
|
||||
if (req.method === 'GET' && req.url === '/health') {
|
||||
const code = isReady() ? 200 : 503;
|
||||
res.writeHead(code, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ status: isReady() ? 'ok' : 'agent_starting', agentReady: isReady(), a2aUrl: A2A_URL || 'discovering...' }));
|
||||
return;
|
||||
}
|
||||
res.writeHead(404);
|
||||
res.end('Not found');
|
||||
});
|
||||
server.listen(HEALTH_PORT, '0.0.0.0', () => log(`Health check on :${HEALTH_PORT}`));
|
||||
return server;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Main
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const WORKSPACE_DIR = join(homedir(), 'forever-workspace');
|
||||
|
||||
function setupWorkspace() {
|
||||
const home = homedir();
|
||||
const userGemini = join(home, '.gemini');
|
||||
|
||||
// Create a clean workspace directory for the agent
|
||||
mkdirSync(WORKSPACE_DIR, { recursive: true });
|
||||
const wsGemini = join(WORKSPACE_DIR, '.gemini');
|
||||
mkdirSync(wsGemini, { recursive: true });
|
||||
|
||||
// Pre-trust the workspace directory
|
||||
const trustFile = join(userGemini, 'trustedFolders.json');
|
||||
try {
|
||||
mkdirSync(userGemini, { recursive: true });
|
||||
const existing = (() => { try { return JSON.parse(readFileSync(trustFile, 'utf-8')); } catch { return {}; } })();
|
||||
existing[WORKSPACE_DIR] = 'TRUST_FOLDER';
|
||||
writeFileSync(trustFile, JSON.stringify(existing, null, 2));
|
||||
log(`Trusted folder: ${WORKSPACE_DIR}`);
|
||||
} catch (err) {
|
||||
logError(`Failed to write trust config: ${err.message}`);
|
||||
}
|
||||
|
||||
// Disable interactive prompts via settings
|
||||
const settings = {
|
||||
security: {
|
||||
folderTrust: { enabled: false },
|
||||
auth: { selectedType: 'gemini-api-key', useExternal: true },
|
||||
},
|
||||
general: { sessionRetention: { enabled: true, maxAge: '30d', warningAcknowledged: true } },
|
||||
experimental: { enableAgents: true },
|
||||
};
|
||||
for (const dir of [userGemini, wsGemini]) {
|
||||
try {
|
||||
mkdirSync(dir, { recursive: true });
|
||||
writeFileSync(join(dir, 'settings.json'), JSON.stringify(settings, null, 2));
|
||||
} catch { /* best effort */ }
|
||||
}
|
||||
|
||||
// Change to workspace so the agent runs there
|
||||
process.chdir(WORKSPACE_DIR);
|
||||
log(`Working directory: ${WORKSPACE_DIR}`);
|
||||
}
|
||||
|
||||
async function main() {
|
||||
log('=== Forever Bridge starting ===');
|
||||
|
||||
setupWorkspace();
|
||||
|
||||
const projectId = await getProjectId();
|
||||
PUBSUB_SUBSCRIPTION = `projects/${projectId}/subscriptions/gcli-agent-sub`;
|
||||
log(`Project: ${projectId}`);
|
||||
log(`Subscription: ${PUBSUB_SUBSCRIPTION}`);
|
||||
|
||||
const healthServer = startHealthServer();
|
||||
|
||||
log('Starting Gemini CLI...');
|
||||
await startAgent();
|
||||
log(`Agent ready at ${A2A_URL}`);
|
||||
|
||||
log('Polling Pub/Sub for Chat messages...');
|
||||
pollLoop();
|
||||
pollUnsolicitedResponses();
|
||||
|
||||
const shutdown = () => {
|
||||
log('Shutting down...');
|
||||
polling = false;
|
||||
stopAgent();
|
||||
healthServer.close();
|
||||
setTimeout(() => process.exit(0), 2000);
|
||||
};
|
||||
|
||||
process.on('SIGTERM', shutdown);
|
||||
process.on('SIGINT', shutdown);
|
||||
}
|
||||
|
||||
main().catch(err => {
|
||||
logError(`Fatal: ${err.message}`);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user