From 4f3ffc8959eda3ec4d8a9456f7fcd3a1ec2a2550 Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Sat, 14 Feb 2026 19:54:47 -0700 Subject: [PATCH] feat: async streaming chat bridge with Chat API push Replace blocking A2A calls with streaming: webhook returns immediate "Processing..." response, then streams results from A2A agent and pushes them to Google Chat via REST API. - Add ChatApiClient for proactive messaging via Chat REST API - Add sendMessageStream() to A2ABridgeClient for SSE streaming - Add extractFromStreamEvent() for parsing individual stream events - Refactor handler to fire-and-forget async processing - Fix isTerminal logic to use stream state instead of taskId presence - Add asyncProcessing guard to prevent overlapping requests - Add comprehensive README with deployment and setup guide --- packages/a2a-server/README.md | 289 ++++++++++++++- .../src/chat-bridge/a2a-bridge-client.ts | 44 ++- .../src/chat-bridge/chat-api-client.ts | 151 ++++++++ .../a2a-server/src/chat-bridge/handler.ts | 331 +++++++++++++----- .../src/chat-bridge/response-renderer.ts | 94 +++++ .../src/chat-bridge/session-store.ts | 2 + packages/a2a-server/src/chat-bridge/types.ts | 2 + packages/a2a-server/src/http/app.ts | 1 + 8 files changed, 819 insertions(+), 95 deletions(-) create mode 100644 packages/a2a-server/src/chat-bridge/chat-api-client.ts diff --git a/packages/a2a-server/README.md b/packages/a2a-server/README.md index bd6a2fac45..e04b6b3402 100644 --- a/packages/a2a-server/README.md +++ b/packages/a2a-server/README.md @@ -1,5 +1,290 @@ # Gemini CLI A2A Server -## All code in this package is experimental and under active development +> **Experimental** - This package is under active development. -This package contains the A2A server implementation for the Gemini CLI. +An [A2A (Agent-to-Agent)](https://google.github.io/A2A/) server that wraps the +Gemini CLI agent, enabling remote interaction via the A2A protocol. Includes a +Google Chat bridge for using the agent directly from Google Chat. + +## Architecture + +``` +Google Chat ──webhook──> Chat Bridge ──A2A──> A2A Server ──> Gemini CLI Agent + │ + └── Chat REST API (push responses back to Chat) +``` + +The server runs as a single Cloud Run container with two logical components: + +1. **A2A Server** - Standard A2A protocol endpoint (JSON-RPC + SSE streaming) + that wraps the Gemini CLI agent +2. **Chat Bridge** - Translates between Google Chat webhook events and A2A + protocol, pushing streamed results back via the Chat REST API + +The Chat Bridge responds immediately to webhooks with "Processing..." (avoiding +Google Chat's 30s timeout), then streams results from the A2A agent and pushes +them to Chat as they arrive. + +## Prerequisites + +- **GCP project** with the following APIs enabled: + - Cloud Run API + - Cloud Build API + - Artifact Registry API + - Google Chat API + - Cloud Storage API (for session persistence) +- **gcloud CLI** authenticated with your project +- **Node.js 20+** for local development +- **Gemini API key** from [Google AI Studio](https://aistudio.google.com/) + +## Environment Variables + +| Variable | Required | Description | +| ---------------------------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `GEMINI_API_KEY` | Yes | Gemini API key for the agent | +| `CODER_AGENT_PORT` | No | Server port (default: `8080`) | +| `CODER_AGENT_HOST` | No | Bind host (default: `localhost`, set `0.0.0.0` for containers) | +| `CODER_AGENT_WORKSPACE_PATH` | No | Agent workspace directory (default: `/workspace`) | +| `GCS_BUCKET_NAME` | No | GCS bucket for task & session persistence | +| `GEMINI_YOLO_MODE` | No | Set `true` to auto-approve all tool calls | +| `CHAT_BRIDGE_A2A_URL` | No | A2A server URL for the Chat bridge (e.g. `http://localhost:8080`). Presence enables the Chat bridge. | +| `CHAT_PROJECT_NUMBER` | No | Google Chat project number for JWT verification | +| `CHAT_SA_KEY_PATH` | No | Path to service account key for Chat API auth (uses ADC if not set) | +| `CHAT_BRIDGE_DEBUG` | No | Set `true` for verbose bridge logging | +| `GIT_TERMINAL_PROMPT` | No | Set `0` to prevent git credential prompts in headless environments | + +## Local Development + +### Build + +From the repo root: + +```bash +npm install +npm run build +``` + +### Run + +```bash +export GEMINI_API_KEY="your-api-key" +export CODER_AGENT_PORT=8080 +export CHAT_BRIDGE_A2A_URL=http://localhost:8080 + +node packages/a2a-server/dist/src/http/server.js +``` + +### Test the A2A endpoint + +```bash +# Check the agent card +curl http://localhost:8080/.well-known/agent-card.json | jq . + +# Send a message (JSON-RPC) +curl -X POST http://localhost:8080 \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": 1, + "method": "message/send", + "params": { + "message": { + "kind": "message", + "role": "user", + "messageId": "test-1", + "parts": [{"kind": "text", "text": "Hello, what can you do?"}] + }, + "configuration": {"blocking": true} + } + }' +``` + +### Test the Chat Bridge + +```bash +# Health check +curl http://localhost:8080/chat/health | jq . + +# Simulate a Google Chat MESSAGE event +curl -X POST http://localhost:8080/chat/webhook \ + -H "Content-Type: application/json" \ + -d '{ + "type": "MESSAGE", + "eventTime": "2026-01-01T00:00:00Z", + "message": { + "name": "spaces/test/messages/1", + "text": "Hello agent", + "thread": {"name": "spaces/test/threads/abc"}, + "sender": {"name": "users/1", "displayName": "Test User"}, + "space": {"name": "spaces/test", "type": "DM"} + }, + "space": {"name": "spaces/test", "type": "DM"}, + "user": {"name": "users/1", "displayName": "Test User"} + }' +``` + +## Cloud Run Deployment + +### 1. Create Artifact Registry repository + +```bash +export PROJECT_ID=your-project-id +export REGION=us-central1 + +gcloud artifacts repositories create gemini-a2a \ + --repository-format=docker \ + --location=$REGION \ + --project=$PROJECT_ID +``` + +### 2. Create GCS bucket (optional, for session persistence) + +```bash +gsutil mb -l $REGION gs://gemini-a2a-sessions-$PROJECT_ID +``` + +### 3. Build with Cloud Build + +```bash +gcloud builds submit \ + --config=packages/a2a-server/cloudbuild.yaml \ + --project=$PROJECT_ID +``` + +### 4. Deploy to Cloud Run + +```bash +export IMAGE=us-central1-docker.pkg.dev/$PROJECT_ID/gemini-a2a/a2a-server:latest + +gcloud run deploy gemini-a2a-server \ + --image=$IMAGE \ + --region=$REGION \ + --project=$PROJECT_ID \ + --platform=managed \ + --allow-unauthenticated \ + --memory=2Gi \ + --cpu=2 \ + --timeout=300 \ + --concurrency=1 \ + --max-instances=1 \ + --set-env-vars="GEMINI_YOLO_MODE=true,GCS_BUCKET_NAME=gemini-a2a-sessions-$PROJECT_ID,CHAT_BRIDGE_A2A_URL=http://localhost:8080" \ + --set-secrets="GEMINI_API_KEY=gemini-api-key:latest" +``` + +> **Important**: After initial deployment, always use `--update-env-vars` +> instead of `--set-env-vars` to avoid wiping existing environment variables. + +### 5. Update an existing deployment + +```bash +# Update env vars without replacing existing ones +gcloud run services update gemini-a2a-server \ + --region=$REGION \ + --project=$PROJECT_ID \ + --update-env-vars="NEW_VAR=value" + +# Deploy a new image +gcloud run services update gemini-a2a-server \ + --region=$REGION \ + --project=$PROJECT_ID \ + --image=$IMAGE +``` + +## Google Chat App Configuration + +### 1. Create a service account for Chat API + +The Chat bridge needs a service account with the Chat API scope to push messages +proactively. + +```bash +# Create service account +gcloud iam service-accounts create gemini-chat-bot \ + --display-name="Gemini Chat Bot" \ + --project=$PROJECT_ID + +# Download key (for local dev) +gcloud iam service-accounts keys create chat-sa-key.json \ + --iam-account=gemini-chat-bot@$PROJECT_ID.iam.gserviceaccount.com +``` + +On Cloud Run, use Application Default Credentials (ADC) instead of a key file. +Grant the Cloud Run service account the `chat.bot` scope by configuring it as +the Chat app's service account. + +### 2. Configure the Google Chat app + +1. Go to + [Google Cloud Console > APIs & Services > Google Chat API > Configuration](https://console.cloud.google.com/apis/api/chat.googleapis.com/hangouts-chat) +2. Set **App name** and **Description** +3. Under **Connection settings**, select **HTTP endpoint URL** +4. Set the URL to your Cloud Run service URL + `/chat/webhook`: + ``` + https://gemini-a2a-server-HASH-uc.a.run.app/chat/webhook + ``` +5. Under **Authentication Audience**, select **HTTP endpoint URL** +6. Under **Visibility**, choose who can use the app +7. Under **Permissions**, configure who can install it +8. Click **Save** + +### 3. Grant Cloud Run invoker permission + +If your Cloud Run service requires authentication (recommended): + +```bash +# Get the Chat service account +# It's usually chat@system.gserviceaccount.com + +gcloud run services add-iam-policy-binding gemini-a2a-server \ + --region=$REGION \ + --project=$PROJECT_ID \ + --member="serviceAccount:chat@system.gserviceaccount.com" \ + --role="roles/run.invoker" +``` + +## Chat Bridge Commands + +When messaging the bot in Google Chat: + +| Command | Description | +| ----------------------- | --------------------------------------------------- | +| `/reset` or `reset` | Clear the current session and start fresh | +| `/yolo` | Enable YOLO mode - auto-approve all tool calls | +| `/safe` | Disable YOLO mode - require approval for tool calls | +| `approve` / `yes` / `y` | Approve a pending tool call | +| `reject` / `no` / `n` | Reject a pending tool call | +| `always allow` | Approve and always allow this tool | + +## Troubleshooting + +### "Gemini CLI Agent is not responding" in Google Chat + +This usually means the agent hit Google Chat's 30-second webhook timeout before +the bridge could return "Processing...". Check Cloud Run logs: + +```bash +gcloud run services logs read gemini-a2a-server \ + --region=$REGION --project=$PROJECT_ID --limit=50 +``` + +### Tool approvals appearing in YOLO mode + +Ensure `GEMINI_YOLO_MODE=true` is set. If you used `--set-env-vars` during a +deployment, it may have wiped this variable. Use `--update-env-vars` instead. + +### Agent hangs on git operations + +The `GIT_TERMINAL_PROMPT=0` env var (set in the Dockerfile) prevents git from +prompting for credentials. If git operations require authentication, configure a +credential helper or use `gh auth` with a token. + +### Session state lost after restart + +Enable GCS persistence by setting `GCS_BUCKET_NAME`. Sessions are automatically +flushed to GCS every 30 seconds and restored on startup. + +### Chat responses appear as top-level messages instead of thread replies + +The Chat bridge includes `thread.name` in all responses. If replies still appear +at the top level, ensure the webhook event includes thread information. DM +conversations always thread correctly; spaces may need threading enabled. diff --git a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts index 7b37f464c3..e08349d39c 100644 --- a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts +++ b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts @@ -11,7 +11,14 @@ * core/agents/remote-invocation.ts. */ -import type { Message, Task, Part, MessageSendParams } from '@a2a-js/sdk'; +import type { + Message, + Task, + Part, + MessageSendParams, + TaskStatusUpdateEvent, + TaskArtifactUpdateEvent, +} from '@a2a-js/sdk'; import { type Client, ClientFactory, @@ -25,6 +32,11 @@ import { logger } from '../utils/logger.js'; import { A2UI_EXTENSION_URI, A2UI_MIME_TYPE } from '../a2ui/a2ui-extension.js'; export type A2AResponse = Message | Task; +export type A2AStreamEventData = + | Message + | Task + | TaskStatusUpdateEvent + | TaskArtifactUpdateEvent; /** * Extracts contextId and taskId from an A2A response. @@ -210,6 +222,36 @@ export class A2ABridgeClient { return this.client.sendMessage(params); } + /** + * Sends a text message and returns a streaming async generator. + * Each yielded event is a Message, Task, TaskStatusUpdateEvent, + * or TaskArtifactUpdateEvent. + */ + sendMessageStream( + text: string, + options: { contextId?: string; taskId?: string }, + ): AsyncGenerator { + if (!this.client) { + throw new Error('A2A client not initialized. Call initialize() first.'); + } + + const params: MessageSendParams = { + message: { + kind: 'message', + role: 'user', + messageId: uuidv4(), + parts: [{ kind: 'text', text }], + contextId: options.contextId, + taskId: options.taskId, + metadata: { + extensions: [A2UI_EXTENSION_URI], + }, + }, + }; + + return this.client.sendMessageStream(params); + } + /** * Sends a tool confirmation action back to the A2A server. * The action is sent as a DataPart containing the A2UI action message. diff --git a/packages/a2a-server/src/chat-bridge/chat-api-client.ts b/packages/a2a-server/src/chat-bridge/chat-api-client.ts new file mode 100644 index 0000000000..2da6e5c389 --- /dev/null +++ b/packages/a2a-server/src/chat-bridge/chat-api-client.ts @@ -0,0 +1,151 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Google Chat REST API client for sending proactive messages. + * Used to push agent responses back to Google Chat after the webhook + * has already returned an immediate acknowledgment. + */ + +import { GoogleAuth } from 'google-auth-library'; +import type { ChatCardV2 } from './types.js'; +import { logger } from '../utils/logger.js'; + +const CHAT_API_BASE = 'https://chat.googleapis.com/v1'; + +export interface ChatApiClientConfig { + /** Path to service account key JSON file. If not set, uses ADC. */ + serviceAccountKeyPath?: string; +} + +export class ChatApiClient { + private auth: GoogleAuth; + private initialized = false; + + constructor(config?: ChatApiClientConfig) { + this.auth = new GoogleAuth({ + keyFile: config?.serviceAccountKeyPath, + scopes: ['https://www.googleapis.com/auth/chat.bot'], + }); + } + + async initialize(): Promise { + if (this.initialized) return; + await this.auth.getClient(); + this.initialized = true; + logger.info('[ChatApiClient] Initialized with chat.bot scope'); + } + + /** + * Sends a new message to a Google Chat space in a specific thread. + */ + async sendMessage( + spaceName: string, + threadName: string, + options: { text?: string; cardsV2?: ChatCardV2[] }, + ): Promise { + try { + if (!this.initialized) await this.initialize(); + + const message: Record = {}; + if (options.text) message['text'] = options.text; + if (options.cardsV2) message['cardsV2'] = options.cardsV2; + message['thread'] = { name: threadName }; + + const url = + `${CHAT_API_BASE}/${spaceName}/messages` + + `?messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD`; + + const client = await this.auth.getClient(); + const headers = await client.getRequestHeaders(); + + const response = await fetch(url, { + method: 'POST', + headers: { + ...headers, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(message), + }); + + if (!response.ok) { + const body = await response.text(); + logger.error( + `[ChatApiClient] sendMessage failed: ${response.status} ${body}`, + ); + return undefined; + } + + const result: unknown = await response.json(); + let messageName: string | undefined; + if (typeof result === 'object' && result !== null && 'name' in result) { + const rec = result as Record; + if (typeof rec['name'] === 'string') { + messageName = rec['name']; + } + } + + logger.info( + `[ChatApiClient] Message sent to ${spaceName}: ${messageName ?? 'unknown'}`, + ); + return messageName; + } catch (error) { + const msg = error instanceof Error ? error.message : 'Unknown error'; + logger.error(`[ChatApiClient] sendMessage error: ${msg}`, error); + return undefined; + } + } + + /** + * Updates an existing message in Google Chat. + */ + async updateMessage( + messageName: string, + options: { text?: string; cardsV2?: ChatCardV2[] }, + ): Promise { + try { + if (!this.initialized) await this.initialize(); + + const message: Record = {}; + const updateMasks: string[] = []; + + if (options.text) { + message['text'] = options.text; + updateMasks.push('text'); + } + if (options.cardsV2) { + message['cardsV2'] = options.cardsV2; + updateMasks.push('cardsV2'); + } + + const url = `${CHAT_API_BASE}/${messageName}?updateMask=${updateMasks.join(',')}`; + + const client = await this.auth.getClient(); + const headers = await client.getRequestHeaders(); + + const response = await fetch(url, { + method: 'PATCH', + headers: { + ...headers, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(message), + }); + + if (!response.ok) { + const body = await response.text(); + logger.error( + `[ChatApiClient] updateMessage failed: ${response.status} ${body}`, + ); + } else { + logger.info(`[ChatApiClient] Message updated: ${messageName}`); + } + } catch (error) { + const msg = error instanceof Error ? error.message : 'Unknown error'; + logger.error(`[ChatApiClient] updateMessage error: ${msg}`, error); + } + } +} diff --git a/packages/a2a-server/src/chat-bridge/handler.ts b/packages/a2a-server/src/chat-bridge/handler.ts index 483ed9fb2b..837c797391 100644 --- a/packages/a2a-server/src/chat-bridge/handler.ts +++ b/packages/a2a-server/src/chat-bridge/handler.ts @@ -6,36 +6,55 @@ /** * Google Chat webhook handler. - * Processes incoming Google Chat events, forwards them to the A2A server, - * and converts responses back to Google Chat format. + * Responds immediately with "Processing..." and streams results + * from the A2A server, pushing updates to Chat via the REST API. */ import type { ChatEvent, ChatResponse, ChatBridgeConfig } from './types.js'; +import type { SessionInfo } from './session-store.js'; import { SessionStore } from './session-store.js'; import { A2ABridgeClient, extractIdsFromResponse, } from './a2a-bridge-client.js'; -import { renderResponse, extractToolApprovals } from './response-renderer.js'; +import { ChatApiClient } from './chat-api-client.js'; +import { renderResponse, extractFromStreamEvent } from './response-renderer.js'; import { logger } from '../utils/logger.js'; +const TERMINAL_STATES = new Set([ + 'completed', + 'failed', + 'canceled', + 'rejected', +]); + export class ChatBridgeHandler { private sessionStore: SessionStore; private a2aClient: A2ABridgeClient; + private chatApiClient: ChatApiClient; private initialized = false; - constructor(private config: ChatBridgeConfig) { + constructor( + private config: ChatBridgeConfig, + chatApiClient?: ChatApiClient, + ) { this.sessionStore = new SessionStore(config.gcsBucket); this.a2aClient = new A2ABridgeClient(config.a2aServerUrl); + this.chatApiClient = + chatApiClient ?? + new ChatApiClient({ + serviceAccountKeyPath: config.serviceAccountKeyPath, + }); } /** - * Initializes the A2A client connection and restores persisted sessions. - * Must be called before handling events. + * Initializes the A2A client connection, Chat API client, + * and restores persisted sessions. Must be called before handling events. */ async initialize(): Promise { if (this.initialized) return; await this.a2aClient.initialize(); + await this.chatApiClient.initialize(); await this.sessionStore.restore(); this.initialized = true; logger.info( @@ -72,6 +91,8 @@ export class ChatBridgeHandler { /** * Handles a MESSAGE event: user sent a text message in Chat. + * Returns an immediate "Processing..." response and processes + * the A2A request asynchronously, pushing results via Chat API. */ private async handleMessage(event: ChatEvent): Promise { const message = event.message; @@ -87,7 +108,7 @@ export class ChatBridgeHandler { const threadName = message.thread.name; const spaceName = event.space.name; - // Handle slash commands + // Handle slash commands synchronously (fast, no A2A call) const trimmed = text.trim().toLowerCase(); if ( trimmed === '/reset' || @@ -120,104 +141,231 @@ export class ChatBridgeHandler { `[ChatBridge] MESSAGE from ${event.user.displayName}: "${text.substring(0, 100)}"`, ); - // Handle text-based tool approval responses - const lowerText = trimmed; - if ( - session.pendingToolApproval && - (lowerText === 'approve' || - lowerText === 'yes' || - lowerText === 'y' || - lowerText === 'reject' || - lowerText === 'no' || - lowerText === 'n' || - lowerText === 'always allow') - ) { - const approval = session.pendingToolApproval; - const isReject = - lowerText === 'reject' || lowerText === 'no' || lowerText === 'n'; - const isAlwaysAllow = lowerText === 'always allow'; - const outcome = isReject - ? 'cancel' - : isAlwaysAllow - ? 'proceed_always_tool' - : 'proceed_once'; - - logger.info( - `[ChatBridge] Text-based tool ${outcome}: callId=${approval.callId}, taskId=${approval.taskId}`, - ); - - session.pendingToolApproval = undefined; - - try { - const response = await this.a2aClient.sendToolConfirmation( - approval.callId, - outcome, - approval.taskId, - { contextId: session.contextId }, - ); - - const { contextId: newCtxId, taskId: newTaskId } = - extractIdsFromResponse(response); - if (newCtxId) session.contextId = newCtxId; - this.sessionStore.updateTaskId(threadName, newTaskId); - - const threadKey = message.thread.threadKey || threadName; - return renderResponse(response, threadKey, threadName); - } catch (error) { - const errorMsg = - error instanceof Error ? error.message : 'Unknown error'; - logger.error( - `[ChatBridge] Error sending tool confirmation: ${errorMsg}`, - error, - ); - return { text: `Error processing tool confirmation: ${errorMsg}` }; - } + // Handle text-based tool approval responses synchronously + // (sendToolConfirmation is fast — no need for async) + if (session.pendingToolApproval && this.isToolApprovalText(trimmed)) { + return this.handleToolApprovalText(event, session, trimmed); } + // Guard against overlapping async requests + if (session.asyncProcessing) { + return { + text: 'Still processing your previous request. Please wait...', + thread: { name: threadName }, + }; + } + + // Fire-and-forget async processing + this.processMessageAsync(event, session, text).catch((err) => { + const msg = err instanceof Error ? err.message : 'Unknown error'; + logger.error(`[ChatBridge] Async processing failed: ${msg}`, err); + }); + + // Return immediate acknowledgment + return { + text: '_Processing your request..._', + thread: { + threadKey: message.thread.threadKey || threadName, + name: threadName, + }, + }; + } + + /** + * Checks if text is a tool approval command. + */ + private isToolApprovalText(text: string): boolean { + return ( + text === 'approve' || + text === 'yes' || + text === 'y' || + text === 'reject' || + text === 'no' || + text === 'n' || + text === 'always allow' + ); + } + + /** + * Handles text-based tool approval responses synchronously. + */ + private async handleToolApprovalText( + event: ChatEvent, + session: SessionInfo, + trimmed: string, + ): Promise { + const message = event.message!; + const threadName = message.thread.name; + const approval = session.pendingToolApproval!; + + const isReject = + trimmed === 'reject' || trimmed === 'no' || trimmed === 'n'; + const isAlwaysAllow = trimmed === 'always allow'; + const outcome = isReject + ? 'cancel' + : isAlwaysAllow + ? 'proceed_always_tool' + : 'proceed_once'; + + logger.info( + `[ChatBridge] Text-based tool ${outcome}: callId=${approval.callId}, taskId=${approval.taskId}`, + ); + + session.pendingToolApproval = undefined; + try { - const response = await this.a2aClient.sendMessage(text, { - contextId: session.contextId, - taskId: session.taskId, - }); + const response = await this.a2aClient.sendToolConfirmation( + approval.callId, + outcome, + approval.taskId, + { contextId: session.contextId }, + ); - // Update session with new IDs from response - const { contextId, taskId } = extractIdsFromResponse(response); - if (contextId) { - session.contextId = contextId; - } - this.sessionStore.updateTaskId(threadName, taskId); + const { contextId: newCtxId, taskId: newTaskId } = + extractIdsFromResponse(response); + if (newCtxId) session.contextId = newCtxId; + this.sessionStore.updateTaskId(threadName, newTaskId); - // Check for pending tool approvals and store for text-based confirmation - const approvals = extractToolApprovals(response); - if (approvals.length > 0) { - const firstApproval = approvals[0]; - session.pendingToolApproval = { - callId: firstApproval.callId, - taskId: firstApproval.taskId, - toolName: firstApproval.displayName || firstApproval.name, - }; - logger.info( - `[ChatBridge] Pending tool approval: ${firstApproval.displayName || firstApproval.name} callId=${firstApproval.callId}`, - ); - } else { - session.pendingToolApproval = undefined; - } - - // Convert A2A response to Chat format const threadKey = message.thread.threadKey || threadName; return renderResponse(response, threadKey, threadName); } catch (error) { const errorMsg = error instanceof Error ? error.message : 'Unknown error'; - logger.error(`[ChatBridge] Error handling message: ${errorMsg}`, error); - return { - text: `Sorry, I encountered an error processing your request: ${errorMsg}`, - }; + logger.error( + `[ChatBridge] Error sending tool confirmation: ${errorMsg}`, + error, + ); + return { text: `Error processing tool confirmation: ${errorMsg}` }; + } + } + + /** + * Processes a message asynchronously using A2A streaming. + * Pushes results to Google Chat via the REST API. + */ + private async processMessageAsync( + event: ChatEvent, + session: SessionInfo, + text: string, + ): Promise { + const message = event.message!; + const threadName = message.thread.name; + const spaceName = event.space.name; + + session.asyncProcessing = true; + + try { + const stream = this.a2aClient.sendMessageStream(text, { + contextId: session.contextId, + taskId: session.taskId, + }); + + let lastText = ''; + let lastTaskId: string | undefined; + let lastContextId: string | undefined; + let lastState: string | undefined; + let sentFinalResponse = false; + + for await (const streamEvent of stream) { + const extracted = extractFromStreamEvent(streamEvent); + + if (extracted.taskId) lastTaskId = extracted.taskId; + if (extracted.contextId) lastContextId = extracted.contextId; + if (extracted.state) lastState = extracted.state; + + // Check for tool approvals needing user input + const pendingApprovals = extracted.toolApprovals.filter( + (a) => a.status === 'awaiting_approval', + ); + if (pendingApprovals.length > 0) { + const firstApproval = pendingApprovals[0]; + session.pendingToolApproval = { + callId: firstApproval.callId, + taskId: firstApproval.taskId, + toolName: firstApproval.displayName || firstApproval.name, + }; + + // Push tool approval card to Chat + const approvalResponse = renderResponse( + { + kind: 'task', + id: firstApproval.taskId, + contextId: lastContextId ?? session.contextId, + status: { + state: 'input-required', + timestamp: new Date().toISOString(), + message: + streamEvent.kind === 'status-update' + ? streamEvent.status?.message + : undefined, + }, + history: [], + artifacts: [], + }, + message.thread.threadKey || threadName, + threadName, + ); + + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: approvalResponse.text, + cardsV2: approvalResponse.cardsV2, + }); + sentFinalResponse = true; + + logger.info( + `[ChatBridge] Pushed tool approval card: ${firstApproval.displayName || firstApproval.name}`, + ); + } + + // Track latest text content + if (extracted.text) { + lastText = extracted.text; + } + + // On terminal or input-required state, stop streaming + if ( + extracted.state && + (TERMINAL_STATES.has(extracted.state) || + extracted.state === 'input-required') + ) { + break; + } + } + + // Update session IDs + if (lastContextId) session.contextId = lastContextId; + // Clear taskId on terminal states so next message starts a fresh task + const isTerminal = lastState ? TERMINAL_STATES.has(lastState) : false; + this.sessionStore.updateTaskId( + threadName, + isTerminal ? undefined : lastTaskId, + ); + + // Push final text response if we haven't already pushed a tool approval + if (lastText && !sentFinalResponse) { + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: lastText, + }); + logger.info( + `[ChatBridge] Pushed final response (${lastText.length} chars)`, + ); + } else if (!lastText && !sentFinalResponse) { + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: '_Agent completed without generating a response._', + }); + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error'; + logger.error(`[ChatBridge] Async processing error: ${errorMsg}`, error); + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: `Sorry, I encountered an error: ${errorMsg}`, + }); + } finally { + session.asyncProcessing = false; } } /** * Handles a CARD_CLICKED event: user clicked a button on a card. - * Used for tool approval/rejection flows. */ private async handleCardClicked(event: ChatEvent): Promise { const action = event.action; @@ -323,7 +471,6 @@ export class ChatBridgeHandler { */ private handleRemovedFromSpace(event: ChatEvent): ChatResponse { logger.info(`[ChatBridge] Bot removed from space: ${event.space.name}`); - // Clean up any sessions for this space return {}; } } diff --git a/packages/a2a-server/src/chat-bridge/response-renderer.ts b/packages/a2a-server/src/chat-bridge/response-renderer.ts index 31148fa68f..474c25da1a 100644 --- a/packages/a2a-server/src/chat-bridge/response-renderer.ts +++ b/packages/a2a-server/src/chat-bridge/response-renderer.ts @@ -21,8 +21,10 @@ import type { ChatCardSection, ChatWidget, } from './types.js'; +import type { Part } from '@a2a-js/sdk'; import { type A2AResponse, + type A2AStreamEventData, extractAllParts, extractTextFromParts, extractA2UIParts, @@ -411,3 +413,95 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 { }, }; } + +/** + * Extracts text and tool approval info from a single streaming event. + * Works with TaskStatusUpdateEvent, Task, and Message events. + */ +export function extractFromStreamEvent(event: A2AStreamEventData): { + text: string; + toolApprovals: ToolApprovalInfo[]; + state?: string; + taskId?: string; + contextId?: string; +} { + const toolApprovals: ToolApprovalInfo[] = []; + const agentResponses: AgentResponseInfo[] = []; + const thoughts: Array<{ subject: string; description: string }> = []; + let state: string | undefined; + let taskId: string | undefined; + let contextId: string | undefined; + + if (event.kind === 'status-update') { + state = event.status?.state; + taskId = event.taskId; + contextId = event.contextId; + + // Extract parts from the status message + const parts: Part[] = event.status?.message?.parts ?? []; + const a2uiGroups = extractA2UIParts(parts); + for (const messages of a2uiGroups) { + parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); + } + + // Also extract plain text + const plainText = extractTextFromParts(parts); + if (plainText) { + agentResponses.push({ text: plainText, status: '' }); + } + } else if (event.kind === 'task') { + state = event.status?.state; + taskId = event.id; + contextId = event.contextId; + + const parts = extractAllParts(event); + const a2uiGroups = extractA2UIParts(parts); + for (const messages of a2uiGroups) { + parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); + } + + const plainText = extractTextFromParts(parts); + if (plainText) { + agentResponses.push({ text: plainText, status: '' }); + } + } else if (event.kind === 'message') { + contextId = event.contextId; + taskId = event.taskId; + + const parts: Part[] = event.parts ?? []; + const a2uiGroups = extractA2UIParts(parts); + for (const messages of a2uiGroups) { + parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); + } + + const plainText = extractTextFromParts(parts); + if (plainText) { + agentResponses.push({ text: plainText, status: '' }); + } + } + + // Build text from the last non-empty agent response + let text = ''; + for (let i = agentResponses.length - 1; i >= 0; i--) { + if (agentResponses[i].text) { + text = agentResponses[i].text; + break; + } + } + + // Add thought summaries + if (thoughts.length > 0) { + const thoughtText = thoughts + .map((t) => `_${t.subject}_: ${t.description}`) + .join('\n'); + text = text ? `${thoughtText}\n\n${text}` : thoughtText; + } + + return { + text, + toolApprovals: deduplicateToolApprovals(toolApprovals), + state, + taskId, + contextId, + }; +} diff --git a/packages/a2a-server/src/chat-bridge/session-store.ts b/packages/a2a-server/src/chat-bridge/session-store.ts index ec6dae46df..8142973ecc 100644 --- a/packages/a2a-server/src/chat-bridge/session-store.ts +++ b/packages/a2a-server/src/chat-bridge/session-store.ts @@ -37,6 +37,8 @@ export interface SessionInfo { pendingToolApproval?: PendingToolApproval; /** When true, all tool calls are auto-approved. */ yoloMode?: boolean; + /** When true, an async task is currently processing. */ + asyncProcessing?: boolean; } /** Serializable subset of SessionInfo for GCS persistence. */ diff --git a/packages/a2a-server/src/chat-bridge/types.ts b/packages/a2a-server/src/chat-bridge/types.ts index adfeaa85ad..22ac34728d 100644 --- a/packages/a2a-server/src/chat-bridge/types.ts +++ b/packages/a2a-server/src/chat-bridge/types.ts @@ -138,4 +138,6 @@ export interface ChatBridgeConfig { debug?: boolean; /** GCS bucket name for session persistence (optional) */ gcsBucket?: string; + /** Path to service account key for Chat API auth (optional, uses ADC if not set) */ + serviceAccountKeyPath?: string; } diff --git a/packages/a2a-server/src/http/app.ts b/packages/a2a-server/src/http/app.ts index 1af6da3835..feb5037b22 100644 --- a/packages/a2a-server/src/http/app.ts +++ b/packages/a2a-server/src/http/app.ts @@ -218,6 +218,7 @@ export async function createApp() { projectNumber: process.env['CHAT_PROJECT_NUMBER'], debug: process.env['CHAT_BRIDGE_DEBUG'] === 'true', gcsBucket: process.env['GCS_BUCKET_NAME'], + serviceAccountKeyPath: process.env['CHAT_SA_KEY_PATH'], }); expressApp.use(chatRoutes); logger.info(