mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-03-13 07:30:52 -07:00
fix: extract A2UI content from task history in blocking mode
The blocking DefaultRequestHandler accumulates intermediate status-update events into task.history. The A2UI response content from "working" events lives there, while the final "input-required" status has no message. Updated extractAllParts to check history. Reverted to blocking mode since streaming had transport issues.
This commit is contained in:
@@ -58,7 +58,11 @@ export function extractIdsFromResponse(result: A2AResponse): {
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts all parts from an A2A response (from status message + artifacts).
|
||||
* Extracts all parts from an A2A response.
|
||||
* For Tasks, checks history (accumulated from intermediate status-update events),
|
||||
* the final status message, and artifacts. The blocking DefaultRequestHandler
|
||||
* accumulates intermediate events into task.history, so the A2UI response content
|
||||
* from "working" events lives there even if the final status message is empty.
|
||||
*/
|
||||
export function extractAllParts(result: A2AResponse): Part[] {
|
||||
const parts: Part[] = [];
|
||||
@@ -66,7 +70,15 @@ export function extractAllParts(result: A2AResponse): Part[] {
|
||||
if (result.kind === 'message') {
|
||||
parts.push(...(result.parts ?? []));
|
||||
} else if (result.kind === 'task') {
|
||||
// Parts from the status message
|
||||
// Parts from task history (accumulated intermediate status-update messages)
|
||||
if (result.history) {
|
||||
for (const msg of result.history) {
|
||||
if (msg.parts) {
|
||||
parts.push(...msg.parts);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Parts from the final status message
|
||||
if (result.status?.message?.parts) {
|
||||
parts.push(...result.status.message.parts);
|
||||
}
|
||||
@@ -164,9 +176,10 @@ export class A2ABridgeClient {
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a text message to the A2A server using streaming.
|
||||
* Uses streaming to capture all intermediate A2UI content from status-update
|
||||
* events, since the final task state may not contain the response content.
|
||||
* Sends a text message to the A2A server using blocking mode.
|
||||
* The blocking DefaultRequestHandler accumulates all intermediate events
|
||||
* (including A2UI content from "working" status updates) into the Task's
|
||||
* history array, so extractAllParts can find them.
|
||||
*/
|
||||
async sendMessage(
|
||||
text: string,
|
||||
@@ -189,9 +202,12 @@ export class A2ABridgeClient {
|
||||
extensions: [A2UI_EXTENSION_URI],
|
||||
},
|
||||
},
|
||||
configuration: {
|
||||
blocking: true,
|
||||
},
|
||||
};
|
||||
|
||||
return this.collectStreamResponse(params);
|
||||
return this.client.sendMessage(params);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -242,88 +258,11 @@ export class A2ABridgeClient {
|
||||
extensions: [A2UI_EXTENSION_URI],
|
||||
},
|
||||
},
|
||||
configuration: {
|
||||
blocking: true,
|
||||
},
|
||||
};
|
||||
|
||||
return this.collectStreamResponse(params);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message via streaming and collects all A2UI parts from intermediate
|
||||
* status-update events. The A2A server sends response content in "working"
|
||||
* status updates, while the final event (e.g. "input-required") may be empty.
|
||||
* Streaming captures everything.
|
||||
*/
|
||||
private async collectStreamResponse(
|
||||
params: MessageSendParams,
|
||||
): Promise<A2AResponse> {
|
||||
if (!this.client) {
|
||||
throw new Error('A2A client not initialized.');
|
||||
}
|
||||
|
||||
const stream = this.client.sendMessageStream(params);
|
||||
const collectedParts: Part[] = [];
|
||||
let latestTaskId: string | undefined;
|
||||
let latestContextId: string | undefined;
|
||||
let latestState = 'working';
|
||||
|
||||
for await (const event of stream) {
|
||||
switch (event.kind) {
|
||||
case 'status-update':
|
||||
latestTaskId = event.taskId;
|
||||
latestContextId = event.contextId;
|
||||
latestState = event.status.state;
|
||||
if (event.status.message?.parts) {
|
||||
collectedParts.push(...event.status.message.parts);
|
||||
}
|
||||
break;
|
||||
case 'artifact-update':
|
||||
latestTaskId = event.taskId;
|
||||
latestContextId = event.contextId;
|
||||
if (event.artifact?.parts) {
|
||||
collectedParts.push(...event.artifact.parts);
|
||||
}
|
||||
break;
|
||||
case 'task':
|
||||
// Full task response from stream - augment with collected parts
|
||||
if (
|
||||
collectedParts.length > 0 &&
|
||||
event.status &&
|
||||
!event.status.message?.parts?.length
|
||||
) {
|
||||
event.status.message = {
|
||||
kind: 'message',
|
||||
role: 'agent',
|
||||
messageId: uuidv4(),
|
||||
parts: collectedParts,
|
||||
};
|
||||
}
|
||||
return event;
|
||||
case 'message':
|
||||
// Full message response from stream - augment with collected parts
|
||||
if (collectedParts.length > 0 && !event.parts?.length) {
|
||||
event.parts = collectedParts;
|
||||
}
|
||||
return event;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Stream ended with only status-update/artifact-update events.
|
||||
// Build a synthetic Message with all collected parts.
|
||||
logger.info(
|
||||
`[ChatBridge] Stream completed: taskId=${latestTaskId}, state=${latestState}, parts=${collectedParts.length}`,
|
||||
);
|
||||
|
||||
// Return as a Message containing all collected A2UI parts
|
||||
const response: Message = {
|
||||
kind: 'message',
|
||||
role: 'agent',
|
||||
messageId: uuidv4(),
|
||||
parts: collectedParts,
|
||||
contextId: latestContextId,
|
||||
taskId: latestTaskId,
|
||||
};
|
||||
return response;
|
||||
return this.client.sendMessage(params);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user