From 470228e7a04ebe5ad82c4bba709e75fef883d0b4 Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Thu, 12 Feb 2026 11:10:50 -0500 Subject: [PATCH] fix: extract A2UI content from task history in blocking mode The blocking DefaultRequestHandler accumulates intermediate status-update events into task.history. The A2UI response content from "working" events lives there, while the final "input-required" status has no message. Updated extractAllParts to check history. Reverted to blocking mode since streaming had transport issues. --- .../src/chat-bridge/a2a-bridge-client.ts | 113 ++++-------------- 1 file changed, 26 insertions(+), 87 deletions(-) diff --git a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts index 657689c8d9..7b37f464c3 100644 --- a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts +++ b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts @@ -58,7 +58,11 @@ export function extractIdsFromResponse(result: A2AResponse): { } /** - * Extracts all parts from an A2A response (from status message + artifacts). + * Extracts all parts from an A2A response. + * For Tasks, checks history (accumulated from intermediate status-update events), + * the final status message, and artifacts. The blocking DefaultRequestHandler + * accumulates intermediate events into task.history, so the A2UI response content + * from "working" events lives there even if the final status message is empty. */ export function extractAllParts(result: A2AResponse): Part[] { const parts: Part[] = []; @@ -66,7 +70,15 @@ export function extractAllParts(result: A2AResponse): Part[] { if (result.kind === 'message') { parts.push(...(result.parts ?? [])); } else if (result.kind === 'task') { - // Parts from the status message + // Parts from task history (accumulated intermediate status-update messages) + if (result.history) { + for (const msg of result.history) { + if (msg.parts) { + parts.push(...msg.parts); + } + } + } + // Parts from the final status message if (result.status?.message?.parts) { parts.push(...result.status.message.parts); } @@ -164,9 +176,10 @@ export class A2ABridgeClient { } /** - * Sends a text message to the A2A server using streaming. - * Uses streaming to capture all intermediate A2UI content from status-update - * events, since the final task state may not contain the response content. + * Sends a text message to the A2A server using blocking mode. + * The blocking DefaultRequestHandler accumulates all intermediate events + * (including A2UI content from "working" status updates) into the Task's + * history array, so extractAllParts can find them. */ async sendMessage( text: string, @@ -189,9 +202,12 @@ export class A2ABridgeClient { extensions: [A2UI_EXTENSION_URI], }, }, + configuration: { + blocking: true, + }, }; - return this.collectStreamResponse(params); + return this.client.sendMessage(params); } /** @@ -242,88 +258,11 @@ export class A2ABridgeClient { extensions: [A2UI_EXTENSION_URI], }, }, + configuration: { + blocking: true, + }, }; - return this.collectStreamResponse(params); - } - - /** - * Sends a message via streaming and collects all A2UI parts from intermediate - * status-update events. The A2A server sends response content in "working" - * status updates, while the final event (e.g. "input-required") may be empty. - * Streaming captures everything. - */ - private async collectStreamResponse( - params: MessageSendParams, - ): Promise { - if (!this.client) { - throw new Error('A2A client not initialized.'); - } - - const stream = this.client.sendMessageStream(params); - const collectedParts: Part[] = []; - let latestTaskId: string | undefined; - let latestContextId: string | undefined; - let latestState = 'working'; - - for await (const event of stream) { - switch (event.kind) { - case 'status-update': - latestTaskId = event.taskId; - latestContextId = event.contextId; - latestState = event.status.state; - if (event.status.message?.parts) { - collectedParts.push(...event.status.message.parts); - } - break; - case 'artifact-update': - latestTaskId = event.taskId; - latestContextId = event.contextId; - if (event.artifact?.parts) { - collectedParts.push(...event.artifact.parts); - } - break; - case 'task': - // Full task response from stream - augment with collected parts - if ( - collectedParts.length > 0 && - event.status && - !event.status.message?.parts?.length - ) { - event.status.message = { - kind: 'message', - role: 'agent', - messageId: uuidv4(), - parts: collectedParts, - }; - } - return event; - case 'message': - // Full message response from stream - augment with collected parts - if (collectedParts.length > 0 && !event.parts?.length) { - event.parts = collectedParts; - } - return event; - default: - break; - } - } - - // Stream ended with only status-update/artifact-update events. - // Build a synthetic Message with all collected parts. - logger.info( - `[ChatBridge] Stream completed: taskId=${latestTaskId}, state=${latestState}, parts=${collectedParts.length}`, - ); - - // Return as a Message containing all collected A2UI parts - const response: Message = { - kind: 'message', - role: 'agent', - messageId: uuidv4(), - parts: collectedParts, - contextId: latestContextId, - taskId: latestTaskId, - }; - return response; + return this.client.sendMessage(params); } }