mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-05-18 07:43:00 -07:00
fix: streaming text extraction, YOLO timing, and message chunking
Streaming fixes: - Fix A2UI text extraction preferring incremental chunks over accumulated text by swapping push order in extractFromStreamEvent - Defer tool approval handling to post-stream — server YOLO mode briefly publishes awaiting_approval before auto-approving, which was breaking the stream loop prematurely - Always update latestPendingApprovals to clear stale approvals after server auto-approves Message chunking: - Split long messages at paragraph/line boundaries to stay within Google Chat's 4096 character limit - Cards attach to first chunk only Streaming tool confirmations: - Add sendToolConfirmationStream and sendBatchToolConfirmationsStream for SSE-based tool approval flow README: - Update bridge deployment to max-instances=1 (single instance for in-memory async guard consistency) - Add Known Limitations section Working: - Server YOLO mode with streaming text extraction - Chat API push with proper threading - Session persistence across Cloud Run restarts - Retry logic for agent concurrency exhaustion - Text chunking for long responses - Per-session /yolo and /safe commands Not working: - Tool confirmation streaming with GEMINI_YOLO_MODE=false (executor aborts on SSE disconnect — SDK-level issue) - CARD_CLICKED button routing through Add-ons (text-based approval works as fallback)
This commit is contained in:
@@ -216,7 +216,7 @@ gcloud run deploy gemini-chat-bridge \
|
||||
--cpu=1 \
|
||||
--timeout=60 \
|
||||
--concurrency=80 \
|
||||
--max-instances=5 \
|
||||
--max-instances=1 \
|
||||
--set-env-vars="A2A_SERVER_URL=$A2A_URL,GCS_BUCKET_NAME=gemini-a2a-sessions-$PROJECT_ID"
|
||||
```
|
||||
|
||||
@@ -342,3 +342,15 @@ flushed to GCS every 30 seconds and restored on startup.
|
||||
The Chat bridge includes `thread.name` in all responses. If replies still appear
|
||||
at the top level, ensure the webhook event includes thread information. DM
|
||||
conversations always thread correctly; spaces may need threading enabled.
|
||||
|
||||
## Known Limitations
|
||||
|
||||
- **Google Chat 4096 character limit**: Long agent responses may be truncated by
|
||||
Google Chat. The bridge does not currently split messages into chunks.
|
||||
- **Single bridge instance**: The bridge uses `max-instances=1` so that the
|
||||
in-memory async processing guard works correctly. This means no redundancy
|
||||
during deploys (brief downtime during revision rollover).
|
||||
- **Tool confirmation in streaming mode**: When the A2A server has
|
||||
`GEMINI_YOLO_MODE=false`, tool confirmations via streaming may not return text
|
||||
due to an SDK-level issue (executor aborts on SSE disconnect). Server YOLO
|
||||
mode works correctly.
|
||||
|
||||
@@ -332,6 +332,58 @@ export class A2ABridgeClient {
|
||||
return this.client.sendMessage(params);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a tool confirmation via streaming (SSE) so the caller can
|
||||
* follow the full task lifecycle after approval.
|
||||
*/
|
||||
sendToolConfirmationStream(
|
||||
callId: string,
|
||||
outcome: string,
|
||||
taskId: string,
|
||||
options: { contextId?: string },
|
||||
): AsyncGenerator<A2AStreamEventData, void, undefined> {
|
||||
if (!this.client) {
|
||||
throw new Error('A2A client not initialized. Call initialize() first.');
|
||||
}
|
||||
|
||||
const actionPart: Part = {
|
||||
kind: 'data',
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
data: [
|
||||
{
|
||||
version: 'v0.10',
|
||||
action: {
|
||||
name: 'tool_confirmation',
|
||||
surfaceId: `tool_approval_${taskId}_${callId}`,
|
||||
sourceComponentId:
|
||||
outcome === 'cancel' ? 'reject_button' : 'approve_button',
|
||||
timestamp: new Date().toISOString(),
|
||||
context: { callId, outcome, taskId },
|
||||
},
|
||||
},
|
||||
] as unknown as Record<string, unknown>,
|
||||
metadata: {
|
||||
mimeType: A2UI_MIME_TYPE,
|
||||
},
|
||||
} as Part;
|
||||
|
||||
const params: MessageSendParams = {
|
||||
message: {
|
||||
kind: 'message',
|
||||
role: 'user',
|
||||
messageId: uuidv4(),
|
||||
parts: [actionPart],
|
||||
contextId: options.contextId,
|
||||
taskId,
|
||||
metadata: {
|
||||
extensions: [A2UI_EXTENSION_URI],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
return this.client.sendMessageStream(params);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends multiple tool confirmations in a single A2A message.
|
||||
* Needed when the agent requests multiple tool approvals at once —
|
||||
@@ -395,4 +447,64 @@ export class A2ABridgeClient {
|
||||
|
||||
return this.client.sendMessage(params);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends batch tool confirmations via streaming (SSE) so the caller can
|
||||
* follow the full task lifecycle after approval. Returns an async generator
|
||||
* that yields events until the task reaches a terminal state.
|
||||
*/
|
||||
sendBatchToolConfirmationsStream(
|
||||
approvals: Array<{ callId: string; outcome: string; taskId: string }>,
|
||||
options: { contextId?: string },
|
||||
): AsyncGenerator<A2AStreamEventData, void, undefined> {
|
||||
if (!this.client) {
|
||||
throw new Error('A2A client not initialized. Call initialize() first.');
|
||||
}
|
||||
|
||||
const parts: Part[] = approvals.map(
|
||||
(approval) =>
|
||||
({
|
||||
kind: 'data',
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
data: [
|
||||
{
|
||||
version: 'v0.10',
|
||||
action: {
|
||||
name: 'tool_confirmation',
|
||||
surfaceId: `tool_approval_${approval.taskId}_${approval.callId}`,
|
||||
sourceComponentId:
|
||||
approval.outcome === 'cancel'
|
||||
? 'reject_button'
|
||||
: 'approve_button',
|
||||
timestamp: new Date().toISOString(),
|
||||
context: {
|
||||
callId: approval.callId,
|
||||
outcome: approval.outcome,
|
||||
taskId: approval.taskId,
|
||||
},
|
||||
},
|
||||
},
|
||||
] as unknown as Record<string, unknown>,
|
||||
metadata: {
|
||||
mimeType: A2UI_MIME_TYPE,
|
||||
},
|
||||
}) as Part,
|
||||
);
|
||||
|
||||
const params: MessageSendParams = {
|
||||
message: {
|
||||
kind: 'message',
|
||||
role: 'user',
|
||||
messageId: uuidv4(),
|
||||
parts,
|
||||
contextId: options.contextId,
|
||||
taskId: approvals[0]?.taskId,
|
||||
metadata: {
|
||||
extensions: [A2UI_EXTENSION_URI],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
return this.client.sendMessageStream(params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,8 @@ import type { ChatCardV2 } from './types.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
const CHAT_API_BASE = 'https://chat.googleapis.com/v1';
|
||||
/** Google Chat max text length. Leave margin for formatting overhead. */
|
||||
const MAX_TEXT_LENGTH = 4000;
|
||||
|
||||
export interface ChatApiClientConfig {
|
||||
/** Path to service account key JSON file. If not set, uses ADC. */
|
||||
@@ -41,20 +43,38 @@ export class ChatApiClient {
|
||||
|
||||
/**
|
||||
* Sends a new message to a Google Chat space in a specific thread.
|
||||
* Automatically splits text longer than 4000 chars into multiple messages.
|
||||
*/
|
||||
async sendMessage(
|
||||
spaceName: string,
|
||||
threadName: string,
|
||||
options: { text?: string; cardsV2?: ChatCardV2[] },
|
||||
): Promise<string | undefined> {
|
||||
try {
|
||||
if (!this.initialized) await this.initialize();
|
||||
if (!this.initialized) await this.initialize();
|
||||
|
||||
const chunks = options.text ? splitText(options.text) : [''];
|
||||
|
||||
// First chunk gets the cards (if any). Subsequent chunks are text-only.
|
||||
let lastMessageName: string | undefined;
|
||||
for (let i = 0; i < chunks.length; i++) {
|
||||
const message: Record<string, unknown> = {};
|
||||
if (options.text) message['text'] = options.text;
|
||||
if (options.cardsV2) message['cardsV2'] = options.cardsV2;
|
||||
if (chunks[i]) message['text'] = chunks[i];
|
||||
if (i === 0 && options.cardsV2) message['cardsV2'] = options.cardsV2;
|
||||
message['thread'] = { name: threadName };
|
||||
|
||||
const name = await this.postMessage(spaceName, message);
|
||||
if (name) lastMessageName = name;
|
||||
}
|
||||
|
||||
return lastMessageName;
|
||||
}
|
||||
|
||||
/** Posts a single message to the Chat API. */
|
||||
private async postMessage(
|
||||
spaceName: string,
|
||||
message: Record<string, unknown>,
|
||||
): Promise<string | undefined> {
|
||||
try {
|
||||
const url =
|
||||
`${CHAT_API_BASE}/${spaceName}/messages` +
|
||||
`?messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD`;
|
||||
@@ -149,3 +169,44 @@ export class ChatApiClient {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits text into chunks that fit within Google Chat's character limit.
|
||||
* Splits on paragraph boundaries (double newline) first, then single
|
||||
* newlines, then hard-splits as a last resort.
|
||||
*/
|
||||
function splitText(text: string): string[] {
|
||||
if (text.length <= MAX_TEXT_LENGTH) return [text];
|
||||
|
||||
const chunks: string[] = [];
|
||||
let remaining = text;
|
||||
|
||||
while (remaining.length > MAX_TEXT_LENGTH) {
|
||||
let splitAt = -1;
|
||||
|
||||
// Try splitting at a paragraph boundary
|
||||
const paraIdx = remaining.lastIndexOf('\n\n', MAX_TEXT_LENGTH);
|
||||
if (paraIdx > MAX_TEXT_LENGTH * 0.3) {
|
||||
splitAt = paraIdx + 2; // include the double newline in the first chunk
|
||||
}
|
||||
|
||||
// Fall back to single newline
|
||||
if (splitAt < 0) {
|
||||
const lineIdx = remaining.lastIndexOf('\n', MAX_TEXT_LENGTH);
|
||||
if (lineIdx > MAX_TEXT_LENGTH * 0.3) {
|
||||
splitAt = lineIdx + 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Hard split as last resort
|
||||
if (splitAt < 0) {
|
||||
splitAt = MAX_TEXT_LENGTH;
|
||||
}
|
||||
|
||||
chunks.push(remaining.substring(0, splitAt));
|
||||
remaining = remaining.substring(splitAt);
|
||||
}
|
||||
|
||||
if (remaining) chunks.push(remaining);
|
||||
return chunks;
|
||||
}
|
||||
|
||||
@@ -16,16 +16,9 @@ import { SessionStore } from './session-store.js';
|
||||
import {
|
||||
A2ABridgeClient,
|
||||
type A2AStreamEventData,
|
||||
extractIdsFromResponse,
|
||||
extractAllParts,
|
||||
extractTextFromParts,
|
||||
} from './a2a-bridge-client.js';
|
||||
import { ChatApiClient } from './chat-api-client.js';
|
||||
import {
|
||||
renderResponse,
|
||||
extractFromStreamEvent,
|
||||
extractToolApprovals,
|
||||
} from './response-renderer.js';
|
||||
import { renderResponse, extractFromStreamEvent } from './response-renderer.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
const TERMINAL_STATES = new Set([
|
||||
@@ -40,6 +33,8 @@ export class ChatBridgeHandler {
|
||||
private a2aClient: A2ABridgeClient;
|
||||
private chatApiClient: ChatApiClient;
|
||||
private initialized = false;
|
||||
/** Full webhook URL for card button actions (HTTP Add-ons need a URL, not a function name). */
|
||||
private webhookUrl: string | undefined;
|
||||
|
||||
constructor(
|
||||
private config: ChatBridgeConfig,
|
||||
@@ -52,6 +47,12 @@ export class ChatBridgeHandler {
|
||||
new ChatApiClient({
|
||||
serviceAccountKeyPath: config.serviceAccountKeyPath,
|
||||
});
|
||||
// For HTTP Add-ons, card button action.function must be a full HTTPS URL.
|
||||
// Set CHAT_WEBHOOK_URL env var to the bridge's public webhook endpoint.
|
||||
this.webhookUrl = process.env['CHAT_WEBHOOK_URL'] || undefined;
|
||||
if (this.webhookUrl) {
|
||||
logger.info(`[ChatBridge] Button action URL: ${this.webhookUrl}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -288,76 +289,104 @@ export class ChatBridgeHandler {
|
||||
session.asyncProcessing = true;
|
||||
|
||||
try {
|
||||
const response = await this.a2aClient.sendToolConfirmation(
|
||||
// Stream the tool confirmation to collect text as it arrives
|
||||
const stream = this.a2aClient.sendToolConfirmationStream(
|
||||
approval.callId,
|
||||
outcome,
|
||||
approval.taskId,
|
||||
{ contextId: session.contextId },
|
||||
);
|
||||
|
||||
let lastText = '';
|
||||
let lastTaskId: string | undefined;
|
||||
let lastContextId: string | undefined;
|
||||
let lastState: string | undefined;
|
||||
let sentResponse = false;
|
||||
|
||||
for await (const streamEvent of stream) {
|
||||
if (session.cancelled) break;
|
||||
|
||||
const extracted = extractFromStreamEvent(streamEvent);
|
||||
if (extracted.taskId) lastTaskId = extracted.taskId;
|
||||
if (extracted.contextId) lastContextId = extracted.contextId;
|
||||
if (extracted.state) lastState = extracted.state;
|
||||
if (extracted.text) lastText = extracted.text;
|
||||
|
||||
// Check for new tool approvals
|
||||
const pending = extracted.toolApprovals.filter(
|
||||
(a) => a.status === 'awaiting_approval',
|
||||
);
|
||||
if (pending.length > 0) {
|
||||
if (session.yoloMode) {
|
||||
// YOLO: auto-approve via streaming
|
||||
const autoResult = await this.autoApproveTools(
|
||||
session,
|
||||
pending,
|
||||
lastContextId,
|
||||
);
|
||||
if (autoResult.lastContextId)
|
||||
lastContextId = autoResult.lastContextId;
|
||||
if (autoResult.lastTaskId) lastTaskId = autoResult.lastTaskId;
|
||||
if (autoResult.lastState) lastState = autoResult.lastState;
|
||||
if (autoResult.text) lastText = autoResult.text;
|
||||
} else {
|
||||
// Non-YOLO: push approval card
|
||||
session.pendingToolApproval = {
|
||||
callId: pending[0].callId,
|
||||
taskId: pending[0].taskId,
|
||||
toolName: pending[0].displayName || pending[0].name,
|
||||
};
|
||||
// Build a minimal task response for the card renderer
|
||||
const cardResponse = renderResponse(
|
||||
{
|
||||
kind: 'task',
|
||||
id: pending[0].taskId,
|
||||
contextId: lastContextId ?? session.contextId,
|
||||
status: {
|
||||
state: 'input-required',
|
||||
timestamp: new Date().toISOString(),
|
||||
message:
|
||||
streamEvent.kind === 'status-update'
|
||||
? streamEvent.status?.message
|
||||
: undefined,
|
||||
},
|
||||
history: [],
|
||||
artifacts: [],
|
||||
},
|
||||
message.thread.threadKey || threadName,
|
||||
threadName,
|
||||
this.webhookUrl,
|
||||
);
|
||||
await this.chatApiClient.sendMessage(spaceName, threadName, {
|
||||
text: cardResponse.text,
|
||||
cardsV2: cardResponse.cardsV2,
|
||||
});
|
||||
sentResponse = true;
|
||||
logger.info(
|
||||
`[ChatBridge] Pushed approval card after confirmation: ${pending[0].displayName || pending[0].name}`,
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (session.cancelled) return;
|
||||
|
||||
const { contextId: newCtxId, taskId: newTaskId } =
|
||||
extractIdsFromResponse(response);
|
||||
if (newCtxId) session.contextId = newCtxId;
|
||||
this.sessionStore.updateTaskId(threadName, newTaskId);
|
||||
|
||||
// Check for new pending approvals in the response
|
||||
const newApprovals = extractToolApprovals(response).filter(
|
||||
(a) => a.status === 'awaiting_approval',
|
||||
// Update session IDs
|
||||
if (lastContextId) session.contextId = lastContextId;
|
||||
const isTerminal = lastState ? TERMINAL_STATES.has(lastState) : false;
|
||||
this.sessionStore.updateTaskId(
|
||||
threadName,
|
||||
isTerminal ? undefined : lastTaskId,
|
||||
);
|
||||
|
||||
if (session.yoloMode && newApprovals.length > 0) {
|
||||
// YOLO: auto-approve any new tools
|
||||
const autoResult = await this.autoApproveTools(
|
||||
session,
|
||||
newApprovals,
|
||||
session.contextId,
|
||||
);
|
||||
if (autoResult.lastContextId)
|
||||
session.contextId = autoResult.lastContextId;
|
||||
if (autoResult.lastTaskId !== undefined) {
|
||||
const isTerminal = autoResult.lastState
|
||||
? TERMINAL_STATES.has(autoResult.lastState)
|
||||
: false;
|
||||
this.sessionStore.updateTaskId(
|
||||
threadName,
|
||||
isTerminal ? undefined : autoResult.lastTaskId,
|
||||
);
|
||||
}
|
||||
if (autoResult.text) {
|
||||
await this.chatApiClient.sendMessage(spaceName, threadName, {
|
||||
text: autoResult.text,
|
||||
});
|
||||
}
|
||||
} else if (newApprovals.length > 0) {
|
||||
// Non-YOLO: push new approval card
|
||||
session.pendingToolApproval = {
|
||||
callId: newApprovals[0].callId,
|
||||
taskId: newApprovals[0].taskId,
|
||||
toolName: newApprovals[0].displayName || newApprovals[0].name,
|
||||
};
|
||||
const rendered = renderResponse(
|
||||
response,
|
||||
message.thread.threadKey || threadName,
|
||||
threadName,
|
||||
);
|
||||
// Push final text if we haven't already pushed a card
|
||||
if (lastText && !sentResponse) {
|
||||
await this.chatApiClient.sendMessage(spaceName, threadName, {
|
||||
text: rendered.text,
|
||||
cardsV2: rendered.cardsV2,
|
||||
text: lastText,
|
||||
});
|
||||
logger.info(
|
||||
`[ChatBridge] Pushed new approval card after confirmation: ${newApprovals[0].displayName || newApprovals[0].name}`,
|
||||
);
|
||||
} else {
|
||||
// No more approvals — push the agent's response
|
||||
const rendered = renderResponse(response);
|
||||
const responseText = rendered.text || '_Agent completed._';
|
||||
await this.chatApiClient.sendMessage(spaceName, threadName, {
|
||||
text: responseText,
|
||||
});
|
||||
logger.info(
|
||||
`[ChatBridge] Pushed post-approval response (${responseText.length} chars)`,
|
||||
`[ChatBridge] Pushed post-approval response (${lastText.length} chars): "${lastText.substring(0, 200)}"`,
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -408,6 +437,19 @@ export class ChatBridgeHandler {
|
||||
let lastState: string | undefined;
|
||||
let sentFinalResponse = false;
|
||||
|
||||
let eventCount = 0;
|
||||
// Track the latest pending approvals across events — only act on them
|
||||
// when the server signals input-required (meaning it actually needs input).
|
||||
// In server YOLO mode, tools are auto-approved so the stream continues
|
||||
// past the brief 'awaiting_approval' status without hitting input-required.
|
||||
let latestPendingApprovals: Array<{
|
||||
callId: string;
|
||||
taskId: string;
|
||||
name: string;
|
||||
displayName: string;
|
||||
}> = [];
|
||||
let approvalStatusMessage: unknown;
|
||||
|
||||
for await (const streamEvent of stream) {
|
||||
// Check if session was cancelled (e.g. by /reset)
|
||||
if (session.cancelled) {
|
||||
@@ -417,42 +459,79 @@ export class ChatBridgeHandler {
|
||||
break;
|
||||
}
|
||||
|
||||
eventCount++;
|
||||
const extracted = extractFromStreamEvent(streamEvent);
|
||||
|
||||
if (extracted.taskId) lastTaskId = extracted.taskId;
|
||||
if (extracted.contextId) lastContextId = extracted.contextId;
|
||||
if (extracted.state) lastState = extracted.state;
|
||||
|
||||
// Check for tool approvals needing user input
|
||||
const pendingApprovals = extracted.toolApprovals.filter(
|
||||
// Log each event for debugging
|
||||
logger.info(
|
||||
`[ChatBridge] Stream event #${eventCount}: kind=${streamEvent.kind}, ` +
|
||||
`state=${extracted.state ?? 'n/a'}, text=${extracted.text.length} chars, ` +
|
||||
`approvals=${extracted.toolApprovals.filter((a) => a.status === 'awaiting_approval').length}`,
|
||||
);
|
||||
|
||||
// Track latest text content
|
||||
if (extracted.text) {
|
||||
lastText = extracted.text;
|
||||
}
|
||||
|
||||
// Track tool approvals — always update with latest state so
|
||||
// stale approvals are cleared when the server auto-approves.
|
||||
const pending = extracted.toolApprovals.filter(
|
||||
(a) => a.status === 'awaiting_approval',
|
||||
);
|
||||
if (pendingApprovals.length > 0) {
|
||||
// YOLO mode: auto-approve all tools without user interaction
|
||||
if (session.yoloMode) {
|
||||
const autoApproved = await this.autoApproveTools(
|
||||
session,
|
||||
pendingApprovals,
|
||||
lastContextId,
|
||||
);
|
||||
if (autoApproved.lastContextId)
|
||||
lastContextId = autoApproved.lastContextId;
|
||||
if (autoApproved.lastTaskId) lastTaskId = autoApproved.lastTaskId;
|
||||
if (autoApproved.lastState) lastState = autoApproved.lastState;
|
||||
if (autoApproved.text) lastText = autoApproved.text;
|
||||
// Auto-approval loop handles everything; break out of stream
|
||||
break;
|
||||
}
|
||||
latestPendingApprovals = pending;
|
||||
if (pending.length > 0) {
|
||||
approvalStatusMessage =
|
||||
streamEvent.kind === 'status-update'
|
||||
? streamEvent.status?.message
|
||||
: undefined;
|
||||
}
|
||||
|
||||
// On terminal or input-required state, stop streaming.
|
||||
// input-required means the server is asking for user action
|
||||
// (tool approval or follow-up message).
|
||||
if (
|
||||
extracted.state &&
|
||||
(TERMINAL_STATES.has(extracted.state) ||
|
||||
extracted.state === 'input-required')
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[ChatBridge] Stream complete: ${eventCount} events, ` +
|
||||
`state=${lastState ?? 'none'}, text=${lastText.length} chars, ` +
|
||||
`pendingApprovals=${latestPendingApprovals.length}`,
|
||||
);
|
||||
|
||||
// Handle pending approvals (only relevant when server sent input-required)
|
||||
if (latestPendingApprovals.length > 0 && lastState === 'input-required') {
|
||||
if (session.yoloMode) {
|
||||
// Bridge YOLO mode: auto-approve all tools
|
||||
const autoApproved = await this.autoApproveTools(
|
||||
session,
|
||||
latestPendingApprovals,
|
||||
lastContextId,
|
||||
);
|
||||
if (autoApproved.lastContextId)
|
||||
lastContextId = autoApproved.lastContextId;
|
||||
if (autoApproved.lastTaskId) lastTaskId = autoApproved.lastTaskId;
|
||||
if (autoApproved.lastState) lastState = autoApproved.lastState;
|
||||
if (autoApproved.text) lastText = autoApproved.text;
|
||||
} else {
|
||||
// Non-YOLO: push approval card and wait for user input
|
||||
const firstApproval = pendingApprovals[0];
|
||||
const firstApproval = latestPendingApprovals[0];
|
||||
session.pendingToolApproval = {
|
||||
callId: firstApproval.callId,
|
||||
taskId: firstApproval.taskId,
|
||||
toolName: firstApproval.displayName || firstApproval.name,
|
||||
};
|
||||
|
||||
// Push tool approval card to Chat
|
||||
const approvalResponse = renderResponse(
|
||||
{
|
||||
kind: 'task',
|
||||
@@ -461,16 +540,17 @@ export class ChatBridgeHandler {
|
||||
status: {
|
||||
state: 'input-required',
|
||||
timestamp: new Date().toISOString(),
|
||||
message:
|
||||
streamEvent.kind === 'status-update'
|
||||
? streamEvent.status?.message
|
||||
: undefined,
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
message: approvalStatusMessage as
|
||||
| import('@a2a-js/sdk').Message
|
||||
| undefined,
|
||||
},
|
||||
history: [],
|
||||
artifacts: [],
|
||||
},
|
||||
message.thread.threadKey || threadName,
|
||||
threadName,
|
||||
this.webhookUrl,
|
||||
);
|
||||
|
||||
await this.chatApiClient.sendMessage(spaceName, threadName, {
|
||||
@@ -482,25 +562,6 @@ export class ChatBridgeHandler {
|
||||
logger.info(
|
||||
`[ChatBridge] Pushed tool approval card: ${firstApproval.displayName || firstApproval.name}`,
|
||||
);
|
||||
// Break immediately — the server is waiting for the client to
|
||||
// respond to the approval. If we keep waiting for stream events,
|
||||
// asyncProcessing stays true and the user's "approve" message
|
||||
// hits the async guard.
|
||||
break;
|
||||
}
|
||||
|
||||
// Track latest text content
|
||||
if (extracted.text) {
|
||||
lastText = extracted.text;
|
||||
}
|
||||
|
||||
// On terminal or input-required state, stop streaming
|
||||
if (
|
||||
extracted.state &&
|
||||
(TERMINAL_STATES.has(extracted.state) ||
|
||||
extracted.state === 'input-required')
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -527,7 +588,7 @@ export class ChatBridgeHandler {
|
||||
text: lastText,
|
||||
});
|
||||
logger.info(
|
||||
`[ChatBridge] Pushed final response (${lastText.length} chars)`,
|
||||
`[ChatBridge] Pushed final response (${lastText.length} chars): "${lastText.substring(0, 200)}"`,
|
||||
);
|
||||
} else if (!lastText && !sentFinalResponse) {
|
||||
await this.chatApiClient.sendMessage(spaceName, threadName, {
|
||||
@@ -595,10 +656,10 @@ export class ChatBridgeHandler {
|
||||
}
|
||||
|
||||
/**
|
||||
* Auto-approves tool calls in YOLO mode.
|
||||
* Sends all pending approvals in a single batch message to avoid hanging
|
||||
* when the agent needs ALL tools approved before proceeding.
|
||||
* Loops if the response contains further approval requests.
|
||||
* Auto-approves tool calls in YOLO mode using streaming.
|
||||
* Sends approvals and collects streamed text from the SSE response.
|
||||
* The A2A server streams text incrementally and closes with final:true
|
||||
* at input-required (more tools) or completed (done).
|
||||
*/
|
||||
private async autoApproveTools(
|
||||
session: SessionInfo,
|
||||
@@ -621,20 +682,19 @@ export class ChatBridgeHandler {
|
||||
let lastState: string | undefined;
|
||||
let lastText: string | undefined;
|
||||
const approvedNames: string[] = [];
|
||||
const MAX_ROUNDS = 10;
|
||||
const MAX_ROUNDS = 20;
|
||||
|
||||
for (let round = 0; round < MAX_ROUNDS && !session.cancelled; round++) {
|
||||
if (approvalsToProcess.length === 0) break;
|
||||
|
||||
// Log what we're approving
|
||||
for (const a of approvalsToProcess) {
|
||||
const label = a.displayName || a.name;
|
||||
logger.info(`[ChatBridge] YOLO auto-approving: ${label}`);
|
||||
approvedNames.push(label);
|
||||
}
|
||||
|
||||
// Send ALL approvals in a single batch message
|
||||
const response = await this.a2aClient.sendBatchToolConfirmations(
|
||||
// Stream tool confirmations — text arrives incrementally via SSE
|
||||
const stream = this.a2aClient.sendBatchToolConfirmationsStream(
|
||||
approvalsToProcess.map((a) => ({
|
||||
callId: a.callId,
|
||||
outcome: 'proceed_once',
|
||||
@@ -643,28 +703,41 @@ export class ChatBridgeHandler {
|
||||
{ contextId: lastContextId ?? session.contextId },
|
||||
);
|
||||
|
||||
const { contextId: newCtxId, taskId: newTaskId } =
|
||||
extractIdsFromResponse(response);
|
||||
if (newCtxId) lastContextId = newCtxId;
|
||||
if (newTaskId) lastTaskId = newTaskId;
|
||||
approvalsToProcess = [];
|
||||
|
||||
if (response.kind === 'task' && response.status?.state) {
|
||||
lastState = response.status.state;
|
||||
// Consume the stream, collecting text and detecting new tool approvals
|
||||
let eventCount = 0;
|
||||
for await (const event of stream) {
|
||||
if (session.cancelled) break;
|
||||
eventCount++;
|
||||
|
||||
const extracted = extractFromStreamEvent(event);
|
||||
if (extracted.taskId) lastTaskId = extracted.taskId;
|
||||
if (extracted.contextId) lastContextId = extracted.contextId;
|
||||
if (extracted.state) lastState = extracted.state;
|
||||
if (extracted.text) lastText = extracted.text;
|
||||
|
||||
logger.info(
|
||||
`[ChatBridge] YOLO event #${eventCount}: kind=${event.kind}, ` +
|
||||
`state=${extracted.state ?? 'n/a'}, text=${extracted.text.length} chars`,
|
||||
);
|
||||
|
||||
// New tool approvals → break to send them in next round
|
||||
const pending = extracted.toolApprovals.filter(
|
||||
(a) => a.status === 'awaiting_approval',
|
||||
);
|
||||
if (pending.length > 0) {
|
||||
approvalsToProcess = pending;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Extract text from this response
|
||||
const responseParts = extractAllParts(response);
|
||||
const responseText = extractTextFromParts(responseParts);
|
||||
if (responseText) lastText = responseText;
|
||||
|
||||
// Break if terminal
|
||||
if (lastState && TERMINAL_STATES.has(lastState)) break;
|
||||
|
||||
// Check for more pending approvals
|
||||
const newApprovals = extractToolApprovals(response).filter(
|
||||
(a) => a.status === 'awaiting_approval',
|
||||
logger.info(
|
||||
`[ChatBridge] YOLO round ${round}: state=${lastState ?? 'none'}, ` +
|
||||
`text=${lastText?.length ?? 0} chars, newApprovals=${approvalsToProcess.length}`,
|
||||
);
|
||||
approvalsToProcess = newApprovals;
|
||||
|
||||
if (lastState && TERMINAL_STATES.has(lastState)) break;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
@@ -698,7 +771,10 @@ export class ChatBridgeHandler {
|
||||
`[ChatBridge] CARD_CLICKED: function=${action.actionMethodName}`,
|
||||
);
|
||||
|
||||
if (action.actionMethodName === 'tool_confirmation') {
|
||||
if (
|
||||
action.actionMethodName === 'tool_confirmation' ||
|
||||
action.actionMethodName === this.webhookUrl
|
||||
) {
|
||||
const params = action.parameters || [];
|
||||
const paramMap = new Map(params.map((p) => [p.key, p.value]));
|
||||
const callId = paramMap.get('callId');
|
||||
|
||||
@@ -69,6 +69,7 @@ export function renderResponse(
|
||||
response: A2AResponse,
|
||||
threadKey?: string,
|
||||
threadName?: string,
|
||||
webhookUrl?: string,
|
||||
): ChatResponse {
|
||||
const parts = extractAllParts(response);
|
||||
const textContent = extractTextFromParts(parts);
|
||||
@@ -94,7 +95,7 @@ export function renderResponse(
|
||||
// so we skip rendering approval cards for those.
|
||||
for (const approval of dedupedApprovals) {
|
||||
if (approval.status === 'awaiting_approval') {
|
||||
cards.push(renderToolApprovalCard(approval));
|
||||
cards.push(renderToolApprovalCard(approval, webhookUrl));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,7 +354,10 @@ function extractCommandSummary(approval: ToolApprovalInfo): string {
|
||||
* Renders a tool approval surface as a compact Google Chat Card V2
|
||||
* with clickable Approve/Reject buttons.
|
||||
*/
|
||||
function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 {
|
||||
function renderToolApprovalCard(
|
||||
approval: ToolApprovalInfo,
|
||||
webhookUrl?: string,
|
||||
): ChatCardV2 {
|
||||
const widgets: ChatWidget[] = [];
|
||||
const toolLabel = approval.displayName || approval.name;
|
||||
|
||||
@@ -391,7 +395,7 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 {
|
||||
text: 'Approve',
|
||||
onClick: {
|
||||
action: {
|
||||
function: 'tool_confirmation',
|
||||
function: webhookUrl ?? 'tool_confirmation',
|
||||
parameters: [
|
||||
{ key: 'callId', value: approval.callId },
|
||||
{ key: 'outcome', value: 'proceed_once' },
|
||||
@@ -404,7 +408,7 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 {
|
||||
text: 'Always Allow',
|
||||
onClick: {
|
||||
action: {
|
||||
function: 'tool_confirmation',
|
||||
function: webhookUrl ?? 'tool_confirmation',
|
||||
parameters: [
|
||||
{ key: 'callId', value: approval.callId },
|
||||
{ key: 'outcome', value: 'proceed_always_tool' },
|
||||
@@ -417,7 +421,7 @@ function renderToolApprovalCard(approval: ToolApprovalInfo): ChatCardV2 {
|
||||
text: 'Reject',
|
||||
onClick: {
|
||||
action: {
|
||||
function: 'tool_confirmation',
|
||||
function: webhookUrl ?? 'tool_confirmation',
|
||||
parameters: [
|
||||
{ key: 'callId', value: approval.callId },
|
||||
{ key: 'outcome', value: 'cancel' },
|
||||
@@ -466,47 +470,50 @@ export function extractFromStreamEvent(event: A2AStreamEventData): {
|
||||
taskId = event.taskId;
|
||||
contextId = event.contextId;
|
||||
|
||||
// Extract parts from the status message
|
||||
const parts: Part[] = event.status?.message?.parts ?? [];
|
||||
const a2uiGroups = extractA2UIParts(parts);
|
||||
for (const messages of a2uiGroups) {
|
||||
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
|
||||
}
|
||||
|
||||
// Also extract plain text
|
||||
// Extract plain text FIRST (incremental chunks) so A2UI accumulated
|
||||
// text is added AFTER — backward iteration will prefer A2UI.
|
||||
const plainText = extractTextFromParts(parts);
|
||||
if (plainText) {
|
||||
agentResponses.push({ text: plainText, status: '' });
|
||||
}
|
||||
|
||||
const a2uiGroups = extractA2UIParts(parts);
|
||||
for (const messages of a2uiGroups) {
|
||||
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
|
||||
}
|
||||
} else if (event.kind === 'task') {
|
||||
state = event.status?.state;
|
||||
taskId = event.id;
|
||||
contextId = event.contextId;
|
||||
|
||||
const parts = extractAllParts(event);
|
||||
const a2uiGroups = extractA2UIParts(parts);
|
||||
for (const messages of a2uiGroups) {
|
||||
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
|
||||
}
|
||||
|
||||
const plainText = extractTextFromParts(parts);
|
||||
if (plainText) {
|
||||
agentResponses.push({ text: plainText, status: '' });
|
||||
}
|
||||
|
||||
const a2uiGroups = extractA2UIParts(parts);
|
||||
for (const messages of a2uiGroups) {
|
||||
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
|
||||
}
|
||||
} else if (event.kind === 'message') {
|
||||
contextId = event.contextId;
|
||||
taskId = event.taskId;
|
||||
|
||||
const parts: Part[] = event.parts ?? [];
|
||||
const a2uiGroups = extractA2UIParts(parts);
|
||||
for (const messages of a2uiGroups) {
|
||||
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
|
||||
}
|
||||
|
||||
const plainText = extractTextFromParts(parts);
|
||||
if (plainText) {
|
||||
agentResponses.push({ text: plainText, status: '' });
|
||||
}
|
||||
|
||||
const a2uiGroups = extractA2UIParts(parts);
|
||||
for (const messages of a2uiGroups) {
|
||||
parseA2UIMessages(messages, toolApprovals, agentResponses, thoughts);
|
||||
}
|
||||
}
|
||||
|
||||
// Build text from the last non-empty agent response
|
||||
|
||||
Reference in New Issue
Block a user