diff --git a/packages/a2a-server/src/chat-bridge/handler.ts b/packages/a2a-server/src/chat-bridge/handler.ts index 76fd12dec5..119f7d65a5 100644 --- a/packages/a2a-server/src/chat-bridge/handler.ts +++ b/packages/a2a-server/src/chat-bridge/handler.ts @@ -15,6 +15,7 @@ import type { SessionInfo } from './session-store.js'; import { SessionStore } from './session-store.js'; import { A2ABridgeClient, + type A2AStreamEventData, extractIdsFromResponse, extractAllParts, extractTextFromParts, @@ -95,10 +96,30 @@ export class ChatBridgeHandler { } } + /** + * Pushes a text message via Chat API (properly threaded) and returns + * an empty webhook response. Add-ons createMessageAction ignores + * thread info and always creates a top-level message in Spaces, + * so ALL user-visible messages must go through the Chat API. + */ + private pushAndReturn( + spaceName: string, + threadName: string, + text: string, + ): ChatResponse { + this.chatApiClient + .sendMessage(spaceName, threadName, { text }) + .catch((err) => { + const msg = err instanceof Error ? err.message : 'Unknown error'; + logger.warn(`[ChatBridge] Failed to push message: ${msg}`); + }); + return {}; + } + /** * Handles a MESSAGE event: user sent a text message in Chat. - * Returns an immediate "Processing..." response and processes - * the A2A request asynchronously, pushing results via Chat API. + * All responses are pushed via Chat API for proper threading in Spaces. + * The webhook always returns an empty response. */ private async handleMessage(event: ChatEvent): Promise { const message = event.message; @@ -117,7 +138,7 @@ export class ChatBridgeHandler { const threadName = message.thread.name; const spaceName = event.space.name; - // Handle slash commands synchronously (fast, no A2A call) + // Handle slash commands — push response via Chat API for threading const trimmed = text.trim().toLowerCase(); if ( trimmed === '/reset' || @@ -127,7 +148,11 @@ export class ChatBridgeHandler { ) { this.sessionStore.remove(threadName); logger.info(`[ChatBridge] Session cleared for thread ${threadName}`); - return { text: 'Session cleared. Send a new message to start fresh.' }; + return this.pushAndReturn( + spaceName, + threadName, + 'Session cleared. Send a new message to start fresh.', + ); } const session = this.sessionStore.getOrCreate(threadName, spaceName); @@ -135,33 +160,39 @@ export class ChatBridgeHandler { if (trimmed === '/yolo') { session.yoloMode = true; logger.info(`[ChatBridge] YOLO mode enabled for thread ${threadName}`); - return { - text: 'YOLO mode enabled. All tool calls will be auto-approved.', - }; + return this.pushAndReturn( + spaceName, + threadName, + 'YOLO mode enabled. All tool calls will be auto-approved.', + ); } if (trimmed === '/safe') { session.yoloMode = false; logger.info(`[ChatBridge] YOLO mode disabled for thread ${threadName}`); - return { text: 'Safe mode enabled. Tool calls will require approval.' }; + return this.pushAndReturn( + spaceName, + threadName, + 'Safe mode enabled. Tool calls will require approval.', + ); } logger.info( `[ChatBridge] MESSAGE from ${event.user.displayName}: "${text.substring(0, 100)}"`, ); - // Handle text-based tool approval responses synchronously - // (sendToolConfirmation is fast — no need for async) + // Handle text-based tool approval responses if (session.pendingToolApproval && this.isToolApprovalText(trimmed)) { return this.handleToolApprovalText(event, session, trimmed); } // Guard against overlapping async requests if (session.asyncProcessing) { - return { - text: 'Still processing your previous request. Please wait...', - thread: { name: threadName }, - }; + return this.pushAndReturn( + spaceName, + threadName, + 'Still processing your previous request. Please wait...', + ); } // Fire-and-forget async processing @@ -170,14 +201,12 @@ export class ChatBridgeHandler { logger.error(`[ChatBridge] Async processing failed: ${msg}`, err); }); - // Return immediate acknowledgment - return { - text: '_Processing your request..._', - thread: { - threadKey: message.thread.threadKey || threadName, - name: threadName, - }, - }; + // Push "Processing..." via Chat API for proper threading + return this.pushAndReturn( + spaceName, + threadName, + '_Processing your request..._', + ); } /** @@ -238,13 +267,7 @@ export class ChatBridgeHandler { const ackText = isReject ? '_Tool rejected._' : '_Tool approved, processing..._'; - return { - text: ackText, - thread: { - threadKey: message.thread.threadKey || threadName, - name: threadName, - }, - }; + return this.pushAndReturn(event.space.name, threadName, ackText); } /** @@ -368,10 +391,16 @@ export class ChatBridgeHandler { session.asyncProcessing = true; try { - const stream = this.a2aClient.sendMessageStream(text, { - contextId: session.contextId, - taskId: session.taskId, - }); + // Retry streaming if the A2A server returns 500 (no available instance). + // With concurrency=1, this happens when another request is in-flight. + const stream = await this.retryStream( + () => + this.a2aClient.sendMessageStream(text, { + contextId: session.contextId, + taskId: session.taskId, + }), + session, + ); let lastText = ''; let lastTaskId: string | undefined; @@ -517,6 +546,54 @@ export class ChatBridgeHandler { } } + /** + * Retries creating a stream when the A2A server returns 500. + * Cloud Run returns 500 "no available instance" when concurrency is + * exhausted. We retry with exponential backoff up to 3 times. + */ + private async retryStream( + createStream: () => AsyncGenerator, + session: SessionInfo, + ): Promise> { + const MAX_RETRIES = 3; + const BASE_DELAY_MS = 5000; + + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + if (session.cancelled) return createStream(); // will be caught by caller + try { + const stream = createStream(); + // Try to get the first value to verify the stream connects + const iter = stream[Symbol.asyncIterator](); + const first = await iter.next(); + + // Re-wrap into an async generator that yields the first value + // then delegates to the rest of the iterator + async function* replayStream(): AsyncGenerator< + A2AStreamEventData, + void, + undefined + > { + if (!first.done) { + yield first.value; + yield* { [Symbol.asyncIterator]: () => iter }; + } + } + return replayStream(); + } catch (error) { + const msg = error instanceof Error ? error.message : ''; + const isRetryable = msg.includes('500') || msg.includes('503'); + if (!isRetryable || attempt === MAX_RETRIES) throw error; + const delay = BASE_DELAY_MS * Math.pow(2, attempt); + logger.warn( + `[ChatBridge] A2A server unavailable, retrying in ${delay}ms (attempt ${attempt + 1}/${MAX_RETRIES})`, + ); + await new Promise((r) => setTimeout(r, delay)); + } + } + // Should not reach here, but just in case + return createStream(); + } + /** * Auto-approves tool calls in YOLO mode. * Sends all pending approvals in a single batch message to avoid hanging diff --git a/packages/a2a-server/src/chat-bridge/routes.ts b/packages/a2a-server/src/chat-bridge/routes.ts index b15cfa3e97..d5609dc8b4 100644 --- a/packages/a2a-server/src/chat-bridge/routes.ts +++ b/packages/a2a-server/src/chat-bridge/routes.ts @@ -325,12 +325,19 @@ export function createChatBridgeRoutes(config: ChatBridgeConfig): Router { } if (isAddOnsFormat) { - // Wrap in Workspace Add-ons response format - const addOnsResponse = wrapAddOnsResponse(response); - logger.info( - `[ChatBridge] Add-ons response: ${JSON.stringify(addOnsResponse).substring(0, 200)}`, - ); - res.json(addOnsResponse); + // If the handler returned an empty response (messages pushed via + // Chat API), return a bare {} so Add-ons doesn't try to create + // an empty message — which causes Google Chat to retry the webhook. + if (!response.text && !response.cardsV2 && !response.actionResponse) { + logger.info(`[ChatBridge] Add-ons response: {} (empty ack)`); + res.json({}); + } else { + const addOnsResponse = wrapAddOnsResponse(response); + logger.info( + `[ChatBridge] Add-ons response: ${JSON.stringify(addOnsResponse).substring(0, 200)}`, + ); + res.json(addOnsResponse); + } } else { res.json(response); } diff --git a/packages/a2a-server/src/config/config.ts b/packages/a2a-server/src/config/config.ts index 48daffbe42..48d1006463 100644 --- a/packages/a2a-server/src/config/config.ts +++ b/packages/a2a-server/src/config/config.ts @@ -59,7 +59,7 @@ export async function loadConfig( const configParams: ConfigParameters = { sessionId: taskId, - model: PREVIEW_GEMINI_MODEL, + model: process.env['GEMINI_MODEL'] || PREVIEW_GEMINI_MODEL, embeddingModel: DEFAULT_GEMINI_EMBEDDING_MODEL, sandbox: undefined, // Sandbox might not be relevant for a server-side agent targetDir: workspaceDir, // Or a specific directory the agent operates on