diff --git a/packages/a2a-server/README.md b/packages/a2a-server/README.md index f511420629..54d430a89b 100644 --- a/packages/a2a-server/README.md +++ b/packages/a2a-server/README.md @@ -216,7 +216,7 @@ gcloud run deploy gemini-chat-bridge \ --cpu=1 \ --timeout=60 \ --concurrency=80 \ - --max-instances=5 \ + --max-instances=1 \ --set-env-vars="A2A_SERVER_URL=$A2A_URL,GCS_BUCKET_NAME=gemini-a2a-sessions-$PROJECT_ID" ``` @@ -342,3 +342,15 @@ flushed to GCS every 30 seconds and restored on startup. The Chat bridge includes `thread.name` in all responses. If replies still appear at the top level, ensure the webhook event includes thread information. DM conversations always thread correctly; spaces may need threading enabled. + +## Known Limitations + +- **Google Chat 4096 character limit**: Long text sent via the Chat API is split + into multiple messages of roughly 4000 characters each. +- **Single bridge instance**: The bridge uses `max-instances=1` so that the + in-memory async processing guard works correctly. This means no redundancy + during deploys (brief downtime during revision rollover). +- **Tool confirmation in streaming mode**: When the A2A server has + `GEMINI_YOLO_MODE=false`, tool confirmations via streaming may not return text + due to an SDK-level issue (executor aborts on SSE disconnect). Server YOLO + mode works correctly. diff --git a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts index 71a3e51b44..80af71657f 100644 --- a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts +++ b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts @@ -332,6 +332,58 @@ export class A2ABridgeClient { return this.client.sendMessage(params); } + /** + * Sends a tool confirmation via streaming (SSE) so the caller can + * follow the full task lifecycle after approval. 
+ */ + sendToolConfirmationStream( + callId: string, + outcome: string, + taskId: string, + options: { contextId?: string }, + ): AsyncGenerator { + if (!this.client) { + throw new Error('A2A client not initialized. Call initialize() first.'); + } + + const actionPart: Part = { + kind: 'data', + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + data: [ + { + version: 'v0.10', + action: { + name: 'tool_confirmation', + surfaceId: `tool_approval_${taskId}_${callId}`, + sourceComponentId: + outcome === 'cancel' ? 'reject_button' : 'approve_button', + timestamp: new Date().toISOString(), + context: { callId, outcome, taskId }, + }, + }, + ] as unknown as Record, + metadata: { + mimeType: A2UI_MIME_TYPE, + }, + } as Part; + + const params: MessageSendParams = { + message: { + kind: 'message', + role: 'user', + messageId: uuidv4(), + parts: [actionPart], + contextId: options.contextId, + taskId, + metadata: { + extensions: [A2UI_EXTENSION_URI], + }, + }, + }; + + return this.client.sendMessageStream(params); + } + /** * Sends multiple tool confirmations in a single A2A message. * Needed when the agent requests multiple tool approvals at once — @@ -395,4 +447,64 @@ export class A2ABridgeClient { return this.client.sendMessage(params); } + + /** + * Sends batch tool confirmations via streaming (SSE) so the caller can + * follow the full task lifecycle after approval. Returns an async generator + * that yields events until the task reaches a terminal state. + */ + sendBatchToolConfirmationsStream( + approvals: Array<{ callId: string; outcome: string; taskId: string }>, + options: { contextId?: string }, + ): AsyncGenerator { + if (!this.client) { + throw new Error('A2A client not initialized. 
Call initialize() first.'); + } + + const parts: Part[] = approvals.map( + (approval) => + ({ + kind: 'data', + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + data: [ + { + version: 'v0.10', + action: { + name: 'tool_confirmation', + surfaceId: `tool_approval_${approval.taskId}_${approval.callId}`, + sourceComponentId: + approval.outcome === 'cancel' + ? 'reject_button' + : 'approve_button', + timestamp: new Date().toISOString(), + context: { + callId: approval.callId, + outcome: approval.outcome, + taskId: approval.taskId, + }, + }, + }, + ] as unknown as Record, + metadata: { + mimeType: A2UI_MIME_TYPE, + }, + }) as Part, + ); + + const params: MessageSendParams = { + message: { + kind: 'message', + role: 'user', + messageId: uuidv4(), + parts, + contextId: options.contextId, + taskId: approvals[0]?.taskId, + metadata: { + extensions: [A2UI_EXTENSION_URI], + }, + }, + }; + + return this.client.sendMessageStream(params); + } } diff --git a/packages/a2a-server/src/chat-bridge/chat-api-client.ts b/packages/a2a-server/src/chat-bridge/chat-api-client.ts index 2da6e5c389..31f1f1db60 100644 --- a/packages/a2a-server/src/chat-bridge/chat-api-client.ts +++ b/packages/a2a-server/src/chat-bridge/chat-api-client.ts @@ -15,6 +15,8 @@ import type { ChatCardV2 } from './types.js'; import { logger } from '../utils/logger.js'; const CHAT_API_BASE = 'https://chat.googleapis.com/v1'; +/** Google Chat max text length. Leave margin for formatting overhead. */ +const MAX_TEXT_LENGTH = 4000; export interface ChatApiClientConfig { /** Path to service account key JSON file. If not set, uses ADC. */ @@ -41,20 +43,38 @@ export class ChatApiClient { /** * Sends a new message to a Google Chat space in a specific thread. + * Automatically splits text longer than 4000 chars into multiple messages. 
*/ async sendMessage( spaceName: string, threadName: string, options: { text?: string; cardsV2?: ChatCardV2[] }, ): Promise { - try { - if (!this.initialized) await this.initialize(); + if (!this.initialized) await this.initialize(); + const chunks = options.text ? splitText(options.text) : ['']; + + // First chunk gets the cards (if any). Subsequent chunks are text-only. + let lastMessageName: string | undefined; + for (let i = 0; i < chunks.length; i++) { const message: Record = {}; - if (options.text) message['text'] = options.text; - if (options.cardsV2) message['cardsV2'] = options.cardsV2; + if (chunks[i]) message['text'] = chunks[i]; + if (i === 0 && options.cardsV2) message['cardsV2'] = options.cardsV2; message['thread'] = { name: threadName }; + const name = await this.postMessage(spaceName, message); + if (name) lastMessageName = name; + } + + return lastMessageName; + } + + /** Posts a single message to the Chat API. */ + private async postMessage( + spaceName: string, + message: Record, + ): Promise { + try { const url = `${CHAT_API_BASE}/${spaceName}/messages` + `?messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD`; @@ -149,3 +169,44 @@ export class ChatApiClient { } } } + +/** + * Splits text into chunks that fit within Google Chat's character limit. + * Splits on paragraph boundaries (double newline) first, then single + * newlines, then hard-splits as a last resort. 
+ */ +function splitText(text: string): string[] { + if (text.length <= MAX_TEXT_LENGTH) return [text]; + + const chunks: string[] = []; + let remaining = text; + + while (remaining.length > MAX_TEXT_LENGTH) { + let splitAt = -1; + + // Try a paragraph boundary; search from MAX_TEXT_LENGTH - 2 so the chunk, separator included, stays within the limit + const paraIdx = remaining.lastIndexOf('\n\n', MAX_TEXT_LENGTH - 2); + if (paraIdx > MAX_TEXT_LENGTH * 0.3) { + splitAt = paraIdx + 2; // include the double newline in the first chunk + } + + // Fall back to a single newline; search from MAX_TEXT_LENGTH - 1 for the same reason + if (splitAt < 0) { + const lineIdx = remaining.lastIndexOf('\n', MAX_TEXT_LENGTH - 1); + if (lineIdx > MAX_TEXT_LENGTH * 0.3) { + splitAt = lineIdx + 1; + } + } + + // Hard split as last resort + if (splitAt < 0) { + splitAt = MAX_TEXT_LENGTH; + } + + chunks.push(remaining.substring(0, splitAt)); + remaining = remaining.substring(splitAt); + } + + if (remaining) chunks.push(remaining); + return chunks; +} diff --git a/packages/a2a-server/src/chat-bridge/handler.ts b/packages/a2a-server/src/chat-bridge/handler.ts index 119f7d65a5..6a19dfb46a 100644 --- a/packages/a2a-server/src/chat-bridge/handler.ts +++ b/packages/a2a-server/src/chat-bridge/handler.ts @@ -16,16 +16,9 @@ import { SessionStore } from './session-store.js'; import { A2ABridgeClient, type A2AStreamEventData, - extractIdsFromResponse, - extractAllParts, - extractTextFromParts, } from './a2a-bridge-client.js'; import { ChatApiClient } from './chat-api-client.js'; -import { - renderResponse, - extractFromStreamEvent, - extractToolApprovals, -} from './response-renderer.js'; +import { renderResponse, extractFromStreamEvent } from './response-renderer.js'; import { logger } from '../utils/logger.js'; const TERMINAL_STATES = new Set([ @@ -40,6 +33,8 @@ export class ChatBridgeHandler { private a2aClient: A2ABridgeClient; private chatApiClient: ChatApiClient; private initialized = false; + /** Full webhook URL for card button actions (HTTP Add-ons need a URL, not a function name). 
*/ + private webhookUrl: string | undefined; constructor( private config: ChatBridgeConfig, @@ -52,6 +47,12 @@ export class ChatBridgeHandler { new ChatApiClient({ serviceAccountKeyPath: config.serviceAccountKeyPath, }); + // For HTTP Add-ons, card button action.function must be a full HTTPS URL. + // Set CHAT_WEBHOOK_URL env var to the bridge's public webhook endpoint. + this.webhookUrl = process.env['CHAT_WEBHOOK_URL'] || undefined; + if (this.webhookUrl) { + logger.info(`[ChatBridge] Button action URL: ${this.webhookUrl}`); + } } /** @@ -288,76 +289,104 @@ export class ChatBridgeHandler { session.asyncProcessing = true; try { - const response = await this.a2aClient.sendToolConfirmation( + // Stream the tool confirmation to collect text as it arrives + const stream = this.a2aClient.sendToolConfirmationStream( approval.callId, outcome, approval.taskId, { contextId: session.contextId }, ); + let lastText = ''; + let lastTaskId: string | undefined; + let lastContextId: string | undefined; + let lastState: string | undefined; + let sentResponse = false; + + for await (const streamEvent of stream) { + if (session.cancelled) break; + + const extracted = extractFromStreamEvent(streamEvent); + if (extracted.taskId) lastTaskId = extracted.taskId; + if (extracted.contextId) lastContextId = extracted.contextId; + if (extracted.state) lastState = extracted.state; + if (extracted.text) lastText = extracted.text; + + // Check for new tool approvals + const pending = extracted.toolApprovals.filter( + (a) => a.status === 'awaiting_approval', + ); + if (pending.length > 0) { + if (session.yoloMode) { + // YOLO: auto-approve via streaming + const autoResult = await this.autoApproveTools( + session, + pending, + lastContextId, + ); + if (autoResult.lastContextId) + lastContextId = autoResult.lastContextId; + if (autoResult.lastTaskId) lastTaskId = autoResult.lastTaskId; + if (autoResult.lastState) lastState = autoResult.lastState; + if (autoResult.text) lastText = autoResult.text; 
+ } else { + // Non-YOLO: push approval card + session.pendingToolApproval = { + callId: pending[0].callId, + taskId: pending[0].taskId, + toolName: pending[0].displayName || pending[0].name, + }; + // Build a minimal task response for the card renderer + const cardResponse = renderResponse( + { + kind: 'task', + id: pending[0].taskId, + contextId: lastContextId ?? session.contextId, + status: { + state: 'input-required', + timestamp: new Date().toISOString(), + message: + streamEvent.kind === 'status-update' + ? streamEvent.status?.message + : undefined, + }, + history: [], + artifacts: [], + }, + message.thread.threadKey || threadName, + threadName, + this.webhookUrl, + ); + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: cardResponse.text, + cardsV2: cardResponse.cardsV2, + }); + sentResponse = true; + logger.info( + `[ChatBridge] Pushed approval card after confirmation: ${pending[0].displayName || pending[0].name}`, + ); + } + break; + } + } + if (session.cancelled) return; - const { contextId: newCtxId, taskId: newTaskId } = - extractIdsFromResponse(response); - if (newCtxId) session.contextId = newCtxId; - this.sessionStore.updateTaskId(threadName, newTaskId); - - // Check for new pending approvals in the response - const newApprovals = extractToolApprovals(response).filter( - (a) => a.status === 'awaiting_approval', + // Update session IDs + if (lastContextId) session.contextId = lastContextId; + const isTerminal = lastState ? TERMINAL_STATES.has(lastState) : false; + this.sessionStore.updateTaskId( + threadName, + isTerminal ? undefined : lastTaskId, ); - if (session.yoloMode && newApprovals.length > 0) { - // YOLO: auto-approve any new tools - const autoResult = await this.autoApproveTools( - session, - newApprovals, - session.contextId, - ); - if (autoResult.lastContextId) - session.contextId = autoResult.lastContextId; - if (autoResult.lastTaskId !== undefined) { - const isTerminal = autoResult.lastState - ? 
TERMINAL_STATES.has(autoResult.lastState) - : false; - this.sessionStore.updateTaskId( - threadName, - isTerminal ? undefined : autoResult.lastTaskId, - ); - } - if (autoResult.text) { - await this.chatApiClient.sendMessage(spaceName, threadName, { - text: autoResult.text, - }); - } - } else if (newApprovals.length > 0) { - // Non-YOLO: push new approval card - session.pendingToolApproval = { - callId: newApprovals[0].callId, - taskId: newApprovals[0].taskId, - toolName: newApprovals[0].displayName || newApprovals[0].name, - }; - const rendered = renderResponse( - response, - message.thread.threadKey || threadName, - threadName, - ); + // Push final text if we haven't already pushed a card + if (lastText && !sentResponse) { await this.chatApiClient.sendMessage(spaceName, threadName, { - text: rendered.text, - cardsV2: rendered.cardsV2, + text: lastText, }); logger.info( - `[ChatBridge] Pushed new approval card after confirmation: ${newApprovals[0].displayName || newApprovals[0].name}`, - ); - } else { - // No more approvals — push the agent's response - const rendered = renderResponse(response); - const responseText = rendered.text || '_Agent completed._'; - await this.chatApiClient.sendMessage(spaceName, threadName, { - text: responseText, - }); - logger.info( - `[ChatBridge] Pushed post-approval response (${responseText.length} chars)`, + `[ChatBridge] Pushed post-approval response (${lastText.length} chars): "${lastText.substring(0, 200)}"`, ); } } catch (error) { @@ -408,6 +437,19 @@ export class ChatBridgeHandler { let lastState: string | undefined; let sentFinalResponse = false; + let eventCount = 0; + // Track the latest pending approvals across events — only act on them + // when the server signals input-required (meaning it actually needs input). + // In server YOLO mode, tools are auto-approved so the stream continues + // past the brief 'awaiting_approval' status without hitting input-required. 
+ let latestPendingApprovals: Array<{ + callId: string; + taskId: string; + name: string; + displayName: string; + }> = []; + let approvalStatusMessage: unknown; + for await (const streamEvent of stream) { // Check if session was cancelled (e.g. by /reset) if (session.cancelled) { @@ -417,42 +459,79 @@ export class ChatBridgeHandler { break; } + eventCount++; const extracted = extractFromStreamEvent(streamEvent); if (extracted.taskId) lastTaskId = extracted.taskId; if (extracted.contextId) lastContextId = extracted.contextId; if (extracted.state) lastState = extracted.state; - // Check for tool approvals needing user input - const pendingApprovals = extracted.toolApprovals.filter( + // Log each event for debugging + logger.info( + `[ChatBridge] Stream event #${eventCount}: kind=${streamEvent.kind}, ` + + `state=${extracted.state ?? 'n/a'}, text=${extracted.text.length} chars, ` + + `approvals=${extracted.toolApprovals.filter((a) => a.status === 'awaiting_approval').length}`, + ); + + // Track latest text content + if (extracted.text) { + lastText = extracted.text; + } + + // Track tool approvals — always update with latest state so + // stale approvals are cleared when the server auto-approves. 
+ const pending = extracted.toolApprovals.filter( (a) => a.status === 'awaiting_approval', ); - if (pendingApprovals.length > 0) { - // YOLO mode: auto-approve all tools without user interaction - if (session.yoloMode) { - const autoApproved = await this.autoApproveTools( - session, - pendingApprovals, - lastContextId, - ); - if (autoApproved.lastContextId) - lastContextId = autoApproved.lastContextId; - if (autoApproved.lastTaskId) lastTaskId = autoApproved.lastTaskId; - if (autoApproved.lastState) lastState = autoApproved.lastState; - if (autoApproved.text) lastText = autoApproved.text; - // Auto-approval loop handles everything; break out of stream - break; - } + latestPendingApprovals = pending; + if (pending.length > 0) { + approvalStatusMessage = + streamEvent.kind === 'status-update' + ? streamEvent.status?.message + : undefined; + } + // On terminal or input-required state, stop streaming. + // input-required means the server is asking for user action + // (tool approval or follow-up message). + if ( + extracted.state && + (TERMINAL_STATES.has(extracted.state) || + extracted.state === 'input-required') + ) { + break; + } + } + + logger.info( + `[ChatBridge] Stream complete: ${eventCount} events, ` + + `state=${lastState ?? 
'none'}, text=${lastText.length} chars, ` + + `pendingApprovals=${latestPendingApprovals.length}`, + ); + + // Handle pending approvals (only relevant when server sent input-required) + if (latestPendingApprovals.length > 0 && lastState === 'input-required') { + if (session.yoloMode) { + // Bridge YOLO mode: auto-approve all tools + const autoApproved = await this.autoApproveTools( + session, + latestPendingApprovals, + lastContextId, + ); + if (autoApproved.lastContextId) + lastContextId = autoApproved.lastContextId; + if (autoApproved.lastTaskId) lastTaskId = autoApproved.lastTaskId; + if (autoApproved.lastState) lastState = autoApproved.lastState; + if (autoApproved.text) lastText = autoApproved.text; + } else { // Non-YOLO: push approval card and wait for user input - const firstApproval = pendingApprovals[0]; + const firstApproval = latestPendingApprovals[0]; session.pendingToolApproval = { callId: firstApproval.callId, taskId: firstApproval.taskId, toolName: firstApproval.displayName || firstApproval.name, }; - // Push tool approval card to Chat const approvalResponse = renderResponse( { kind: 'task', @@ -461,16 +540,17 @@ export class ChatBridgeHandler { status: { state: 'input-required', timestamp: new Date().toISOString(), - message: - streamEvent.kind === 'status-update' - ? streamEvent.status?.message - : undefined, + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + message: approvalStatusMessage as + | import('@a2a-js/sdk').Message + | undefined, }, history: [], artifacts: [], }, message.thread.threadKey || threadName, threadName, + this.webhookUrl, ); await this.chatApiClient.sendMessage(spaceName, threadName, { @@ -482,25 +562,6 @@ export class ChatBridgeHandler { logger.info( `[ChatBridge] Pushed tool approval card: ${firstApproval.displayName || firstApproval.name}`, ); - // Break immediately — the server is waiting for the client to - // respond to the approval. 
If we keep waiting for stream events, - // asyncProcessing stays true and the user's "approve" message - // hits the async guard. - break; - } - - // Track latest text content - if (extracted.text) { - lastText = extracted.text; - } - - // On terminal or input-required state, stop streaming - if ( - extracted.state && - (TERMINAL_STATES.has(extracted.state) || - extracted.state === 'input-required') - ) { - break; } } @@ -527,7 +588,7 @@ export class ChatBridgeHandler { text: lastText, }); logger.info( - `[ChatBridge] Pushed final response (${lastText.length} chars)`, + `[ChatBridge] Pushed final response (${lastText.length} chars): "${lastText.substring(0, 200)}"`, ); } else if (!lastText && !sentFinalResponse) { await this.chatApiClient.sendMessage(spaceName, threadName, { @@ -595,10 +656,10 @@ export class ChatBridgeHandler { } /** - * Auto-approves tool calls in YOLO mode. - * Sends all pending approvals in a single batch message to avoid hanging - * when the agent needs ALL tools approved before proceeding. - * Loops if the response contains further approval requests. + * Auto-approves tool calls in YOLO mode using streaming. + * Sends approvals and collects streamed text from the SSE response. + * The A2A server streams text incrementally and closes with final:true + * at input-required (more tools) or completed (done). 
*/ private async autoApproveTools( session: SessionInfo, @@ -621,20 +682,19 @@ export class ChatBridgeHandler { let lastState: string | undefined; let lastText: string | undefined; const approvedNames: string[] = []; - const MAX_ROUNDS = 10; + const MAX_ROUNDS = 20; for (let round = 0; round < MAX_ROUNDS && !session.cancelled; round++) { if (approvalsToProcess.length === 0) break; - // Log what we're approving for (const a of approvalsToProcess) { const label = a.displayName || a.name; logger.info(`[ChatBridge] YOLO auto-approving: ${label}`); approvedNames.push(label); } - // Send ALL approvals in a single batch message - const response = await this.a2aClient.sendBatchToolConfirmations( + // Stream tool confirmations — text arrives incrementally via SSE + const stream = this.a2aClient.sendBatchToolConfirmationsStream( approvalsToProcess.map((a) => ({ callId: a.callId, outcome: 'proceed_once', @@ -643,28 +703,41 @@ export class ChatBridgeHandler { { contextId: lastContextId ?? session.contextId }, ); - const { contextId: newCtxId, taskId: newTaskId } = - extractIdsFromResponse(response); - if (newCtxId) lastContextId = newCtxId; - if (newTaskId) lastTaskId = newTaskId; + approvalsToProcess = []; - if (response.kind === 'task' && response.status?.state) { - lastState = response.status.state; + // Consume the stream, collecting text and detecting new tool approvals + let eventCount = 0; + for await (const event of stream) { + if (session.cancelled) break; + eventCount++; + + const extracted = extractFromStreamEvent(event); + if (extracted.taskId) lastTaskId = extracted.taskId; + if (extracted.contextId) lastContextId = extracted.contextId; + if (extracted.state) lastState = extracted.state; + if (extracted.text) lastText = extracted.text; + + logger.info( + `[ChatBridge] YOLO event #${eventCount}: kind=${event.kind}, ` + + `state=${extracted.state ?? 
'n/a'}, text=${extracted.text.length} chars`, + ); + + // New tool approvals → break to send them in next round + const pending = extracted.toolApprovals.filter( + (a) => a.status === 'awaiting_approval', + ); + if (pending.length > 0) { + approvalsToProcess = pending; + break; + } } - // Extract text from this response - const responseParts = extractAllParts(response); - const responseText = extractTextFromParts(responseParts); - if (responseText) lastText = responseText; - - // Break if terminal - if (lastState && TERMINAL_STATES.has(lastState)) break; - - // Check for more pending approvals - const newApprovals = extractToolApprovals(response).filter( - (a) => a.status === 'awaiting_approval', + logger.info( + `[ChatBridge] YOLO round ${round}: state=${lastState ?? 'none'}, ` + + `text=${lastText?.length ?? 0} chars, newApprovals=${approvalsToProcess.length}`, ); - approvalsToProcess = newApprovals; + + if (lastState && TERMINAL_STATES.has(lastState)) break; } logger.info( @@ -698,7 +771,10 @@ export class ChatBridgeHandler { `[ChatBridge] CARD_CLICKED: function=${action.actionMethodName}`, ); - if (action.actionMethodName === 'tool_confirmation') { + if ( + action.actionMethodName === 'tool_confirmation' || + action.actionMethodName === this.webhookUrl + ) { const params = action.parameters || []; const paramMap = new Map(params.map((p) => [p.key, p.value])); const callId = paramMap.get('callId'); diff --git a/packages/a2a-server/src/chat-bridge/response-renderer.ts b/packages/a2a-server/src/chat-bridge/response-renderer.ts index 58d4a53136..49c5acff6b 100644 --- a/packages/a2a-server/src/chat-bridge/response-renderer.ts +++ b/packages/a2a-server/src/chat-bridge/response-renderer.ts @@ -69,6 +69,7 @@ export function renderResponse( response: A2AResponse, threadKey?: string, threadName?: string, + webhookUrl?: string, ): ChatResponse { const parts = extractAllParts(response); const textContent = extractTextFromParts(parts); @@ -94,7 +95,7 @@ export function 
renderResponse( // so we skip rendering approval cards for those. for (const approval of dedupedApprovals) { if (approval.status === 'awaiting_approval') { - cards.push(renderToolApprovalCard(approval)); + cards.push(renderToolApprovalCard(approval, webhookUrl)); } } @@ -353,7 +354,10 @@ function extractCommandSummary(approval: ToolApprovalInfo): string { * Renders a tool approval surface as a compact Google Chat Card V2 * with clickable Approve/Reject buttons. */ -function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 { +function renderToolApprovalCard( + approval: ToolApprovalInfo, + webhookUrl?: string, +): ChatCardV2 { const widgets: ChatWidget[] = []; const toolLabel = approval.displayName || approval.name; @@ -391,7 +395,7 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 { text: 'Approve', onClick: { action: { - function: 'tool_confirmation', + function: webhookUrl ?? 'tool_confirmation', parameters: [ { key: 'callId', value: approval.callId }, { key: 'outcome', value: 'proceed_once' }, @@ -404,7 +408,7 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 { text: 'Always Allow', onClick: { action: { - function: 'tool_confirmation', + function: webhookUrl ?? 'tool_confirmation', parameters: [ { key: 'callId', value: approval.callId }, { key: 'outcome', value: 'proceed_always_tool' }, @@ -417,7 +421,7 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 { text: 'Reject', onClick: { action: { - function: 'tool_confirmation', + function: webhookUrl ?? 'tool_confirmation', parameters: [ { key: 'callId', value: approval.callId }, { key: 'outcome', value: 'cancel' }, @@ -466,47 +470,50 @@ export function extractFromStreamEvent(event: A2AStreamEventData): { taskId = event.taskId; contextId = event.contextId; - // Extract parts from the status message const parts: Part[] = event.status?.message?.parts ?? 
[]; - const a2uiGroups = extractA2UIParts(parts); - for (const messages of a2uiGroups) { - parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); - } - // Also extract plain text + // Extract plain text FIRST (incremental chunks) so A2UI accumulated + // text is added AFTER — backward iteration will prefer A2UI. const plainText = extractTextFromParts(parts); if (plainText) { agentResponses.push({ text: plainText, status: '' }); } + + const a2uiGroups = extractA2UIParts(parts); + for (const messages of a2uiGroups) { + parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); + } } else if (event.kind === 'task') { state = event.status?.state; taskId = event.id; contextId = event.contextId; const parts = extractAllParts(event); - const a2uiGroups = extractA2UIParts(parts); - for (const messages of a2uiGroups) { - parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); - } const plainText = extractTextFromParts(parts); if (plainText) { agentResponses.push({ text: plainText, status: '' }); } + + const a2uiGroups = extractA2UIParts(parts); + for (const messages of a2uiGroups) { + parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); + } } else if (event.kind === 'message') { contextId = event.contextId; taskId = event.taskId; const parts: Part[] = event.parts ?? []; - const a2uiGroups = extractA2UIParts(parts); - for (const messages of a2uiGroups) { - parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); - } const plainText = extractTextFromParts(parts); if (plainText) { agentResponses.push({ text: plainText, status: '' }); } + + const a2uiGroups = extractA2UIParts(parts); + for (const messages of a2uiGroups) { + parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts); + } } // Build text from the last non-empty agent response