feat: switch chat bridge to streaming for full A2UI content capture

Blocking mode only returns the final task state, missing intermediate
A2UI response content from working status-update events. Streaming
captures all events and aggregates parts into the response.
This commit is contained in:
Adam Weidman
2026-02-12 10:36:58 -05:00
parent 72ea38b306
commit b85a3bafe5

View File

@@ -164,8 +164,9 @@ export class A2ABridgeClient {
}
/**
* Sends a text message to the A2A server.
* Includes A2UI extension metadata so the server enables A2UI mode.
* Sends a text message to the A2A server using streaming.
* Uses streaming to capture all intermediate A2UI content from status-update
* events, since the final task state may not contain the response content.
*/
async sendMessage(
text: string,
@@ -188,12 +189,9 @@ export class A2ABridgeClient {
extensions: [A2UI_EXTENSION_URI],
},
},
configuration: {
blocking: true,
},
};
return this.client.sendMessage(params);
return this.collectStreamResponse(params);
}
/**
@@ -244,11 +242,88 @@ export class A2ABridgeClient {
extensions: [A2UI_EXTENSION_URI],
},
},
configuration: {
blocking: true,
},
};
return this.client.sendMessage(params);
return this.collectStreamResponse(params);
}
/**
* Sends a message via streaming and collects all A2UI parts from intermediate
* status-update events. The A2A server sends response content in "working"
* status updates, while the final event (e.g. "input-required") may be empty.
* Streaming captures everything.
*/
private async collectStreamResponse(
params: MessageSendParams,
): Promise<A2AResponse> {
if (!this.client) {
throw new Error('A2A client not initialized.');
}
const stream = this.client.sendMessageStream(params);
const collectedParts: Part[] = [];
let latestTaskId: string | undefined;
let latestContextId: string | undefined;
let latestState = 'working';
for await (const event of stream) {
switch (event.kind) {
case 'status-update':
latestTaskId = event.taskId;
latestContextId = event.contextId;
latestState = event.status.state;
if (event.status.message?.parts) {
collectedParts.push(...event.status.message.parts);
}
break;
case 'artifact-update':
latestTaskId = event.taskId;
latestContextId = event.contextId;
if (event.artifact?.parts) {
collectedParts.push(...event.artifact.parts);
}
break;
case 'task':
// Full task response from stream - augment with collected parts
if (
collectedParts.length > 0 &&
event.status &&
!event.status.message?.parts?.length
) {
event.status.message = {
kind: 'message',
role: 'agent',
messageId: uuidv4(),
parts: collectedParts,
};
}
return event;
case 'message':
// Full message response from stream - augment with collected parts
if (collectedParts.length > 0 && !event.parts?.length) {
event.parts = collectedParts;
}
return event;
default:
break;
}
}
// Stream ended with only status-update/artifact-update events.
// Build a synthetic Message with all collected parts.
logger.info(
`[ChatBridge] Stream completed: taskId=${latestTaskId}, state=${latestState}, parts=${collectedParts.length}`,
);
// Return as a Message containing all collected A2UI parts
const response: Message = {
kind: 'message',
role: 'agent',
messageId: uuidv4(),
parts: collectedParts,
contextId: latestContextId,
taskId: latestTaskId,
};
return response;
}
}