fix: session cancellation, YOLO auto-approval, threading, and approval UX

- Add session cancellation on /reset to stop in-flight async streams
- Implement per-session YOLO auto-approval with batch tool confirmations
  (sendBatchToolConfirmations sends all approvals in one A2A message to
  avoid hangs when agent needs ALL tools approved before proceeding)
- Fix threading: include thread info in Add-ons response wrapper so
  replies appear in the user's thread instead of top-level
- Make tool approval async: return immediate ack, process confirmation
  in background, push result via Chat API (fixes "Agent is processing..."
  empty response after approve)
- Replace text-based approval with clickable Approve/Always Allow/Reject
  buttons on compact Cards V2
- Wire CARD_CLICKED handler to async approval flow (fire-and-forget
  with UPDATE_MESSAGE ack)

Tested via Cloud Run proxy curl suite:
  /reset, simple messages, async guard, /yolo, /safe, CARD_CLICKED
  (approve + reject), ADDED_TO_SPACE, empty message, cancellation.
This commit is contained in:
Adam Weidman
2026-02-19 11:42:06 -05:00
parent 45c9545f78
commit fc9623248d
5 changed files with 405 additions and 100 deletions
@@ -331,4 +331,68 @@ export class A2ABridgeClient {
return this.client.sendMessage(params);
}
/**
* Sends multiple tool confirmations in a single A2A message.
* Needed when the agent requests multiple tool approvals at once —
* sending them one at a time with blocking mode would hang because
* the agent waits for ALL approvals before proceeding.
*/
async sendBatchToolConfirmations(
approvals: Array<{ callId: string; outcome: string; taskId: string }>,
options: { contextId?: string },
): Promise<A2AResponse> {
if (!this.client) {
throw new Error('A2A client not initialized. Call initialize() first.');
}
const parts: Part[] = approvals.map(
(approval) =>
({
kind: 'data',
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
data: [
{
version: 'v0.10',
action: {
name: 'tool_confirmation',
surfaceId: `tool_approval_${approval.taskId}_${approval.callId}`,
sourceComponentId:
approval.outcome === 'cancel'
? 'reject_button'
: 'approve_button',
timestamp: new Date().toISOString(),
context: {
callId: approval.callId,
outcome: approval.outcome,
taskId: approval.taskId,
},
},
},
] as unknown as Record<string, unknown>,
metadata: {
mimeType: A2UI_MIME_TYPE,
},
}) as Part,
);
const params: MessageSendParams = {
message: {
kind: 'message',
role: 'user',
messageId: uuidv4(),
parts,
contextId: options.contextId,
taskId: approvals[0]?.taskId,
metadata: {
extensions: [A2UI_EXTENSION_URI],
},
},
configuration: {
blocking: true,
},
};
return this.client.sendMessage(params);
}
}
+275 -66
View File
@@ -16,9 +16,15 @@ import { SessionStore } from './session-store.js';
import {
A2ABridgeClient,
extractIdsFromResponse,
extractAllParts,
extractTextFromParts,
} from './a2a-bridge-client.js';
import { ChatApiClient } from './chat-api-client.js';
import { renderResponse, extractFromStreamEvent } from './response-renderer.js';
import {
renderResponse,
extractFromStreamEvent,
extractToolApprovals,
} from './response-renderer.js';
import { logger } from '../utils/logger.js';
const TERMINAL_STATES = new Set([
@@ -190,13 +196,15 @@ export class ChatBridgeHandler {
}
/**
* Handles text-based tool approval responses synchronously.
* Handles text-based tool approval responses.
* Returns an immediate acknowledgment and processes the confirmation
* asynchronously, pushing the agent's response via Chat API.
*/
private async handleToolApprovalText(
private handleToolApprovalText(
event: ChatEvent,
session: SessionInfo,
trimmed: string,
): Promise<ChatResponse> {
): ChatResponse {
const message = event.message!;
const threadName = message.thread.name;
const approval = session.pendingToolApproval!;
@@ -216,6 +224,46 @@ export class ChatBridgeHandler {
session.pendingToolApproval = undefined;
// Fire-and-forget async processing of the tool confirmation
this.processToolApprovalAsync(event, session, approval, outcome).catch(
(err) => {
const msg = err instanceof Error ? err.message : 'Unknown error';
logger.error(
`[ChatBridge] Tool approval async processing failed: ${msg}`,
err,
);
},
);
const ackText = isReject
? '_Tool rejected._'
: '_Tool approved, processing..._';
return {
text: ackText,
thread: {
threadKey: message.thread.threadKey || threadName,
name: threadName,
},
};
}
/**
* Processes a tool confirmation asynchronously.
* Sends the confirmation to the A2A server, handles the response,
* and pushes results to Google Chat via the REST API.
*/
private async processToolApprovalAsync(
event: ChatEvent,
session: SessionInfo,
approval: { callId: string; taskId: string; toolName: string },
outcome: string,
): Promise<void> {
const message = event.message!;
const threadName = message.thread.name;
const spaceName = event.space.name;
session.asyncProcessing = true;
try {
const response = await this.a2aClient.sendToolConfirmation(
approval.callId,
@@ -224,20 +272,83 @@ export class ChatBridgeHandler {
{ contextId: session.contextId },
);
if (session.cancelled) return;
const { contextId: newCtxId, taskId: newTaskId } =
extractIdsFromResponse(response);
if (newCtxId) session.contextId = newCtxId;
this.sessionStore.updateTaskId(threadName, newTaskId);
const threadKey = message.thread.threadKey || threadName;
return renderResponse(response, threadKey, threadName);
// Check for new pending approvals in the response
const newApprovals = extractToolApprovals(response).filter(
(a) => a.status === 'awaiting_approval',
);
if (session.yoloMode && newApprovals.length > 0) {
// YOLO: auto-approve any new tools
const autoResult = await this.autoApproveTools(
session,
newApprovals,
session.contextId,
);
if (autoResult.lastContextId)
session.contextId = autoResult.lastContextId;
if (autoResult.lastTaskId !== undefined) {
const isTerminal = autoResult.lastState
? TERMINAL_STATES.has(autoResult.lastState)
: false;
this.sessionStore.updateTaskId(
threadName,
isTerminal ? undefined : autoResult.lastTaskId,
);
}
if (autoResult.text) {
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: autoResult.text,
});
}
} else if (newApprovals.length > 0) {
// Non-YOLO: push new approval card
session.pendingToolApproval = {
callId: newApprovals[0].callId,
taskId: newApprovals[0].taskId,
toolName: newApprovals[0].displayName || newApprovals[0].name,
};
const rendered = renderResponse(
response,
message.thread.threadKey || threadName,
threadName,
);
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: rendered.text,
cardsV2: rendered.cardsV2,
});
logger.info(
`[ChatBridge] Pushed new approval card after confirmation: ${newApprovals[0].displayName || newApprovals[0].name}`,
);
} else {
// No more approvals — push the agent's response
const rendered = renderResponse(response);
const responseText = rendered.text || '_Agent completed._';
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: responseText,
});
logger.info(
`[ChatBridge] Pushed post-approval response (${responseText.length} chars)`,
);
}
} catch (error) {
if (session.cancelled) return;
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
logger.error(
`[ChatBridge] Error sending tool confirmation: ${errorMsg}`,
`[ChatBridge] Error in tool approval async: ${errorMsg}`,
error,
);
return { text: `Error processing tool confirmation: ${errorMsg}` };
await this.chatApiClient.sendMessage(spaceName, threadName, {
text: `Error processing tool confirmation: ${errorMsg}`,
});
} finally {
session.asyncProcessing = false;
}
}
@@ -269,6 +380,14 @@ export class ChatBridgeHandler {
let sentFinalResponse = false;
for await (const streamEvent of stream) {
// Check if session was cancelled (e.g. by /reset)
if (session.cancelled) {
logger.info(
`[ChatBridge] Session cancelled, stopping stream for ${threadName}`,
);
break;
}
const extracted = extractFromStreamEvent(streamEvent);
if (extracted.taskId) lastTaskId = extracted.taskId;
@@ -280,6 +399,23 @@ export class ChatBridgeHandler {
(a) => a.status === 'awaiting_approval',
);
if (pendingApprovals.length > 0) {
// YOLO mode: auto-approve all tools without user interaction
if (session.yoloMode) {
const autoApproved = await this.autoApproveTools(
session,
pendingApprovals,
lastContextId,
);
if (autoApproved.lastContextId)
lastContextId = autoApproved.lastContextId;
if (autoApproved.lastTaskId) lastTaskId = autoApproved.lastTaskId;
if (autoApproved.lastState) lastState = autoApproved.lastState;
if (autoApproved.text) lastText = autoApproved.text;
// Auto-approval loop handles everything; break out of stream
break;
}
// Non-YOLO: push approval card and wait for user input
const firstApproval = pendingApprovals[0];
session.pendingToolApproval = {
callId: firstApproval.callId,
@@ -317,6 +453,11 @@ export class ChatBridgeHandler {
logger.info(
`[ChatBridge] Pushed tool approval card: ${firstApproval.displayName || firstApproval.name}`,
);
// Break immediately — the server is waiting for the client to
// respond to the approval. If we keep waiting for stream events,
// asyncProcessing stays true and the user's "approve" message
// hits the async guard.
break;
}
// Track latest text content
@@ -334,6 +475,14 @@ export class ChatBridgeHandler {
}
}
// If session was cancelled, don't push any messages
if (session.cancelled) {
logger.info(
`[ChatBridge] Skipping response push for cancelled session ${threadName}`,
);
return;
}
// Update session IDs
if (lastContextId) session.contextId = lastContextId;
// Clear taskId on terminal states so next message starts a fresh task
@@ -357,6 +506,7 @@ export class ChatBridgeHandler {
});
}
} catch (error) {
if (session.cancelled) return; // Don't push errors for cancelled sessions
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
logger.error(`[ChatBridge] Async processing error: ${errorMsg}`, error);
await this.chatApiClient.sendMessage(spaceName, threadName, {
@@ -368,9 +518,90 @@ export class ChatBridgeHandler {
}
/**
* Handles a CARD_CLICKED event: user clicked a button on a card.
* Auto-approves tool calls in YOLO mode.
* Sends all pending approvals in a single batch message to avoid hanging
* when the agent needs ALL tools approved before proceeding.
* Loops if the response contains further approval requests.
*/
private async handleCardClicked(event: ChatEvent): Promise<ChatResponse> {
private async autoApproveTools(
session: SessionInfo,
initialApprovals: Array<{
callId: string;
taskId: string;
name: string;
displayName: string;
}>,
contextId: string | undefined,
): Promise<{
lastContextId?: string;
lastTaskId?: string;
lastState?: string;
text?: string;
}> {
let approvalsToProcess = initialApprovals;
let lastContextId = contextId;
let lastTaskId: string | undefined;
let lastState: string | undefined;
let lastText: string | undefined;
const approvedNames: string[] = [];
const MAX_ROUNDS = 10;
for (let round = 0; round < MAX_ROUNDS && !session.cancelled; round++) {
if (approvalsToProcess.length === 0) break;
// Log what we're approving
for (const a of approvalsToProcess) {
const label = a.displayName || a.name;
logger.info(`[ChatBridge] YOLO auto-approving: ${label}`);
approvedNames.push(label);
}
// Send ALL approvals in a single batch message
const response = await this.a2aClient.sendBatchToolConfirmations(
approvalsToProcess.map((a) => ({
callId: a.callId,
outcome: 'proceed_once',
taskId: a.taskId,
})),
{ contextId: lastContextId ?? session.contextId },
);
const { contextId: newCtxId, taskId: newTaskId } =
extractIdsFromResponse(response);
if (newCtxId) lastContextId = newCtxId;
if (newTaskId) lastTaskId = newTaskId;
if (response.kind === 'task' && response.status?.state) {
lastState = response.status.state;
}
// Extract text from this response
const responseParts = extractAllParts(response);
const responseText = extractTextFromParts(responseParts);
if (responseText) lastText = responseText;
// Break if terminal
if (lastState && TERMINAL_STATES.has(lastState)) break;
// Check for more pending approvals
const newApprovals = extractToolApprovals(response).filter(
(a) => a.status === 'awaiting_approval',
);
approvalsToProcess = newApprovals;
}
logger.info(
`[ChatBridge] YOLO auto-approved ${approvedNames.length} tools: ${approvedNames.join(', ')}`,
);
return { lastContextId, lastTaskId, lastState, text: lastText };
}
/**
* Handles a CARD_CLICKED event: user clicked a button on a card.
* Fires async processing and returns an immediate UPDATE_MESSAGE ack.
*/
private handleCardClicked(event: ChatEvent): ChatResponse {
const action = event.action;
if (!action) {
return { text: 'Error: Missing action data.' };
@@ -391,67 +622,45 @@ export class ChatBridgeHandler {
);
if (action.actionMethodName === 'tool_confirmation') {
return this.handleToolConfirmation(event, session.contextId);
const params = action.parameters || [];
const paramMap = new Map(params.map((p) => [p.key, p.value]));
const callId = paramMap.get('callId');
const outcome = paramMap.get('outcome');
const taskId = paramMap.get('taskId');
if (!callId || !outcome || !taskId) {
return { text: 'Error: Missing tool confirmation parameters.' };
}
const isReject = outcome === 'cancel';
const toolName = session.pendingToolApproval?.toolName ?? 'Tool';
// Clear pending approval tracked for text-based flow
session.pendingToolApproval = undefined;
// Fire-and-forget async processing
this.processToolApprovalAsync(
event,
session,
{ callId, taskId, toolName },
outcome,
).catch((err) => {
const msg = err instanceof Error ? err.message : 'Unknown error';
logger.error(`[ChatBridge] Card click async failed: ${msg}`, err);
});
// Update the card in-place with an acknowledgment
return {
actionResponse: { type: 'UPDATE_MESSAGE' },
text: isReject
? `*${toolName} — Rejected*`
: `*${toolName} — Approved, processing...*`,
};
}
return { text: `Unknown action: ${action.actionMethodName}` };
}
/**
* Handles tool confirmation actions from card button clicks.
*/
private async handleToolConfirmation(
event: ChatEvent,
contextId: string,
): Promise<ChatResponse> {
const params = event.action?.parameters || [];
const paramMap = new Map(params.map((p) => [p.key, p.value]));
const callId = paramMap.get('callId');
const outcome = paramMap.get('outcome');
const taskId = paramMap.get('taskId');
if (!callId || !outcome || !taskId) {
return { text: 'Error: Missing tool confirmation parameters.' };
}
logger.info(
`[ChatBridge] Tool confirmation: callId=${callId}, outcome=${outcome}, taskId=${taskId}`,
);
try {
const response = await this.a2aClient.sendToolConfirmation(
callId,
outcome,
taskId,
{ contextId },
);
// Update session
const threadName = event.message?.thread?.name;
if (threadName) {
const { contextId: newContextId, taskId: newTaskId } =
extractIdsFromResponse(response);
if (newContextId) {
const session = this.sessionStore.get(threadName);
if (session) session.contextId = newContextId;
}
this.sessionStore.updateTaskId(threadName, newTaskId);
}
return renderResponse(response);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
logger.error(
`[ChatBridge] Error sending tool confirmation: ${errorMsg}`,
error,
);
return {
text: `Error processing tool confirmation: ${errorMsg}`,
};
}
}
/**
* Handles ADDED_TO_SPACE event: bot was added to a space or DM.
*/
@@ -15,12 +15,7 @@
* server-side rendering to a constrained card format.
*/
import type {
ChatResponse,
ChatCardV2,
ChatCardSection,
ChatWidget,
} from './types.js';
import type { ChatResponse, ChatCardV2, ChatWidget } from './types.js';
import type { Part } from '@a2a-js/sdk';
import {
type A2AResponse,
@@ -355,61 +350,95 @@ function extractCommandSummary(approval: ToolApprovalInfo): string {
}
/**
* Renders a tool approval surface as a Google Chat Card V2.
* Renders a tool approval surface as a compact Google Chat Card V2
* with clickable Approve/Reject buttons.
*/
function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 {
const widgets: ChatWidget[] = [];
const toolLabel = approval.displayName || approval.name;
// Show a concise summary of what the tool will do.
// For shell commands, extract just the command string from the args JSON.
const commandSummary = extractCommandSummary(approval);
if (commandSummary) {
widgets.push({
decoratedText: {
text: `\`${commandSummary}\``,
topLabel: approval.displayName || approval.name,
topLabel: toolLabel,
startIcon: { knownIcon: 'DESCRIPTION' },
wrapText: true,
},
});
} else if (approval.args && approval.args !== 'No arguments') {
// Fallback: show truncated args
const truncatedArgs =
approval.args.length > 300
? approval.args.substring(0, 300) + '...'
approval.args.length > 200
? approval.args.substring(0, 200) + '...'
: approval.args;
widgets.push({
decoratedText: {
text: truncatedArgs,
topLabel: approval.displayName || approval.name,
topLabel: toolLabel,
startIcon: { knownIcon: 'DESCRIPTION' },
wrapText: true,
},
});
}
// Text-based approval instructions (card click buttons don't work
// with the current Add-ons routing configuration)
// Clickable buttons for approve/reject
widgets.push({
textParagraph: {
text: 'Reply <b>approve</b>, <b>always allow</b>, or <b>reject</b>',
buttonList: {
buttons: [
{
text: 'Approve',
onClick: {
action: {
function: 'tool_confirmation',
parameters: [
{ key: 'callId', value: approval.callId },
{ key: 'outcome', value: 'proceed_once' },
{ key: 'taskId', value: approval.taskId },
],
},
},
},
{
text: 'Always Allow',
onClick: {
action: {
function: 'tool_confirmation',
parameters: [
{ key: 'callId', value: approval.callId },
{ key: 'outcome', value: 'proceed_always_tool' },
{ key: 'taskId', value: approval.taskId },
],
},
},
},
{
text: 'Reject',
onClick: {
action: {
function: 'tool_confirmation',
parameters: [
{ key: 'callId', value: approval.callId },
{ key: 'outcome', value: 'cancel' },
{ key: 'taskId', value: approval.taskId },
],
},
},
color: { red: 0.8, green: 0.2, blue: 0.2 },
},
],
},
});
const sections: ChatCardSection[] = [
{
widgets,
},
];
return {
cardId: `tool_approval_${approval.callId}`,
card: {
header: {
title: 'Tool Approval Required',
subtitle: approval.displayName || approval.name,
title: toolLabel,
subtitle: 'Approval Required',
},
sections,
sections: [{ widgets }],
},
};
}
@@ -245,7 +245,9 @@ function normalizeEvent(raw: Record<string, unknown>): ChatEvent | null {
* Add-ons expects: {hostAppDataAction: {chatDataAction: {createMessageAction: {message}}}}
*/
function wrapAddOnsResponse(response: ChatResponse): Record<string, unknown> {
// Build the message object for the Add-ons format
// Build the message object for the Add-ons format.
// Include thread info so replies appear in the same thread as the user's
// message. Without it, createMessageAction creates a top-level message.
const message: Record<string, unknown> = {};
if (response.text) {
message['text'] = response.text;
@@ -253,14 +255,8 @@ function wrapAddOnsResponse(response: ChatResponse): Record<string, unknown> {
if (response.cardsV2) {
message['cardsV2'] = response.cardsV2;
}
// Include thread info so the reply goes to the user's thread
// instead of appearing as a top-level message
if (response.thread) {
const thread: Record<string, string> = {};
if (response.thread.name) thread['name'] = response.thread.name;
if (response.thread.threadKey)
thread['threadKey'] = response.thread.threadKey;
message['thread'] = thread;
message['thread'] = response.thread;
}
// For action responses (like CARD_CLICKED acknowledgments), use updateMessageAction
@@ -39,6 +39,8 @@ export interface SessionInfo {
yoloMode?: boolean;
/** When true, an async task is currently processing. */
asyncProcessing?: boolean;
/** When true, session has been cancelled (e.g. by /reset). Signals async processing to stop. */
cancelled?: boolean;
}
/** Serializable subset of SessionInfo for GCS persistence. */
@@ -194,6 +196,11 @@ export class SessionStore {
* Removes a session (e.g. when bot is removed from space).
*/
remove(threadName: string): void {
const session = this.sessions.get(threadName);
if (session) {
// Signal any in-flight async processing to stop
session.cancelled = true;
}
this.sessions.delete(threadName);
this.dirty = true;
}