fix(core): improve remote agent streaming UI and UX (#23633)

This commit is contained in:
Adam Weidman
2026-03-24 11:34:04 -04:00
committed by GitHub
parent fc18768155
commit 91d756f391
5 changed files with 161 additions and 38 deletions

View File

@@ -153,7 +153,7 @@ export const SubagentProgressDisplay: React.FC<
})}
</Box>
{progress.state === 'completed' && progress.result && (
{progress.result && (
<Box flexDirection="column" marginTop={1}>
{progress.terminateReason && progress.terminateReason !== 'GOAL' && (
<Box marginBottom={1}>
@@ -164,7 +164,7 @@ export const SubagentProgressDisplay: React.FC<
)}
<MarkdownDisplay
text={safeJsonToMarkdown(progress.result)}
isPending={false}
isPending={progress.state !== 'completed'}
terminalWidth={terminalWidth}
/>
</Box>

View File

@@ -403,7 +403,7 @@ describe('a2aUtils', () => {
const output = reassembler.toString();
expect(output).toBe(
'Analyzing...\n\nProcessing...\n\nArtifact (Code):\nprint("Done")',
'Analyzing...Processing...\n\nArtifact (Code):\nprint("Done")',
);
});

View File

@@ -16,6 +16,7 @@ import type {
AgentInterface,
} from '@a2a-js/sdk';
import type { SendMessageResult } from './a2a-client-manager.js';
import type { SubagentActivityItem } from './types.js';
export const AUTH_REQUIRED_MSG = `[Authorization Required] The agent has indicated it requires authorization to proceed. Please follow the agent's instructions.`;
@@ -123,17 +124,39 @@ export class A2AResultReassembler {
private pushMessage(message: Message | undefined) {
if (!message) return;
const text = extractPartsText(message.parts, '\n');
const text = extractPartsText(message.parts, '');
if (text && this.messageLog[this.messageLog.length - 1] !== text) {
this.messageLog.push(text);
}
}
/**
* Returns an array of activity items representing the current reassembled state.
*/
toActivityItems(): SubagentActivityItem[] {
const isAuthRequired = this.messageLog.includes(AUTH_REQUIRED_MSG);
return [
isAuthRequired
? {
id: 'auth-required',
type: 'thought',
content: AUTH_REQUIRED_MSG,
status: 'running',
}
: {
id: 'pending',
type: 'thought',
content: 'Working...',
status: 'running',
},
];
}
/**
* Returns a human-readable string representation of the current reassembled state.
*/
toString(): string {
const joinedMessages = this.messageLog.join('\n\n');
const joinedMessages = this.messageLog.join('');
const artifactsOutput = Array.from(this.artifacts.keys())
.map((id) => {

View File

@@ -20,7 +20,7 @@ import {
type A2AClientManager,
} from './a2a-client-manager.js';
import type { RemoteAgentDefinition } from './types.js';
import type { RemoteAgentDefinition, SubagentProgress } from './types.js';
import { createMockMessageBus } from '../test-utils/mock-message-bus.js';
import { A2AAuthProviderFactory } from './auth-provider/factory.js';
import type { A2AAuthProvider } from './auth-provider/types.js';
@@ -266,9 +266,11 @@ describe('RemoteAgentInvocation', () => {
);
const result = await invocation.execute(new AbortController().signal);
expect(result.error?.message).toContain(
"Failed to create auth provider for agent 'test-agent'",
);
expect(result.returnDisplay).toMatchObject({
result: expect.stringContaining(
"Failed to create auth provider for agent 'test-agent'",
),
});
});
it('should not load the agent if already present', async () => {
@@ -325,7 +327,9 @@ describe('RemoteAgentInvocation', () => {
// Execute first time
const result1 = await invocation1.execute(new AbortController().signal);
expect(result1.returnDisplay).toBe('Response 1');
expect(result1.returnDisplay).toMatchObject({
result: 'Response 1',
});
expect(mockClientManager.sendMessageStream).toHaveBeenLastCalledWith(
'test-agent',
'first',
@@ -355,7 +359,9 @@ describe('RemoteAgentInvocation', () => {
mockMessageBus,
);
const result2 = await invocation2.execute(new AbortController().signal);
expect(result2.returnDisplay).toBe('Response 2');
expect((result2.returnDisplay as SubagentProgress).result).toBe(
'Response 2',
);
expect(mockClientManager.sendMessageStream).toHaveBeenLastCalledWith(
'test-agent',
@@ -444,8 +450,22 @@ describe('RemoteAgentInvocation', () => {
);
await invocation.execute(new AbortController().signal, updateOutput);
expect(updateOutput).toHaveBeenCalledWith('Hello');
expect(updateOutput).toHaveBeenCalledWith('Hello\n\nHello World');
expect(updateOutput).toHaveBeenCalledWith(
expect.objectContaining({
isSubagentProgress: true,
state: 'running',
recentActivity: expect.arrayContaining([
expect.objectContaining({ content: 'Working...' }),
]),
}),
);
expect(updateOutput).toHaveBeenCalledWith(
expect.objectContaining({
isSubagentProgress: true,
state: 'completed',
result: 'HelloHello World',
}),
);
});
it('should abort when signal is aborted during streaming', async () => {
@@ -478,8 +498,7 @@ describe('RemoteAgentInvocation', () => {
);
const result = await invocation.execute(controller.signal);
expect(result.error).toBeDefined();
expect(result.error?.message).toContain('Operation aborted');
expect(result.returnDisplay).toMatchObject({ state: 'error' });
});
it('should handle errors gracefully', async () => {
@@ -501,9 +520,10 @@ describe('RemoteAgentInvocation', () => {
);
const result = await invocation.execute(new AbortController().signal);
expect(result.error).toBeDefined();
expect(result.error?.message).toContain('Network error');
expect(result.returnDisplay).toContain('Network error');
expect(result.returnDisplay).toMatchObject({
state: 'error',
result: expect.stringContaining('Network error'),
});
});
it('should use a2a helpers for extracting text', async () => {
@@ -534,7 +554,9 @@ describe('RemoteAgentInvocation', () => {
const result = await invocation.execute(new AbortController().signal);
// Just check that text is present, exact formatting depends on helper
expect(result.returnDisplay).toContain('Extracted text');
expect((result.returnDisplay as SubagentProgress).result).toContain(
'Extracted text',
);
});
it('should handle mixed response types during streaming (TaskStatusUpdateEvent + Message)', async () => {
@@ -577,9 +599,25 @@ describe('RemoteAgentInvocation', () => {
updateOutput,
);
expect(updateOutput).toHaveBeenCalledWith('Thinking...');
expect(updateOutput).toHaveBeenCalledWith('Thinking...\n\nFinal Answer');
expect(result.returnDisplay).toBe('Thinking...\n\nFinal Answer');
expect(updateOutput).toHaveBeenCalledWith(
expect.objectContaining({
isSubagentProgress: true,
state: 'running',
recentActivity: expect.arrayContaining([
expect.objectContaining({ content: 'Working...' }),
]),
}),
);
expect(updateOutput).toHaveBeenCalledWith(
expect.objectContaining({
isSubagentProgress: true,
state: 'completed',
result: 'Thinking...Final Answer',
}),
);
expect(result.returnDisplay).toMatchObject({
result: 'Thinking...Final Answer',
});
});
it('should handle artifact reassembly with append: true', async () => {
@@ -635,12 +673,21 @@ describe('RemoteAgentInvocation', () => {
);
await invocation.execute(new AbortController().signal, updateOutput);
expect(updateOutput).toHaveBeenCalledWith('Generating...');
expect(updateOutput).toHaveBeenCalledWith(
'Generating...\n\nArtifact (Result):\nPart 1',
expect.objectContaining({
isSubagentProgress: true,
state: 'running',
recentActivity: expect.arrayContaining([
expect.objectContaining({ content: 'Working...' }),
]),
}),
);
expect(updateOutput).toHaveBeenCalledWith(
'Generating...\n\nArtifact (Result):\nPart 1 Part 2',
expect.objectContaining({
isSubagentProgress: true,
state: 'completed',
result: 'Generating...\n\nArtifact (Result):\nPart 1 Part 2',
}),
);
});
});
@@ -694,8 +741,10 @@ describe('RemoteAgentInvocation', () => {
);
const result = await invocation.execute(new AbortController().signal);
expect(result.error).toBeDefined();
expect(result.returnDisplay).toContain(a2aError.userMessage);
expect(result.returnDisplay).toMatchObject({ state: 'error' });
expect((result.returnDisplay as SubagentProgress).result).toContain(
a2aError.userMessage,
);
});
it('should use generic message for non-A2AAgentError errors', async () => {
@@ -712,8 +761,8 @@ describe('RemoteAgentInvocation', () => {
);
const result = await invocation.execute(new AbortController().signal);
expect(result.error).toBeDefined();
expect(result.returnDisplay).toContain(
expect(result.returnDisplay).toMatchObject({ state: 'error' });
expect((result.returnDisplay as SubagentProgress).result).toContain(
'Error calling remote agent: something unexpected',
);
});
@@ -741,10 +790,14 @@ describe('RemoteAgentInvocation', () => {
);
const result = await invocation.execute(new AbortController().signal);
expect(result.error).toBeDefined();
expect(result.returnDisplay).toMatchObject({ state: 'error' });
// Should contain both the partial output and the error message
expect(result.returnDisplay).toContain('Partial response');
expect(result.returnDisplay).toContain('connection reset');
expect(result.returnDisplay).toMatchObject({
result: expect.stringContaining('Partial response'),
});
expect(result.returnDisplay).toMatchObject({
result: expect.stringContaining('connection reset'),
});
});
});
});

View File

@@ -15,6 +15,7 @@ import {
type RemoteAgentInputs,
type RemoteAgentDefinition,
type AgentInputs,
type SubagentProgress,
} from './types.js';
import { type AgentLoopContext } from '../config/agent-loop-context.js';
import type { MessageBus } from '../confirmation-bus/message-bus.js';
@@ -25,7 +26,6 @@ import type {
import { extractIdsFromResponse, A2AResultReassembler } from './a2aUtils.js';
import type { AuthenticationHandler } from '@a2a-js/sdk/client';
import { debugLogger } from '../utils/debugLogger.js';
import { safeJsonToMarkdown } from '../utils/markdownUtils.js';
import type { AnsiOutput } from '../utils/terminalSerializer.js';
import { A2AAuthProviderFactory } from './auth-provider/factory.js';
import { A2AAgentError } from './a2a-errors.js';
@@ -125,13 +125,30 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
async execute(
_signal: AbortSignal,
updateOutput?: (output: string | AnsiOutput) => void,
updateOutput?: (output: string | AnsiOutput | SubagentProgress) => void,
): Promise<ToolResult> {
// 1. Ensure the agent is loaded (cached by manager)
// We assume the user has provided an access token via some mechanism (TODO),
// or we rely on ADC.
const reassembler = new A2AResultReassembler();
const agentName = this.definition.displayName ?? this.definition.name;
try {
if (updateOutput) {
updateOutput({
isSubagentProgress: true,
agentName,
state: 'running',
recentActivity: [
{
id: 'pending',
type: 'thought',
content: 'Working...',
status: 'running',
},
],
});
}
const priorState = RemoteAgentInvocation.sessionState.get(
this.definition.name,
);
@@ -172,7 +189,13 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
reassembler.update(chunk);
if (updateOutput) {
updateOutput(reassembler.toString());
updateOutput({
isSubagentProgress: true,
agentName,
state: 'running',
recentActivity: reassembler.toActivityItems(),
result: reassembler.toString(),
});
}
const {
@@ -198,9 +221,21 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
`[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`,
);
const finalProgress: SubagentProgress = {
isSubagentProgress: true,
agentName,
state: 'completed',
result: finalOutput,
recentActivity: reassembler.toActivityItems(),
};
if (updateOutput) {
updateOutput(finalProgress);
}
return {
llmContent: [{ text: finalOutput }],
returnDisplay: safeJsonToMarkdown(finalOutput),
returnDisplay: finalProgress,
};
} catch (error: unknown) {
const partialOutput = reassembler.toString();
@@ -209,10 +244,22 @@ export class RemoteAgentInvocation extends BaseToolInvocation<
const fullDisplay = partialOutput
? `${partialOutput}\n\n${errorMessage}`
: errorMessage;
const errorProgress: SubagentProgress = {
isSubagentProgress: true,
agentName,
state: 'error',
result: fullDisplay,
recentActivity: reassembler.toActivityItems(),
};
if (updateOutput) {
updateOutput(errorProgress);
}
return {
llmContent: [{ text: fullDisplay }],
returnDisplay: fullDisplay,
error: { message: errorMessage },
returnDisplay: errorProgress,
};
} finally {
// Persist state even on partial failures or aborts to maintain conversational continuity.