feat: async streaming chat bridge with Chat API push

Replace blocking A2A calls with streaming: webhook returns
immediate "Processing..." response, then streams results from
A2A agent and pushes them to Google Chat via REST API.

- Add ChatApiClient for proactive messaging via Chat REST API
- Add sendMessageStream() to A2ABridgeClient for SSE streaming
- Add extractFromStreamEvent() for parsing individual stream events
- Refactor handler to fire-and-forget async processing
- Fix isTerminal logic to use stream state instead of taskId presence
- Add asyncProcessing guard to prevent overlapping requests
- Add comprehensive README with deployment and setup guide
This commit is contained in:
Adam Weidman
2026-02-14 19:54:47 -07:00
parent 03ef280120
commit 4f3ffc8959
8 changed files with 819 additions and 95 deletions
+287 -2
View File
@@ -1,5 +1,290 @@
# Gemini CLI A2A Server
## All code in this package is experimental and under active development
> **Experimental** - This package is under active development.
This package contains the A2A server implementation for the Gemini CLI.
An [A2A (Agent-to-Agent)](https://google.github.io/A2A/) server that wraps the
Gemini CLI agent, enabling remote interaction via the A2A protocol. Includes a
Google Chat bridge for using the agent directly from Google Chat.
## Architecture
```
Google Chat ──webhook──> Chat Bridge ──A2A──> A2A Server ──> Gemini CLI Agent
└── Chat REST API (push responses back to Chat)
```
The server runs as a single Cloud Run container with two logical components:
1. **A2A Server** - Standard A2A protocol endpoint (JSON-RPC + SSE streaming)
that wraps the Gemini CLI agent
2. **Chat Bridge** - Translates between Google Chat webhook events and A2A
protocol, pushing streamed results back via the Chat REST API
The Chat Bridge responds immediately to webhooks with "Processing..." (avoiding
Google Chat's 30s timeout), then streams results from the A2A agent and pushes
them to Chat as they arrive.
## Prerequisites
- **GCP project** with the following APIs enabled:
- Cloud Run API
- Cloud Build API
- Artifact Registry API
- Google Chat API
- Cloud Storage API (for session persistence)
- **gcloud CLI** authenticated with your project
- **Node.js 20+** for local development
- **Gemini API key** from [Google AI Studio](https://aistudio.google.com/)
## Environment Variables
| Variable | Required | Description |
| ---------------------------- | -------- | ---------------------------------------------------------------------------------------------------- |
| `GEMINI_API_KEY` | Yes | Gemini API key for the agent |
| `CODER_AGENT_PORT` | No | Server port (default: `8080`) |
| `CODER_AGENT_HOST` | No | Bind host (default: `localhost`, set `0.0.0.0` for containers) |
| `CODER_AGENT_WORKSPACE_PATH` | No | Agent workspace directory (default: `/workspace`) |
| `GCS_BUCKET_NAME` | No | GCS bucket for task & session persistence |
| `GEMINI_YOLO_MODE` | No | Set `true` to auto-approve all tool calls |
| `CHAT_BRIDGE_A2A_URL` | No | A2A server URL for the Chat bridge (e.g. `http://localhost:8080`). Presence enables the Chat bridge. |
| `CHAT_PROJECT_NUMBER` | No | Google Chat project number for JWT verification |
| `CHAT_SA_KEY_PATH` | No | Path to service account key for Chat API auth (uses ADC if not set) |
| `CHAT_BRIDGE_DEBUG` | No | Set `true` for verbose bridge logging |
| `GIT_TERMINAL_PROMPT` | No | Set `0` to prevent git credential prompts in headless environments |
## Local Development
### Build
From the repo root:
```bash
npm install
npm run build
```
### Run
```bash
export GEMINI_API_KEY="your-api-key"
export CODER_AGENT_PORT=8080
export CHAT_BRIDGE_A2A_URL=http://localhost:8080
node packages/a2a-server/dist/src/http/server.js
```
### Test the A2A endpoint
```bash
# Check the agent card
curl http://localhost:8080/.well-known/agent-card.json | jq .
# Send a message (JSON-RPC)
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "message/send",
"params": {
"message": {
"kind": "message",
"role": "user",
"messageId": "test-1",
"parts": [{"kind": "text", "text": "Hello, what can you do?"}]
},
"configuration": {"blocking": true}
}
}'
```
### Test the Chat Bridge
```bash
# Health check
curl http://localhost:8080/chat/health | jq .
# Simulate a Google Chat MESSAGE event
curl -X POST http://localhost:8080/chat/webhook \
-H "Content-Type: application/json" \
-d '{
"type": "MESSAGE",
"eventTime": "2026-01-01T00:00:00Z",
"message": {
"name": "spaces/test/messages/1",
"text": "Hello agent",
"thread": {"name": "spaces/test/threads/abc"},
"sender": {"name": "users/1", "displayName": "Test User"},
"space": {"name": "spaces/test", "type": "DM"}
},
"space": {"name": "spaces/test", "type": "DM"},
"user": {"name": "users/1", "displayName": "Test User"}
}'
```
## Cloud Run Deployment
### 1. Create Artifact Registry repository
```bash
export PROJECT_ID=your-project-id
export REGION=us-central1
gcloud artifacts repositories create gemini-a2a \
--repository-format=docker \
--location=$REGION \
--project=$PROJECT_ID
```
### 2. Create GCS bucket (optional, for session persistence)
```bash
gsutil mb -l $REGION gs://gemini-a2a-sessions-$PROJECT_ID
```
### 3. Build with Cloud Build
```bash
gcloud builds submit \
--config=packages/a2a-server/cloudbuild.yaml \
--project=$PROJECT_ID
```
### 4. Deploy to Cloud Run
```bash
export IMAGE=us-central1-docker.pkg.dev/$PROJECT_ID/gemini-a2a/a2a-server:latest
gcloud run deploy gemini-a2a-server \
--image=$IMAGE \
--region=$REGION \
--project=$PROJECT_ID \
--platform=managed \
--allow-unauthenticated \
--memory=2Gi \
--cpu=2 \
--timeout=300 \
--concurrency=1 \
--max-instances=1 \
--set-env-vars="GEMINI_YOLO_MODE=true,GCS_BUCKET_NAME=gemini-a2a-sessions-$PROJECT_ID,CHAT_BRIDGE_A2A_URL=http://localhost:8080" \
--set-secrets="GEMINI_API_KEY=gemini-api-key:latest"
```
> **Important**: After initial deployment, always use `--update-env-vars`
> instead of `--set-env-vars` to avoid wiping existing environment variables.
### 5. Update an existing deployment
```bash
# Update env vars without replacing existing ones
gcloud run services update gemini-a2a-server \
--region=$REGION \
--project=$PROJECT_ID \
--update-env-vars="NEW_VAR=value"
# Deploy a new image
gcloud run services update gemini-a2a-server \
--region=$REGION \
--project=$PROJECT_ID \
--image=$IMAGE
```
## Google Chat App Configuration
### 1. Create a service account for Chat API
The Chat bridge needs a service account with the Chat API scope to push messages
proactively.
```bash
# Create service account
gcloud iam service-accounts create gemini-chat-bot \
--display-name="Gemini Chat Bot" \
--project=$PROJECT_ID
# Download key (for local dev)
gcloud iam service-accounts keys create chat-sa-key.json \
--iam-account=gemini-chat-bot@$PROJECT_ID.iam.gserviceaccount.com
```
On Cloud Run, use Application Default Credentials (ADC) instead of a key file.
Grant the Cloud Run service account the `chat.bot` scope by configuring it as
the Chat app's service account.
### 2. Configure the Google Chat app
1. Go to
[Google Cloud Console > APIs & Services > Google Chat API > Configuration](https://console.cloud.google.com/apis/api/chat.googleapis.com/hangouts-chat)
2. Set **App name** and **Description**
3. Under **Connection settings**, select **HTTP endpoint URL**
4. Set the URL to your Cloud Run service URL + `/chat/webhook`:
```
https://gemini-a2a-server-HASH-uc.a.run.app/chat/webhook
```
5. Under **Authentication Audience**, select **HTTP endpoint URL**
6. Under **Visibility**, choose who can use the app
7. Under **Permissions**, configure who can install it
8. Click **Save**
### 3. Grant Cloud Run invoker permission
If your Cloud Run service requires authentication (recommended):
```bash
# Get the Chat service account
# It's usually chat@system.gserviceaccount.com
gcloud run services add-iam-policy-binding gemini-a2a-server \
--region=$REGION \
--project=$PROJECT_ID \
--member="serviceAccount:chat@system.gserviceaccount.com" \
--role="roles/run.invoker"
```
## Chat Bridge Commands
When messaging the bot in Google Chat:
| Command | Description |
| ----------------------- | --------------------------------------------------- |
| `/reset` or `reset` | Clear the current session and start fresh |
| `/yolo` | Enable YOLO mode - auto-approve all tool calls |
| `/safe` | Disable YOLO mode - require approval for tool calls |
| `approve` / `yes` / `y` | Approve a pending tool call |
| `reject` / `no` / `n` | Reject a pending tool call |
| `always allow` | Approve and always allow this tool |
## Troubleshooting
### "Gemini CLI Agent is not responding" in Google Chat
This usually means the agent hit Google Chat's 30-second webhook timeout before
the bridge could return "Processing...". Check Cloud Run logs:
```bash
gcloud run services logs read gemini-a2a-server \
--region=$REGION --project=$PROJECT_ID --limit=50
```
### Tool approvals appearing in YOLO mode
Ensure `GEMINI_YOLO_MODE=true` is set. If you used `--set-env-vars` during a
deployment, it may have wiped this variable. Use `--update-env-vars` instead.
### Agent hangs on git operations
The `GIT_TERMINAL_PROMPT=0` env var (set in the Dockerfile) prevents git from
prompting for credentials. If git operations require authentication, configure a
credential helper or use `gh auth` with a token.
### Session state lost after restart
Enable GCS persistence by setting `GCS_BUCKET_NAME`. Sessions are automatically
flushed to GCS every 30 seconds and restored on startup.
### Chat responses appear as top-level messages instead of thread replies
The Chat bridge includes `thread.name` in all responses. If replies still appear
at the top level, ensure the webhook event includes thread information. DM
conversations always thread correctly; spaces may need threading enabled.
@@ -11,7 +11,14 @@
* core/agents/remote-invocation.ts.
*/
import type { Message, Task, Part, MessageSendParams } from '@a2a-js/sdk';
import type {
Message,
Task,
Part,
MessageSendParams,
TaskStatusUpdateEvent,
TaskArtifactUpdateEvent,
} from '@a2a-js/sdk';
import {
type Client,
ClientFactory,
@@ -25,6 +32,11 @@ import { logger } from '../utils/logger.js';
import { A2UI_EXTENSION_URI, A2UI_MIME_TYPE } from '../a2ui/a2ui-extension.js';
export type A2AResponse = Message | Task;
export type A2AStreamEventData =
| Message
| Task
| TaskStatusUpdateEvent
| TaskArtifactUpdateEvent;
/**
* Extracts contextId and taskId from an A2A response.
@@ -210,6 +222,36 @@ export class A2ABridgeClient {
return this.client.sendMessage(params);
}
/**
* Sends a text message and returns a streaming async generator.
* Each yielded event is a Message, Task, TaskStatusUpdateEvent,
* or TaskArtifactUpdateEvent.
*/
sendMessageStream(
text: string,
options: { contextId?: string; taskId?: string },
): AsyncGenerator<A2AStreamEventData, void, undefined> {
if (!this.client) {
throw new Error('A2A client not initialized. Call initialize() first.');
}
const params: MessageSendParams = {
message: {
kind: 'message',
role: 'user',
messageId: uuidv4(),
parts: [{ kind: 'text', text }],
contextId: options.contextId,
taskId: options.taskId,
metadata: {
extensions: [A2UI_EXTENSION_URI],
},
},
};
return this.client.sendMessageStream(params);
}
/**
* Sends a tool confirmation action back to the A2A server.
* The action is sent as a DataPart containing the A2UI action message.
@@ -0,0 +1,151 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Google Chat REST API client for sending proactive messages.
* Used to push agent responses back to Google Chat after the webhook
* has already returned an immediate acknowledgment.
*/
import { GoogleAuth } from 'google-auth-library';
import type { ChatCardV2 } from './types.js';
import { logger } from '../utils/logger.js';
const CHAT_API_BASE = 'https://chat.googleapis.com/v1';
export interface ChatApiClientConfig {
/** Path to service account key JSON file. If not set, uses ADC. */
serviceAccountKeyPath?: string;
}
export class ChatApiClient {
private auth: GoogleAuth;
private initialized = false;
constructor(config?: ChatApiClientConfig) {
this.auth = new GoogleAuth({
keyFile: config?.serviceAccountKeyPath,
scopes: ['https://www.googleapis.com/auth/chat.bot'],
});
}
async initialize(): Promise<void> {
if (this.initialized) return;
await this.auth.getClient();
this.initialized = true;
logger.info('[ChatApiClient] Initialized with chat.bot scope');
}
/**
* Sends a new message to a Google Chat space in a specific thread.
*/
async sendMessage(
spaceName: string,
threadName: string,
options: { text?: string; cardsV2?: ChatCardV2[] },
): Promise<string | undefined> {
try {
if (!this.initialized) await this.initialize();
const message: Record<string, unknown> = {};
if (options.text) message['text'] = options.text;
if (options.cardsV2) message['cardsV2'] = options.cardsV2;
message['thread'] = { name: threadName };
const url =
`${CHAT_API_BASE}/${spaceName}/messages` +
`?messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD`;
const client = await this.auth.getClient();
const headers = await client.getRequestHeaders();
const response = await fetch(url, {
method: 'POST',
headers: {
...headers,
'Content-Type': 'application/json',
},
body: JSON.stringify(message),
});
if (!response.ok) {
const body = await response.text();
logger.error(
`[ChatApiClient] sendMessage failed: ${response.status} ${body}`,
);
return undefined;
}
const result: unknown = await response.json();
let messageName: string | undefined;
if (typeof result === 'object' && result !== null && 'name' in result) {
const rec = result as Record<string, unknown>;
if (typeof rec['name'] === 'string') {
messageName = rec['name'];
}
}
logger.info(
`[ChatApiClient] Message sent to ${spaceName}: ${messageName ?? 'unknown'}`,
);
return messageName;
} catch (error) {
const msg = error instanceof Error ? error.message : 'Unknown error';
logger.error(`[ChatApiClient] sendMessage error: ${msg}`, error);
return undefined;
}
}
/**
* Updates an existing message in Google Chat.
*/
async updateMessage(
messageName: string,
options: { text?: string; cardsV2?: ChatCardV2[] },
): Promise<void> {
try {
if (!this.initialized) await this.initialize();
const message: Record<string, unknown> = {};
const updateMasks: string[] = [];
if (options.text) {
message['text'] = options.text;
updateMasks.push('text');
}
if (options.cardsV2) {
message['cardsV2'] = options.cardsV2;
updateMasks.push('cardsV2');
}
const url = `${CHAT_API_BASE}/${messageName}?updateMask=${updateMasks.join(',')}`;
const client = await this.auth.getClient();
const headers = await client.getRequestHeaders();
const response = await fetch(url, {
method: 'PATCH',
headers: {
...headers,
'Content-Type': 'application/json',
},
body: JSON.stringify(message),
});
if (!response.ok) {
const body = await response.text();
logger.error(
`[ChatApiClient] updateMessage failed: ${response.status} ${body}`,
);
} else {
logger.info(`[ChatApiClient] Message updated: ${messageName}`);
}
} catch (error) {
const msg = error instanceof Error ? error.message : 'Unknown error';
logger.error(`[ChatApiClient] updateMessage error: ${msg}`, error);
}
}
}
+239 -92
View File
@@ -6,36 +6,55 @@
/**
* Google Chat webhook handler.
* Processes incoming Google Chat events, forwards them to the A2A server,
* and converts responses back to Google Chat format.
* Responds immediately with "Processing..." and streams results
* from the A2A server, pushing updates to Chat via the REST API.
*/
import type { ChatEvent, ChatResponse, ChatBridgeConfig } from './types.js';
import type { SessionInfo } from './session-store.js';
import { SessionStore } from './session-store.js';
import {
A2ABridgeClient,
extractIdsFromResponse,
} from './a2a-bridge-client.js';
import { renderResponse, extractToolApprovals } from './response-renderer.js';
import { ChatApiClient } from './chat-api-client.js';
import { renderResponse, extractFromStreamEvent } from './response-renderer.js';
import { logger } from '../utils/logger.js';
const TERMINAL_STATES = new Set([
'completed',
'failed',
'canceled',
'rejected',
]);
export class ChatBridgeHandler {
private sessionStore: SessionStore;
private a2aClient: A2ABridgeClient;
private chatApiClient: ChatApiClient;
private initialized = false;
constructor(private config: ChatBridgeConfig) {
constructor(
private config: ChatBridgeConfig,
chatApiClient?: ChatApiClient,
) {
this.sessionStore = new SessionStore(config.gcsBucket);
this.a2aClient = new A2ABridgeClient(config.a2aServerUrl);
this.chatApiClient =
chatApiClient ??
new ChatApiClient({
serviceAccountKeyPath: config.serviceAccountKeyPath,
});
}
/**
* Initializes the A2A client connection and restores persisted sessions.
* Must be called before handling events.
* Initializes the A2A client connection, Chat API client,
* and restores persisted sessions. Must be called before handling events.
*/
async initialize(): Promise<void> {
if (this.initialized) return;
await this.a2aClient.initialize();
await this.chatApiClient.initialize();
await this.sessionStore.restore();
this.initialized = true;
logger.info(
@@ -72,6 +91,8 @@ export class ChatBridgeHandler {
/**
* Handles a MESSAGE event: user sent a text message in Chat.
* Returns an immediate "Processing..." response and processes
* the A2A request asynchronously, pushing results via Chat API.
*/
private async handleMessage(event: ChatEvent): Promise<ChatResponse> {
const message = event.message;
@@ -87,7 +108,7 @@ export class ChatBridgeHandler {
const threadName = message.thread.name;
const spaceName = event.space.name;
// Handle slash commands
// Handle slash commands synchronously (fast, no A2A call)
const trimmed = text.trim().toLowerCase();
if (
trimmed === '/reset' ||
@@ -120,104 +141,231 @@ export class ChatBridgeHandler {
`[ChatBridge] MESSAGE from ${event.user.displayName}: "${text.substring(0, 100)}"`,
);
// Handle text-based tool approval responses
const lowerText = trimmed;
if (
session.pendingToolApproval &&
(lowerText === 'approve' ||
lowerText === 'yes' ||
lowerText === 'y' ||
lowerText === 'reject' ||
lowerText === 'no' ||
lowerText === 'n' ||
lowerText === 'always allow')
) {
const approval = session.pendingToolApproval;
const isReject =
lowerText === 'reject' || lowerText === 'no' || lowerText === 'n';
const isAlwaysAllow = lowerText === 'always allow';
const outcome = isReject
? 'cancel'
: isAlwaysAllow
? 'proceed_always_tool'
: 'proceed_once';
logger.info(
`[ChatBridge] Text-based tool ${outcome}: callId=${approval.callId}, taskId=${approval.taskId}`,
);
session.pendingToolApproval = undefined;
try {
const response = await this.a2aClient.sendToolConfirmation(
approval.callId,
outcome,
approval.taskId,
{ contextId: session.contextId },
);
const { contextId: newCtxId, taskId: newTaskId } =
extractIdsFromResponse(response);
if (newCtxId) session.contextId = newCtxId;
this.sessionStore.updateTaskId(threadName, newTaskId);
const threadKey = message.thread.threadKey || threadName;
return renderResponse(response, threadKey, threadName);
} catch (error) {
const errorMsg =
error instanceof Error ? error.message : 'Unknown error';
logger.error(
`[ChatBridge] Error sending tool confirmation: ${errorMsg}`,
error,
);
return { text: `Error processing tool confirmation: ${errorMsg}` };
}
// Handle text-based tool approval responses synchronously
// (sendToolConfirmation is fast — no need for async)
if (session.pendingToolApproval && this.isToolApprovalText(trimmed)) {
return this.handleToolApprovalText(event, session, trimmed);
}
// Guard against overlapping async requests
if (session.asyncProcessing) {
return {
text: 'Still processing your previous request. Please wait...',
thread: { name: threadName },
};
}
// Fire-and-forget async processing
this.processMessageAsync(event, session, text).catch((err) => {
const msg = err instanceof Error ? err.message : 'Unknown error';
logger.error(`[ChatBridge] Async processing failed: ${msg}`, err);
});
// Return immediate acknowledgment
return {
text: '_Processing your request..._',
thread: {
threadKey: message.thread.threadKey || threadName,
name: threadName,
},
};
}
/**
* Checks if text is a tool approval command.
*/
private isToolApprovalText(text: string): boolean {
return (
text === 'approve' ||
text === 'yes' ||
text === 'y' ||
text === 'reject' ||
text === 'no' ||
text === 'n' ||
text === 'always allow'
);
}
/**
* Handles text-based tool approval responses synchronously.
*/
private async handleToolApprovalText(
event: ChatEvent,
session: SessionInfo,
trimmed: string,
): Promise<ChatResponse> {
const message = event.message!;
const threadName = message.thread.name;
const approval = session.pendingToolApproval!;
const isReject =
trimmed === 'reject' || trimmed === 'no' || trimmed === 'n';
const isAlwaysAllow = trimmed === 'always allow';
const outcome = isReject
? 'cancel'
: isAlwaysAllow
? 'proceed_always_tool'
: 'proceed_once';
logger.info(
`[ChatBridge] Text-based tool ${outcome}: callId=${approval.callId}, taskId=${approval.taskId}`,
);
session.pendingToolApproval = undefined;
try {
const response = await this.a2aClient.sendMessage(text, {
contextId: session.contextId,
taskId: session.taskId,
});
const response = await this.a2aClient.sendToolConfirmation(
approval.callId,
outcome,
approval.taskId,
{ contextId: session.contextId },
);
// Update session with new IDs from response
const { contextId, taskId } = extractIdsFromResponse(response);
if (contextId) {
session.contextId = contextId;
}
this.sessionStore.updateTaskId(threadName, taskId);
const { contextId: newCtxId, taskId: newTaskId } =
extractIdsFromResponse(response);
if (newCtxId) session.contextId = newCtxId;
this.sessionStore.updateTaskId(threadName, newTaskId);
// Check for pending tool approvals and store for text-based confirmation
const approvals = extractToolApprovals(response);
if (approvals.length > 0) {
const firstApproval = approvals[0];
session.pendingToolApproval = {
callId: firstApproval.callId,
taskId: firstApproval.taskId,
toolName: firstApproval.displayName || firstApproval.name,
};
logger.info(
`[ChatBridge] Pending tool approval: ${firstApproval.displayName || firstApproval.name} callId=${firstApproval.callId}`,
);
} else {
session.pendingToolApproval = undefined;
}
// Convert A2A response to Chat format
const threadKey = message.thread.threadKey || threadName;
return renderResponse(response, threadKey, threadName);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
logger.error(`[ChatBridge] Error handling message: ${errorMsg}`, error);
return {
text: `Sorry, I encountered an error processing your request: ${errorMsg}`,
};
logger.error(
`[ChatBridge] Error sending tool confirmation: ${errorMsg}`,
error,
);
return { text: `Error processing tool confirmation: ${errorMsg}` };
}
}
/**
* Processes a message asynchronously using A2A streaming.
* Pushes results to Google Chat via the REST API.
*/
private async processMessageAsync(
event: ChatEvent,
session: SessionInfo,
text: string,
): Promise<void> {
const message = event.message!;
const threadName = message.thread.name;
const spaceName = event.space.name;
session.asyncProcessing = true;
try {
const stream = this.a2aClient.sendMessageStream(text, {
contextId: session.contextId,
taskId: session.taskId,
});
let lastText = '';
let lastTaskId: string | undefined;
let lastContextId: string | undefined;
let lastState: string | undefined;
let sentFinalResponse = false;
for await (const streamEvent of stream) {
const extracted = extractFromStreamEvent(streamEvent);
if (extracted.taskId) lastTaskId = extracted.taskId;
if (extracted.contextId) lastContextId = extracted.contextId;
if (extracted.state) lastState = extracted.state;
// Check for tool approvals needing user input
const pendingApprovals = extracted.toolApprovals.filter(
(a) => a.status === 'awaiting_approval',
);
if (pendingApprovals.length > 0) {
const firstApproval = pendingApprovals[0];
session.pendingToolApproval = {
callId: firstApproval.callId,
taskId: firstApproval.taskId,
toolName: firstApproval.displayName || firstApproval.name,
};
// Push tool approval card to Chat
const approvalResponse = renderResponse(
{
kind: 'task',
id: firstApproval.taskId,
contextId: lastContextId ?? session.contextId,
status: {
state: 'input-required',
timestamp: new Date().toISOString(),
message:
streamEvent.kind === 'status-update'
? streamEvent.status?.message
: undefined,
},
history: [],
artifacts: [],
},
message.thread.threadKey || threadName,
threadName,
);
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: approvalResponse.text,
cardsV2: approvalResponse.cardsV2,
});
sentFinalResponse = true;
logger.info(
`[ChatBridge] Pushed tool approval card: ${firstApproval.displayName || firstApproval.name}`,
);
}
// Track latest text content
if (extracted.text) {
lastText = extracted.text;
}
// On terminal or input-required state, stop streaming
if (
extracted.state &&
(TERMINAL_STATES.has(extracted.state) ||
extracted.state === 'input-required')
) {
break;
}
}
// Update session IDs
if (lastContextId) session.contextId = lastContextId;
// Clear taskId on terminal states so next message starts a fresh task
const isTerminal = lastState ? TERMINAL_STATES.has(lastState) : false;
this.sessionStore.updateTaskId(
threadName,
isTerminal ? undefined : lastTaskId,
);
// Push final text response if we haven't already pushed a tool approval
if (lastText && !sentFinalResponse) {
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: lastText,
});
logger.info(
`[ChatBridge] Pushed final response (${lastText.length} chars)`,
);
} else if (!lastText && !sentFinalResponse) {
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: '_Agent completed without generating a response._',
});
}
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
logger.error(`[ChatBridge] Async processing error: ${errorMsg}`, error);
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: `Sorry, I encountered an error: ${errorMsg}`,
});
} finally {
session.asyncProcessing = false;
}
}
/**
* Handles a CARD_CLICKED event: user clicked a button on a card.
* Used for tool approval/rejection flows.
*/
private async handleCardClicked(event: ChatEvent): Promise<ChatResponse> {
const action = event.action;
@@ -323,7 +471,6 @@ export class ChatBridgeHandler {
*/
private handleRemovedFromSpace(event: ChatEvent): ChatResponse {
logger.info(`[ChatBridge] Bot removed from space: ${event.space.name}`);
// Clean up any sessions for this space
return {};
}
}
@@ -21,8 +21,10 @@ import type {
ChatCardSection,
ChatWidget,
} from './types.js';
import type { Part } from '@a2a-js/sdk';
import {
type A2AResponse,
type A2AStreamEventData,
extractAllParts,
extractTextFromParts,
extractA2UIParts,
@@ -411,3 +413,95 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 {
},
};
}
/**
* Extracts text and tool approval info from a single streaming event.
* Works with TaskStatusUpdateEvent, Task, and Message events.
*/
export function extractFromStreamEvent(event: A2AStreamEventData): {
text: string;
toolApprovals: ToolApprovalInfo[];
state?: string;
taskId?: string;
contextId?: string;
} {
const toolApprovals: ToolApprovalInfo[] = [];
const agentResponses: AgentResponseInfo[] = [];
const thoughts: Array<{ subject: string; description: string }> = [];
let state: string | undefined;
let taskId: string | undefined;
let contextId: string | undefined;
if (event.kind === 'status-update') {
state = event.status?.state;
taskId = event.taskId;
contextId = event.contextId;
// Extract parts from the status message
const parts: Part[] = event.status?.message?.parts ?? [];
const a2uiGroups = extractA2UIParts(parts);
for (const messages of a2uiGroups) {
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
}
// Also extract plain text
const plainText = extractTextFromParts(parts);
if (plainText) {
agentResponses.push({ text: plainText, status: '' });
}
} else if (event.kind === 'task') {
state = event.status?.state;
taskId = event.id;
contextId = event.contextId;
const parts = extractAllParts(event);
const a2uiGroups = extractA2UIParts(parts);
for (const messages of a2uiGroups) {
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
}
const plainText = extractTextFromParts(parts);
if (plainText) {
agentResponses.push({ text: plainText, status: '' });
}
} else if (event.kind === 'message') {
contextId = event.contextId;
taskId = event.taskId;
const parts: Part[] = event.parts ?? [];
const a2uiGroups = extractA2UIParts(parts);
for (const messages of a2uiGroups) {
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
}
const plainText = extractTextFromParts(parts);
if (plainText) {
agentResponses.push({ text: plainText, status: '' });
}
}
// Build text from the last non-empty agent response
let text = '';
for (let i = agentResponses.length - 1; i >= 0; i--) {
if (agentResponses[i].text) {
text = agentResponses[i].text;
break;
}
}
// Add thought summaries
if (thoughts.length > 0) {
const thoughtText = thoughts
.map((t) => `_${t.subject}_: ${t.description}`)
.join('\n');
text = text ? `${thoughtText}\n\n${text}` : thoughtText;
}
return {
text,
toolApprovals: deduplicateToolApprovals(toolApprovals),
state,
taskId,
contextId,
};
}
@@ -37,6 +37,8 @@ export interface SessionInfo {
pendingToolApproval?: PendingToolApproval;
/** When true, all tool calls are auto-approved. */
yoloMode?: boolean;
/** When true, an async task is currently processing. */
asyncProcessing?: boolean;
}
/** Serializable subset of SessionInfo for GCS persistence. */
@@ -138,4 +138,6 @@ export interface ChatBridgeConfig {
debug?: boolean;
/** GCS bucket name for session persistence (optional) */
gcsBucket?: string;
/** Path to service account key for Chat API auth (optional, uses ADC if not set) */
serviceAccountKeyPath?: string;
}
+1
View File
@@ -218,6 +218,7 @@ export async function createApp() {
projectNumber: process.env['CHAT_PROJECT_NUMBER'],
debug: process.env['CHAT_BRIDGE_DEBUG'] === 'true',
gcsBucket: process.env['GCS_BUCKET_NAME'],
serviceAccountKeyPath: process.env['CHAT_SA_KEY_PATH'],
});
expressApp.use(chatRoutes);
logger.info(