diff --git a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts index 5a33874f3d..657689c8d9 100644 --- a/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts +++ b/packages/a2a-server/src/chat-bridge/a2a-bridge-client.ts @@ -164,8 +164,9 @@ export class A2ABridgeClient { } /** - * Sends a text message to the A2A server. - * Includes A2UI extension metadata so the server enables A2UI mode. + * Sends a text message to the A2A server using streaming. + * Uses streaming to capture all intermediate A2UI content from status-update + * events, since the final task state may not contain the response content. */ async sendMessage( text: string, @@ -188,12 +189,9 @@ export class A2ABridgeClient { extensions: [A2UI_EXTENSION_URI], }, }, - configuration: { - blocking: true, - }, }; - return this.client.sendMessage(params); + return this.collectStreamResponse(params); } /** @@ -244,11 +242,88 @@ export class A2ABridgeClient { extensions: [A2UI_EXTENSION_URI], }, }, - configuration: { - blocking: true, - }, }; - return this.client.sendMessage(params); + return this.collectStreamResponse(params); + } + + /** + * Sends a message via streaming and collects all A2UI parts from intermediate + * status-update events. The A2A server sends response content in "working" + * status updates, while the final event (e.g. "input-required") may be empty. + * Streaming captures everything. + */ + private async collectStreamResponse( + params: MessageSendParams, + ): Promise { + if (!this.client) { + throw new Error('A2A client not initialized.'); + } + + const stream = this.client.sendMessageStream(params); + const collectedParts: Part[] = []; + let latestTaskId: string | undefined; + let latestContextId: string | undefined; + let latestState = 'working'; + + for await (const event of stream) { + switch (event.kind) { + case 'status-update': + latestTaskId = event.taskId; + latestContextId = event.contextId; + latestState = event.status.state; + if (event.status.message?.parts) { + collectedParts.push(...event.status.message.parts); + } + break; + case 'artifact-update': + latestTaskId = event.taskId; + latestContextId = event.contextId; + if (event.artifact?.parts) { + collectedParts.push(...event.artifact.parts); + } + break; + case 'task': + // Full task response from stream - augment with collected parts + if ( + collectedParts.length > 0 && + event.status && + !event.status.message?.parts?.length + ) { + event.status.message = { + kind: 'message', + role: 'agent', + messageId: uuidv4(), + parts: collectedParts, + }; + } + return event; + case 'message': + // Full message response from stream - augment with collected parts + if (collectedParts.length > 0 && !event.parts?.length) { + event.parts = collectedParts; + } + return event; + default: + break; + } + } + + // Stream ended with only status-update/artifact-update events. + // Build a synthetic Message with all collected parts. + logger.info( + `[ChatBridge] Stream completed: taskId=${latestTaskId}, state=${latestState}, parts=${collectedParts.length}`, + ); + + // Return as a Message containing all collected A2UI parts + const response: Message = { + kind: 'message', + role: 'agent', + messageId: uuidv4(), + parts: collectedParts, + contextId: latestContextId, + taskId: latestTaskId, + }; + return response; } }