diff --git a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts index aab777b450..71a3e51b44 100644 --- a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts +++ b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts @@ -331,4 +331,68 @@ export class A2ABridgeClient { return this.client.sendMessage(params); } + + /** + * Sends multiple tool confirmations in a single A2A message. + * Needed when the agent requests multiple tool approvals at once — + * sending them one at a time with blocking mode would hang because + * the agent waits for ALL approvals before proceeding. + */ + async sendBatchToolConfirmations( + approvals: Array<{ callId: string; outcome: string; taskId: string }>, + options: { contextId?: string }, + ): Promise { + if (!this.client) { + throw new Error('A2A client not initialized. Call initialize() first.'); + } + + const parts: Part[] = approvals.map( + (approval) => + ({ + kind: 'data', + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + data: [ + { + version: 'v0.10', + action: { + name: 'tool_confirmation', + surfaceId: `tool_approval_${approval.taskId}_${approval.callId}`, + sourceComponentId: + approval.outcome === 'cancel' + ? 'reject_button' + : 'approve_button', + timestamp: new Date().toISOString(), + context: { + callId: approval.callId, + outcome: approval.outcome, + taskId: approval.taskId, + }, + }, + }, + ] as unknown as Record, + metadata: { + mimeType: A2UI_MIME_TYPE, + }, + }) as Part, + ); + + const params: MessageSendParams = { + message: { + kind: 'message', + role: 'user', + messageId: uuidv4(), + parts, + contextId: options.contextId, + taskId: approvals[0]?.taskId, + metadata: { + extensions: [A2UI_EXTENSION_URI], + }, + }, + configuration: { + blocking: true, + }, + }; + + return this.client.sendMessage(params); + } } diff --git a/packages/a2a-server/src/chat-bridge/handler.ts b/packages/a2a-server/src/chat-bridge/handler.ts index 16238599d8..76fd12dec5 100644 --- a/packages/a2a-server/src/chat-bridge/handler.ts +++ b/packages/a2a-server/src/chat-bridge/handler.ts @@ -16,9 +16,15 @@ import { SessionStore } from './session-store.js'; import { A2ABridgeClient, extractIdsFromResponse, + extractAllParts, + extractTextFromParts, } from './a2a-bridge-client.js'; import { ChatApiClient } from './chat-api-client.js'; -import { renderResponse, extractFromStreamEvent } from './response-renderer.js'; +import { + renderResponse, + extractFromStreamEvent, + extractToolApprovals, +} from './response-renderer.js'; import { logger } from '../utils/logger.js'; const TERMINAL_STATES = new Set([ @@ -190,13 +196,15 @@ export class ChatBridgeHandler { } /** - * Handles text-based tool approval responses synchronously. + * Handles text-based tool approval responses. + * Returns an immediate acknowledgment and processes the confirmation + * asynchronously, pushing the agent's response via Chat API. */ - private async handleToolApprovalText( + private handleToolApprovalText( event: ChatEvent, session: SessionInfo, trimmed: string, - ): Promise { + ): ChatResponse { const message = event.message!; const threadName = message.thread.name; const approval = session.pendingToolApproval!; @@ -216,6 +224,46 @@ export class ChatBridgeHandler { session.pendingToolApproval = undefined; + // Fire-and-forget async processing of the tool confirmation + this.processToolApprovalAsync(event, session, approval, outcome).catch( + (err) => { + const msg = err instanceof Error ? err.message : 'Unknown error'; + logger.error( + `[ChatBridge] Tool approval async processing failed: ${msg}`, + err, + ); + }, + ); + + const ackText = isReject + ? '_Tool rejected._' + : '_Tool approved, processing..._'; + return { + text: ackText, + thread: { + threadKey: message.thread.threadKey || threadName, + name: threadName, + }, + }; + } + + /** + * Processes a tool confirmation asynchronously. + * Sends the confirmation to the A2A server, handles the response, + * and pushes results to Google Chat via the REST API. + */ + private async processToolApprovalAsync( + event: ChatEvent, + session: SessionInfo, + approval: { callId: string; taskId: string; toolName: string }, + outcome: string, + ): Promise { + const message = event.message!; + const threadName = message.thread.name; + const spaceName = event.space.name; + + session.asyncProcessing = true; + try { const response = await this.a2aClient.sendToolConfirmation( approval.callId, @@ -224,20 +272,83 @@ export class ChatBridgeHandler { { contextId: session.contextId }, ); + if (session.cancelled) return; + const { contextId: newCtxId, taskId: newTaskId } = extractIdsFromResponse(response); if (newCtxId) session.contextId = newCtxId; this.sessionStore.updateTaskId(threadName, newTaskId); - const threadKey = message.thread.threadKey || threadName; - return renderResponse(response, threadKey, threadName); + // Check for new pending approvals in the response + const newApprovals = extractToolApprovals(response).filter( + (a) => a.status === 'awaiting_approval', + ); + + if (session.yoloMode && newApprovals.length > 0) { + // YOLO: auto-approve any new tools + const autoResult = await this.autoApproveTools( + session, + newApprovals, + session.contextId, + ); + if (autoResult.lastContextId) + session.contextId = autoResult.lastContextId; + if (autoResult.lastTaskId !== undefined) { + const isTerminal = autoResult.lastState + ? TERMINAL_STATES.has(autoResult.lastState) + : false; + this.sessionStore.updateTaskId( + threadName, + isTerminal ? undefined : autoResult.lastTaskId, + ); + } + if (autoResult.text) { + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: autoResult.text, + }); + } + } else if (newApprovals.length > 0) { + // Non-YOLO: push new approval card + session.pendingToolApproval = { + callId: newApprovals[0].callId, + taskId: newApprovals[0].taskId, + toolName: newApprovals[0].displayName || newApprovals[0].name, + }; + const rendered = renderResponse( + response, + message.thread.threadKey || threadName, + threadName, + ); + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: rendered.text, + cardsV2: rendered.cardsV2, + }); + logger.info( + `[ChatBridge] Pushed new approval card after confirmation: ${newApprovals[0].displayName || newApprovals[0].name}`, + ); + } else { + // No more approvals — push the agent's response + const rendered = renderResponse(response); + const responseText = rendered.text || '_Agent completed._'; + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: responseText, + }); + logger.info( + `[ChatBridge] Pushed post-approval response (${responseText.length} chars)`, + ); + } } catch (error) { + if (session.cancelled) return; const errorMsg = error instanceof Error ? error.message : 'Unknown error'; logger.error( - `[ChatBridge] Error sending tool confirmation: ${errorMsg}`, + `[ChatBridge] Error in tool approval async: ${errorMsg}`, error, ); - return { text: `Error processing tool confirmation: ${errorMsg}` }; + await this.chatApiClient.sendMessage(spaceName, threadName, { + text: `Error processing tool confirmation: ${errorMsg}`, + }); + } finally { + session.asyncProcessing = false; } } @@ -269,6 +380,14 @@ export class ChatBridgeHandler { let sentFinalResponse = false; for await (const streamEvent of stream) { + // Check if session was cancelled (e.g. by /reset) + if (session.cancelled) { + logger.info( + `[ChatBridge] Session cancelled, stopping stream for ${threadName}`, + ); + break; + } + const extracted = extractFromStreamEvent(streamEvent); if (extracted.taskId) lastTaskId = extracted.taskId; @@ -280,6 +399,23 @@ export class ChatBridgeHandler { (a) => a.status === 'awaiting_approval', ); if (pendingApprovals.length > 0) { + // YOLO mode: auto-approve all tools without user interaction + if (session.yoloMode) { + const autoApproved = await this.autoApproveTools( + session, + pendingApprovals, + lastContextId, + ); + if (autoApproved.lastContextId) + lastContextId = autoApproved.lastContextId; + if (autoApproved.lastTaskId) lastTaskId = autoApproved.lastTaskId; + if (autoApproved.lastState) lastState = autoApproved.lastState; + if (autoApproved.text) lastText = autoApproved.text; + // Auto-approval loop handles everything; break out of stream + break; + } + + // Non-YOLO: push approval card and wait for user input const firstApproval = pendingApprovals[0]; session.pendingToolApproval = { callId: firstApproval.callId, @@ -317,6 +453,11 @@ export class ChatBridgeHandler { logger.info( `[ChatBridge] Pushed tool approval card: ${firstApproval.displayName || firstApproval.name}`, ); + // Break immediately — the server is waiting for the client to + // respond to the approval. If we keep waiting for stream events, + // asyncProcessing stays true and the user's "approve" message + // hits the async guard. + break; } // Track latest text content @@ -334,6 +475,14 @@ export class ChatBridgeHandler { } } + // If session was cancelled, don't push any messages + if (session.cancelled) { + logger.info( + `[ChatBridge] Skipping response push for cancelled session ${threadName}`, + ); + return; + } + // Update session IDs if (lastContextId) session.contextId = lastContextId; // Clear taskId on terminal states so next message starts a fresh task @@ -357,6 +506,7 @@ export class ChatBridgeHandler { }); } } catch (error) { + if (session.cancelled) return; // Don't push errors for cancelled sessions const errorMsg = error instanceof Error ? error.message : 'Unknown error'; logger.error(`[ChatBridge] Async processing error: ${errorMsg}`, error); await this.chatApiClient.sendMessage(spaceName, threadName, { @@ -368,9 +518,90 @@ export class ChatBridgeHandler { } /** - * Handles a CARD_CLICKED event: user clicked a button on a card. + * Auto-approves tool calls in YOLO mode. + * Sends all pending approvals in a single batch message to avoid hanging + * when the agent needs ALL tools approved before proceeding. + * Loops if the response contains further approval requests. */ - private async handleCardClicked(event: ChatEvent): Promise { + private async autoApproveTools( + session: SessionInfo, + initialApprovals: Array<{ + callId: string; + taskId: string; + name: string; + displayName: string; + }>, + contextId: string | undefined, + ): Promise<{ + lastContextId?: string; + lastTaskId?: string; + lastState?: string; + text?: string; + }> { + let approvalsToProcess = initialApprovals; + let lastContextId = contextId; + let lastTaskId: string | undefined; + let lastState: string | undefined; + let lastText: string | undefined; + const approvedNames: string[] = []; + const MAX_ROUNDS = 10; + + for (let round = 0; round < MAX_ROUNDS && !session.cancelled; round++) { + if (approvalsToProcess.length === 0) break; + + // Log what we're approving + for (const a of approvalsToProcess) { + const label = a.displayName || a.name; + logger.info(`[ChatBridge] YOLO auto-approving: ${label}`); + approvedNames.push(label); + } + + // Send ALL approvals in a single batch message + const response = await this.a2aClient.sendBatchToolConfirmations( + approvalsToProcess.map((a) => ({ + callId: a.callId, + outcome: 'proceed_once', + taskId: a.taskId, + })), + { contextId: lastContextId ?? session.contextId }, + ); + + const { contextId: newCtxId, taskId: newTaskId } = + extractIdsFromResponse(response); + if (newCtxId) lastContextId = newCtxId; + if (newTaskId) lastTaskId = newTaskId; + + if (response.kind === 'task' && response.status?.state) { + lastState = response.status.state; + } + + // Extract text from this response + const responseParts = extractAllParts(response); + const responseText = extractTextFromParts(responseParts); + if (responseText) lastText = responseText; + + // Break if terminal + if (lastState && TERMINAL_STATES.has(lastState)) break; + + // Check for more pending approvals + const newApprovals = extractToolApprovals(response).filter( + (a) => a.status === 'awaiting_approval', + ); + approvalsToProcess = newApprovals; + } + + logger.info( + `[ChatBridge] YOLO auto-approved ${approvedNames.length} tools: ${approvedNames.join(', ')}`, + ); + + return { lastContextId, lastTaskId, lastState, text: lastText }; + } + + /** + * Handles a CARD_CLICKED event: user clicked a button on a card. + * Fires async processing and returns an immediate UPDATE_MESSAGE ack. + */ + private handleCardClicked(event: ChatEvent): ChatResponse { const action = event.action; if (!action) { return { text: 'Error: Missing action data.' }; @@ -391,67 +622,45 @@ export class ChatBridgeHandler { ); if (action.actionMethodName === 'tool_confirmation') { - return this.handleToolConfirmation(event, session.contextId); + const params = action.parameters || []; + const paramMap = new Map(params.map((p) => [p.key, p.value])); + const callId = paramMap.get('callId'); + const outcome = paramMap.get('outcome'); + const taskId = paramMap.get('taskId'); + + if (!callId || !outcome || !taskId) { + return { text: 'Error: Missing tool confirmation parameters.' }; + } + + const isReject = outcome === 'cancel'; + const toolName = session.pendingToolApproval?.toolName ?? 'Tool'; + + // Clear pending approval tracked for text-based flow + session.pendingToolApproval = undefined; + + // Fire-and-forget async processing + this.processToolApprovalAsync( + event, + session, + { callId, taskId, toolName }, + outcome, + ).catch((err) => { + const msg = err instanceof Error ? err.message : 'Unknown error'; + logger.error(`[ChatBridge] Card click async failed: ${msg}`, err); + }); + + // Update the card in-place with an acknowledgment + return { + actionResponse: { type: 'UPDATE_MESSAGE' }, + text: isReject + ? `*${toolName} — Rejected*` + : `*${toolName} — Approved, processing...*`, + }; } return { text: `Unknown action: ${action.actionMethodName}` }; } - /** - * Handles tool confirmation actions from card button clicks. - */ - private async handleToolConfirmation( - event: ChatEvent, - contextId: string, - ): Promise { - const params = event.action?.parameters || []; - const paramMap = new Map(params.map((p) => [p.key, p.value])); - - const callId = paramMap.get('callId'); - const outcome = paramMap.get('outcome'); - const taskId = paramMap.get('taskId'); - - if (!callId || !outcome || !taskId) { - return { text: 'Error: Missing tool confirmation parameters.' }; - } - - logger.info( - `[ChatBridge] Tool confirmation: callId=${callId}, outcome=${outcome}, taskId=${taskId}`, - ); - - try { - const response = await this.a2aClient.sendToolConfirmation( - callId, - outcome, - taskId, - { contextId }, - ); - - // Update session - const threadName = event.message?.thread?.name; - if (threadName) { - const { contextId: newContextId, taskId: newTaskId } = - extractIdsFromResponse(response); - if (newContextId) { - const session = this.sessionStore.get(threadName); - if (session) session.contextId = newContextId; - } - this.sessionStore.updateTaskId(threadName, newTaskId); - } - - return renderResponse(response); - } catch (error) { - const errorMsg = error instanceof Error ? error.message : 'Unknown error'; - logger.error( - `[ChatBridge] Error sending tool confirmation: ${errorMsg}`, - error, - ); - return { - text: `Error processing tool confirmation: ${errorMsg}`, - }; - } - } - /** * Handles ADDED_TO_SPACE event: bot was added to a space or DM. */ diff --git a/packages/a2a-server/src/chat-bridge/response-renderer.ts b/packages/a2a-server/src/chat-bridge/response-renderer.ts index 474c25da1a..58d4a53136 100644 --- a/packages/a2a-server/src/chat-bridge/response-renderer.ts +++ b/packages/a2a-server/src/chat-bridge/response-renderer.ts @@ -15,12 +15,7 @@ * server-side rendering to a constrained card format. */ -import type { - ChatResponse, - ChatCardV2, - ChatCardSection, - ChatWidget, -} from './types.js'; +import type { ChatResponse, ChatCardV2, ChatWidget } from './types.js'; import type { Part } from '@a2a-js/sdk'; import { type A2AResponse, @@ -355,61 +350,95 @@ function extractCommandSummary(approval: ToolApprovalInfo): string { } /** - * Renders a tool approval surface as a Google Chat Card V2. + * Renders a tool approval surface as a compact Google Chat Card V2 + * with clickable Approve/Reject buttons. */ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 { const widgets: ChatWidget[] = []; + const toolLabel = approval.displayName || approval.name; // Show a concise summary of what the tool will do. - // For shell commands, extract just the command string from the args JSON. const commandSummary = extractCommandSummary(approval); if (commandSummary) { widgets.push({ decoratedText: { text: `\`${commandSummary}\``, - topLabel: approval.displayName || approval.name, + topLabel: toolLabel, startIcon: { knownIcon: 'DESCRIPTION' }, wrapText: true, }, }); } else if (approval.args && approval.args !== 'No arguments') { - // Fallback: show truncated args const truncatedArgs = - approval.args.length > 300 - ? approval.args.substring(0, 300) + '...' + approval.args.length > 200 + ? approval.args.substring(0, 200) + '...' : approval.args; widgets.push({ decoratedText: { text: truncatedArgs, - topLabel: approval.displayName || approval.name, + topLabel: toolLabel, startIcon: { knownIcon: 'DESCRIPTION' }, wrapText: true, }, }); } - // Text-based approval instructions (card click buttons don't work - // with the current Add-ons routing configuration) + // Clickable buttons for approve/reject widgets.push({ - textParagraph: { - text: 'Reply approve, always allow, or reject', + buttonList: { + buttons: [ + { + text: 'Approve', + onClick: { + action: { + function: 'tool_confirmation', + parameters: [ + { key: 'callId', value: approval.callId }, + { key: 'outcome', value: 'proceed_once' }, + { key: 'taskId', value: approval.taskId }, + ], + }, + }, + }, + { + text: 'Always Allow', + onClick: { + action: { + function: 'tool_confirmation', + parameters: [ + { key: 'callId', value: approval.callId }, + { key: 'outcome', value: 'proceed_always_tool' }, + { key: 'taskId', value: approval.taskId }, + ], + }, + }, + }, + { + text: 'Reject', + onClick: { + action: { + function: 'tool_confirmation', + parameters: [ + { key: 'callId', value: approval.callId }, + { key: 'outcome', value: 'cancel' }, + { key: 'taskId', value: approval.taskId }, + ], + }, + }, + color: { red: 0.8, green: 0.2, blue: 0.2 }, + }, + ], }, }); - const sections: ChatCardSection[] = [ - { - widgets, - }, - ]; - return { cardId: `tool_approval_${approval.callId}`, card: { header: { - title: 'Tool Approval Required', - subtitle: approval.displayName || approval.name, + title: toolLabel, + subtitle: 'Approval Required', }, - sections, + sections: [{ widgets }], }, }; } diff --git a/packages/a2a-server/src/chat-bridge/routes.ts b/packages/a2a-server/src/chat-bridge/routes.ts index b259f2205d..b15cfa3e97 100644 --- a/packages/a2a-server/src/chat-bridge/routes.ts +++ b/packages/a2a-server/src/chat-bridge/routes.ts @@ -245,7 +245,9 @@ function normalizeEvent(raw: Record): ChatEvent | null { * Add-ons expects: {hostAppDataAction: {chatDataAction: {createMessageAction: {message}}}} */ function wrapAddOnsResponse(response: ChatResponse): Record { - // Build the message object for the Add-ons format + // Build the message object for the Add-ons format. + // Include thread info so replies appear in the same thread as the user's + // message. Without it, createMessageAction creates a top-level message. const message: Record = {}; if (response.text) { message['text'] = response.text; @@ -253,14 +255,8 @@ function wrapAddOnsResponse(response: ChatResponse): Record { if (response.cardsV2) { message['cardsV2'] = response.cardsV2; } - // Include thread info so the reply goes to the user's thread - // instead of appearing as a top-level message if (response.thread) { - const thread: Record = {}; - if (response.thread.name) thread['name'] = response.thread.name; - if (response.thread.threadKey) - thread['threadKey'] = response.thread.threadKey; - message['thread'] = thread; + message['thread'] = response.thread; } // For action responses (like CARD_CLICKED acknowledgments), use updateMessageAction diff --git a/packages/a2a-server/src/chat-bridge/session-store.ts b/packages/a2a-server/src/chat-bridge/session-store.ts index 8142973ecc..b9c126fabe 100644 --- a/packages/a2a-server/src/chat-bridge/session-store.ts +++ b/packages/a2a-server/src/chat-bridge/session-store.ts @@ -39,6 +39,8 @@ export interface SessionInfo { yoloMode?: boolean; /** When true, an async task is currently processing. */ asyncProcessing?: boolean; + /** When true, session has been cancelled (e.g. by /reset). Signals async processing to stop. */ + cancelled?: boolean; } /** Serializable subset of SessionInfo for GCS persistence. */ @@ -194,6 +196,11 @@ export class SessionStore { * Removes a session (e.g. when bot is removed from space). */ remove(threadName: string): void { + const session = this.sessions.get(threadName); + if (session) { + // Signal any in-flight async processing to stop + session.cancelled = true; + } this.sessions.delete(threadName); this.dirty = true; }