diff --git a/packages/cli/src/ui/components/messages/SubagentProgressDisplay.tsx b/packages/cli/src/ui/components/messages/SubagentProgressDisplay.tsx index 5d1086c759..a84429cd10 100644 --- a/packages/cli/src/ui/components/messages/SubagentProgressDisplay.tsx +++ b/packages/cli/src/ui/components/messages/SubagentProgressDisplay.tsx @@ -153,7 +153,7 @@ export const SubagentProgressDisplay: React.FC< })} - {progress.state === 'completed' && progress.result && ( + {progress.result && ( {progress.terminateReason && progress.terminateReason !== 'GOAL' && ( @@ -164,7 +164,7 @@ export const SubagentProgressDisplay: React.FC< )} diff --git a/packages/core/src/agents/a2aUtils.test.ts b/packages/core/src/agents/a2aUtils.test.ts index 0dce551be4..f8416ae2ad 100644 --- a/packages/core/src/agents/a2aUtils.test.ts +++ b/packages/core/src/agents/a2aUtils.test.ts @@ -403,7 +403,7 @@ describe('a2aUtils', () => { const output = reassembler.toString(); expect(output).toBe( - 'Analyzing...\n\nProcessing...\n\nArtifact (Code):\nprint("Done")', + 'Analyzing...Processing...\n\nArtifact (Code):\nprint("Done")', ); }); diff --git a/packages/core/src/agents/a2aUtils.ts b/packages/core/src/agents/a2aUtils.ts index 70fc9cf557..b617082416 100644 --- a/packages/core/src/agents/a2aUtils.ts +++ b/packages/core/src/agents/a2aUtils.ts @@ -16,6 +16,7 @@ import type { AgentInterface, } from '@a2a-js/sdk'; import type { SendMessageResult } from './a2a-client-manager.js'; +import type { SubagentActivityItem } from './types.js'; export const AUTH_REQUIRED_MSG = `[Authorization Required] The agent has indicated it requires authorization to proceed. Please follow the agent's instructions.`; @@ -123,17 +124,39 @@ export class A2AResultReassembler { private pushMessage(message: Message | undefined) { if (!message) return; - const text = extractPartsText(message.parts, '\n'); + const text = extractPartsText(message.parts, ''); if (text && this.messageLog[this.messageLog.length - 1] !== text) { this.messageLog.push(text); } } + /** + * Returns an array of activity items representing the current reassembled state. + */ + toActivityItems(): SubagentActivityItem[] { + const isAuthRequired = this.messageLog.includes(AUTH_REQUIRED_MSG); + return [ + isAuthRequired + ? { + id: 'auth-required', + type: 'thought', + content: AUTH_REQUIRED_MSG, + status: 'running', + } + : { + id: 'pending', + type: 'thought', + content: 'Working...', + status: 'running', + }, + ]; + } + /** * Returns a human-readable string representation of the current reassembled state. */ toString(): string { - const joinedMessages = this.messageLog.join('\n\n'); + const joinedMessages = this.messageLog.join(''); const artifactsOutput = Array.from(this.artifacts.keys()) .map((id) => { diff --git a/packages/core/src/agents/remote-invocation.test.ts b/packages/core/src/agents/remote-invocation.test.ts index 870071b321..b5fdd4a4fa 100644 --- a/packages/core/src/agents/remote-invocation.test.ts +++ b/packages/core/src/agents/remote-invocation.test.ts @@ -20,7 +20,7 @@ import { type A2AClientManager, } from './a2a-client-manager.js'; -import type { RemoteAgentDefinition } from './types.js'; +import type { RemoteAgentDefinition, SubagentProgress } from './types.js'; import { createMockMessageBus } from '../test-utils/mock-message-bus.js'; import { A2AAuthProviderFactory } from './auth-provider/factory.js'; import type { A2AAuthProvider } from './auth-provider/types.js'; @@ -266,9 +266,11 @@ describe('RemoteAgentInvocation', () => { ); const result = await invocation.execute(new AbortController().signal); - expect(result.error?.message).toContain( - "Failed to create auth provider for agent 'test-agent'", - ); + expect(result.returnDisplay).toMatchObject({ + result: expect.stringContaining( + "Failed to create auth provider for agent 'test-agent'", + ), + }); }); it('should not load the agent if already present', async () => { @@ -325,7 +327,9 @@ describe('RemoteAgentInvocation', () => { // Execute first time const result1 = await invocation1.execute(new AbortController().signal); - expect(result1.returnDisplay).toBe('Response 1'); + expect(result1.returnDisplay).toMatchObject({ + result: 'Response 1', + }); expect(mockClientManager.sendMessageStream).toHaveBeenLastCalledWith( 'test-agent', 'first', @@ -355,7 +359,9 @@ describe('RemoteAgentInvocation', () => { mockMessageBus, ); const result2 = await invocation2.execute(new AbortController().signal); - expect(result2.returnDisplay).toBe('Response 2'); + expect((result2.returnDisplay as SubagentProgress).result).toBe( + 'Response 2', + ); expect(mockClientManager.sendMessageStream).toHaveBeenLastCalledWith( 'test-agent', @@ -444,8 +450,22 @@ describe('RemoteAgentInvocation', () => { ); await invocation.execute(new AbortController().signal, updateOutput); - expect(updateOutput).toHaveBeenCalledWith('Hello'); - expect(updateOutput).toHaveBeenCalledWith('Hello\n\nHello World'); + expect(updateOutput).toHaveBeenCalledWith( + expect.objectContaining({ + isSubagentProgress: true, + state: 'running', + recentActivity: expect.arrayContaining([ + expect.objectContaining({ content: 'Working...' }), + ]), + }), + ); + expect(updateOutput).toHaveBeenCalledWith( + expect.objectContaining({ + isSubagentProgress: true, + state: 'completed', + result: 'HelloHello World', + }), + ); }); it('should abort when signal is aborted during streaming', async () => { @@ -478,8 +498,7 @@ describe('RemoteAgentInvocation', () => { ); const result = await invocation.execute(controller.signal); - expect(result.error).toBeDefined(); - expect(result.error?.message).toContain('Operation aborted'); + expect(result.returnDisplay).toMatchObject({ state: 'error' }); }); it('should handle errors gracefully', async () => { @@ -501,9 +520,10 @@ describe('RemoteAgentInvocation', () => { ); const result = await invocation.execute(new AbortController().signal); - expect(result.error).toBeDefined(); - expect(result.error?.message).toContain('Network error'); - expect(result.returnDisplay).toContain('Network error'); + expect(result.returnDisplay).toMatchObject({ + state: 'error', + result: expect.stringContaining('Network error'), + }); }); it('should use a2a helpers for extracting text', async () => { @@ -534,7 +554,9 @@ describe('RemoteAgentInvocation', () => { const result = await invocation.execute(new AbortController().signal); // Just check that text is present, exact formatting depends on helper - expect(result.returnDisplay).toContain('Extracted text'); + expect((result.returnDisplay as SubagentProgress).result).toContain( + 'Extracted text', + ); }); it('should handle mixed response types during streaming (TaskStatusUpdateEvent + Message)', async () => { @@ -577,9 +599,25 @@ describe('RemoteAgentInvocation', () => { updateOutput, ); - expect(updateOutput).toHaveBeenCalledWith('Thinking...'); - expect(updateOutput).toHaveBeenCalledWith('Thinking...\n\nFinal Answer'); - expect(result.returnDisplay).toBe('Thinking...\n\nFinal Answer'); + expect(updateOutput).toHaveBeenCalledWith( + expect.objectContaining({ + isSubagentProgress: true, + state: 'running', + recentActivity: expect.arrayContaining([ + expect.objectContaining({ content: 'Working...' }), + ]), + }), + ); + expect(updateOutput).toHaveBeenCalledWith( + expect.objectContaining({ + isSubagentProgress: true, + state: 'completed', + result: 'Thinking...Final Answer', + }), + ); + expect(result.returnDisplay).toMatchObject({ + result: 'Thinking...Final Answer', + }); }); it('should handle artifact reassembly with append: true', async () => { @@ -635,12 +673,21 @@ describe('RemoteAgentInvocation', () => { ); await invocation.execute(new AbortController().signal, updateOutput); - expect(updateOutput).toHaveBeenCalledWith('Generating...'); expect(updateOutput).toHaveBeenCalledWith( - 'Generating...\n\nArtifact (Result):\nPart 1', + expect.objectContaining({ + isSubagentProgress: true, + state: 'running', + recentActivity: expect.arrayContaining([ + expect.objectContaining({ content: 'Working...' }), + ]), + }), ); expect(updateOutput).toHaveBeenCalledWith( - 'Generating...\n\nArtifact (Result):\nPart 1 Part 2', + expect.objectContaining({ + isSubagentProgress: true, + state: 'completed', + result: 'Generating...\n\nArtifact (Result):\nPart 1 Part 2', + }), ); }); }); @@ -694,8 +741,10 @@ describe('RemoteAgentInvocation', () => { ); const result = await invocation.execute(new AbortController().signal); - expect(result.error).toBeDefined(); - expect(result.returnDisplay).toContain(a2aError.userMessage); + expect(result.returnDisplay).toMatchObject({ state: 'error' }); + expect((result.returnDisplay as SubagentProgress).result).toContain( + a2aError.userMessage, + ); }); it('should use generic message for non-A2AAgentError errors', async () => { @@ -712,8 +761,8 @@ describe('RemoteAgentInvocation', () => { ); const result = await invocation.execute(new AbortController().signal); - expect(result.error).toBeDefined(); - expect(result.returnDisplay).toContain( + expect(result.returnDisplay).toMatchObject({ state: 'error' }); + expect((result.returnDisplay as SubagentProgress).result).toContain( 'Error calling remote agent: something unexpected', ); }); @@ -741,10 +790,14 @@ describe('RemoteAgentInvocation', () => { ); const result = await invocation.execute(new AbortController().signal); - expect(result.error).toBeDefined(); + expect(result.returnDisplay).toMatchObject({ state: 'error' }); // Should contain both the partial output and the error message - expect(result.returnDisplay).toContain('Partial response'); - expect(result.returnDisplay).toContain('connection reset'); + expect(result.returnDisplay).toMatchObject({ + result: expect.stringContaining('Partial response'), + }); + expect(result.returnDisplay).toMatchObject({ + result: expect.stringContaining('connection reset'), + }); }); }); }); diff --git a/packages/core/src/agents/remote-invocation.ts b/packages/core/src/agents/remote-invocation.ts index 0933ca026e..130f0f1a38 100644 --- a/packages/core/src/agents/remote-invocation.ts +++ b/packages/core/src/agents/remote-invocation.ts @@ -15,6 +15,7 @@ import { type RemoteAgentInputs, type RemoteAgentDefinition, type AgentInputs, + type SubagentProgress, } from './types.js'; import { type AgentLoopContext } from '../config/agent-loop-context.js'; import type { MessageBus } from '../confirmation-bus/message-bus.js'; @@ -25,7 +26,6 @@ import type { import { extractIdsFromResponse, A2AResultReassembler } from './a2aUtils.js'; import type { AuthenticationHandler } from '@a2a-js/sdk/client'; import { debugLogger } from '../utils/debugLogger.js'; -import { safeJsonToMarkdown } from '../utils/markdownUtils.js'; import type { AnsiOutput } from '../utils/terminalSerializer.js'; import { A2AAuthProviderFactory } from './auth-provider/factory.js'; import { A2AAgentError } from './a2a-errors.js'; @@ -125,13 +125,30 @@ export class RemoteAgentInvocation extends BaseToolInvocation< async execute( _signal: AbortSignal, - updateOutput?: (output: string | AnsiOutput) => void, + updateOutput?: (output: string | AnsiOutput | SubagentProgress) => void, ): Promise { // 1. Ensure the agent is loaded (cached by manager) // We assume the user has provided an access token via some mechanism (TODO), // or we rely on ADC. const reassembler = new A2AResultReassembler(); + const agentName = this.definition.displayName ?? this.definition.name; try { + if (updateOutput) { + updateOutput({ + isSubagentProgress: true, + agentName, + state: 'running', + recentActivity: [ + { + id: 'pending', + type: 'thought', + content: 'Working...', + status: 'running', + }, + ], + }); + } + const priorState = RemoteAgentInvocation.sessionState.get( this.definition.name, ); @@ -172,7 +189,13 @@ export class RemoteAgentInvocation extends BaseToolInvocation< reassembler.update(chunk); if (updateOutput) { - updateOutput(reassembler.toString()); + updateOutput({ + isSubagentProgress: true, + agentName, + state: 'running', + recentActivity: reassembler.toActivityItems(), + result: reassembler.toString(), + }); } const { @@ -198,9 +221,21 @@ export class RemoteAgentInvocation extends BaseToolInvocation< `[RemoteAgent] Final response from ${this.definition.name}:\n${JSON.stringify(finalResponse, null, 2)}`, ); + const finalProgress: SubagentProgress = { + isSubagentProgress: true, + agentName, + state: 'completed', + result: finalOutput, + recentActivity: reassembler.toActivityItems(), + }; + + if (updateOutput) { + updateOutput(finalProgress); + } + return { llmContent: [{ text: finalOutput }], - returnDisplay: safeJsonToMarkdown(finalOutput), + returnDisplay: finalProgress, }; } catch (error: unknown) { const partialOutput = reassembler.toString(); @@ -209,10 +244,22 @@ export class RemoteAgentInvocation extends BaseToolInvocation< const fullDisplay = partialOutput ? `${partialOutput}\n\n${errorMessage}` : errorMessage; + + const errorProgress: SubagentProgress = { + isSubagentProgress: true, + agentName, + state: 'error', + result: fullDisplay, + recentActivity: reassembler.toActivityItems(), + }; + + if (updateOutput) { + updateOutput(errorProgress); + } + return { llmContent: [{ text: fullDisplay }], - returnDisplay: fullDisplay, - error: { message: errorMessage }, + returnDisplay: errorProgress, }; } finally { // Persist state even on partial failures or aborts to maintain conversational continuity.