From 25aa217306f96baa792bf01f4a105c9862da6c8b Mon Sep 17 00:00:00 2001 From: Alisa Novikova <62909685+alisa-alisa@users.noreply.github.com> Date: Tue, 10 Mar 2026 11:18:28 -0700 Subject: [PATCH] refactor(core): address PR comments on a2aUtils and tests --- packages/core/src/agents/a2aUtils.test.ts | 47 +++-- packages/core/src/agents/a2aUtils.ts | 244 +++++++++++----------- 2 files changed, 156 insertions(+), 135 deletions(-) diff --git a/packages/core/src/agents/a2aUtils.test.ts b/packages/core/src/agents/a2aUtils.test.ts index 4514c36d9f..c3fe170aa5 100644 --- a/packages/core/src/agents/a2aUtils.test.ts +++ b/packages/core/src/agents/a2aUtils.test.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import { extractMessageText, extractIdsFromResponse, @@ -27,6 +27,7 @@ import type { TaskArtifactUpdateEvent, } from '@a2a-js/sdk'; import * as dnsPromises from 'node:dns/promises'; +import type { LookupAddress } from 'node:dns'; vi.mock('node:dns/promises', () => ({ lookup: vi.fn(), @@ -37,6 +38,10 @@ describe('a2aUtils', () => { vi.clearAllMocks(); }); + afterEach(() => { + vi.restoreAllMocks(); + }); + describe('getGrpcCredentials', () => { it('should return secure credentials for https', () => { const credentials = getGrpcCredentials('https://test.agent'); @@ -51,10 +56,12 @@ describe('a2aUtils', () => { describe('pinUrlToIp', () => { it('should resolve and pin hostname to IP', async () => { - vi.mocked(dnsPromises.lookup).mockResolvedValue([ - { address: '93.184.216.34', family: 4 }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ] as any); + vi.mocked( + dnsPromises.lookup as unknown as ( + hostname: string, + options: { all: true }, + ) => Promise, + ).mockResolvedValue([{ address: '93.184.216.34', family: 4 }]); const { pinnedUrl, hostname } = await pinUrlToIp( 'http://example.com:9000', @@ -65,10 +72,12 @@ describe('a2aUtils', () => { }); it('should handle raw host:port strings (standard for gRPC)', async () => { - vi.mocked(dnsPromises.lookup).mockResolvedValue([ - { address: '93.184.216.34', family: 4 }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ] as any); + vi.mocked( + dnsPromises.lookup as unknown as ( + hostname: string, + options: { all: true }, + ) => Promise, + ).mockResolvedValue([{ address: '93.184.216.34', family: 4 }]); const { pinnedUrl, hostname } = await pinUrlToIp( 'example.com:9000', @@ -87,10 +96,12 @@ describe('a2aUtils', () => { }); it('should throw error if resolved to private IP', async () => { - vi.mocked(dnsPromises.lookup).mockResolvedValue([ - { address: '10.0.0.1', family: 4 }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ] as any); + vi.mocked( + dnsPromises.lookup as unknown as ( + hostname: string, + options: { all: true }, + ) => Promise, + ).mockResolvedValue([{ address: '10.0.0.1', family: 4 }]); await expect( pinUrlToIp('http://malicious.com', 'test-agent'), @@ -98,10 +109,12 @@ describe('a2aUtils', () => { }); it('should allow localhost/127.0.0.1/::1 exceptions', async () => { - vi.mocked(dnsPromises.lookup).mockResolvedValue([ - { address: '127.0.0.1', family: 4 }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ] as any); + vi.mocked( + dnsPromises.lookup as unknown as ( + hostname: string, + options: { all: true }, + ) => Promise, + ).mockResolvedValue([{ address: '127.0.0.1', family: 4 }]); const { pinnedUrl, hostname } = await pinUrlToIp( 'http://localhost:9000', diff --git a/packages/core/src/agents/a2aUtils.ts b/packages/core/src/agents/a2aUtils.ts index 4725f6357f..ec8b36bba1 100644 --- a/packages/core/src/agents/a2aUtils.ts +++ b/packages/core/src/agents/a2aUtils.ts @@ -6,6 +6,7 @@ import * as grpc from '@grpc/grpc-js'; import { lookup } from 'node:dns/promises'; +import { z } from 'zod'; import type { Message, Part, @@ -14,17 +15,40 @@ import type { FilePart, Artifact, TaskState, - TaskStatusUpdateEvent, - TaskArtifactUpdateEvent, AgentCard, AgentInterface, - Task, } from '@a2a-js/sdk'; import { isAddressPrivate } from '../utils/fetch.js'; import type { SendMessageResult } from './a2a-client-manager.js'; export const AUTH_REQUIRED_MSG = `[Authorization Required] The agent has indicated it requires authorization to proceed. Please follow the agent's instructions.`; +const AgentInterfaceSchema = z + .object({ + url: z.string().default(''), + transport: z.string().optional(), + protocolBinding: z.string().optional(), + }) + .passthrough(); + +const AgentCardSchema = z + .object({ + name: z.string().default('unknown'), + description: z.string().default(''), + url: z.string().default(''), + version: z.string().default(''), + protocolVersion: z.string().default(''), + capabilities: z.record(z.unknown()).default({}), + skills: z.array(z.union([z.string(), z.record(z.unknown())])).default([]), + defaultInputModes: z.array(z.string()).default([]), + defaultOutputModes: z.array(z.string()).default([]), + + additionalInterfaces: z.array(AgentInterfaceSchema).optional(), + supportedInterfaces: z.array(AgentInterfaceSchema).optional(), + preferredTransport: z.string().optional(), + }) + .passthrough(); + /** * Reassembles incremental A2A streaming updates into a coherent result. * Shows sequential status/messages followed by all reassembled artifacts. @@ -40,68 +64,79 @@ export class A2AResultReassembler { update(chunk: SendMessageResult) { if (!('kind' in chunk)) return; - if (isStatusUpdateEvent(chunk)) { - this.appendStateInstructions(chunk.status?.state); - this.pushMessage(chunk.status?.message); - } else if (isArtifactUpdateEvent(chunk)) { - if (chunk.artifact) { - const id = chunk.artifact.artifactId; - const existing = this.artifacts.get(id); + switch (chunk.kind) { + case 'status-update': + this.appendStateInstructions(chunk.status?.state); + this.pushMessage(chunk.status?.message); + break; - if (chunk.append && existing) { - for (const part of chunk.artifact.parts) { - existing.parts.push(structuredClone(part)); + case 'artifact-update': + if (chunk.artifact) { + const id = chunk.artifact.artifactId; + const existing = this.artifacts.get(id); + + if (chunk.append && existing) { + for (const part of chunk.artifact.parts) { + existing.parts.push(structuredClone(part)); + } + } else { + this.artifacts.set(id, structuredClone(chunk.artifact)); } - } else { - this.artifacts.set(id, structuredClone(chunk.artifact)); - } - const newText = extractPartsText(chunk.artifact.parts, ''); - let chunks = this.artifactChunks.get(id); - if (!chunks) { - chunks = []; - this.artifactChunks.set(id, chunks); + const newText = extractPartsText(chunk.artifact.parts, ''); + let chunks = this.artifactChunks.get(id); + if (!chunks) { + chunks = []; + this.artifactChunks.set(id, chunks); + } + if (chunk.append) { + chunks.push(newText); + } else { + chunks.length = 0; + chunks.push(newText); + } } - if (chunk.append) { - chunks.push(newText); - } else { - chunks.length = 0; - chunks.push(newText); + break; + + case 'task': + this.appendStateInstructions(chunk.status?.state); + this.pushMessage(chunk.status?.message); + if (chunk.artifacts) { + for (const art of chunk.artifacts) { + this.artifacts.set(art.artifactId, structuredClone(art)); + this.artifactChunks.set(art.artifactId, [ + extractPartsText(art.parts, ''), + ]); + } } - } - } else if (isTask(chunk)) { - this.appendStateInstructions(chunk.status?.state); - this.pushMessage(chunk.status?.message); - if (chunk.artifacts) { - for (const art of chunk.artifacts) { - this.artifacts.set(art.artifactId, structuredClone(art)); - this.artifactChunks.set(art.artifactId, [ - extractPartsText(art.parts, ''), - ]); + // History Fallback: Some agent implementations do not populate the + // status.message in their final terminal response, instead archiving + // the final answer in the task's history array. To ensure we don't + // present an empty result, we fallback to the most recent agent message + // in the history only when the task is terminal and no other content + // (message log or artifacts) has been reassembled. + if ( + isTerminalState(chunk.status?.state) && + this.messageLog.length === 0 && + this.artifacts.size === 0 && + chunk.history && + chunk.history.length > 0 + ) { + const lastAgentMsg = [...chunk.history] + .reverse() + .find((m) => m.role?.toLowerCase().includes('agent')); + if (lastAgentMsg) { + this.pushMessage(lastAgentMsg); + } } - } - // History Fallback: Some agent implementations do not populate the - // status.message in their final terminal response, instead archiving - // the final answer in the task's history array. To ensure we don't - // present an empty result, we fallback to the most recent agent message - // in the history only when the task is terminal and no other content - // (message log or artifacts) has been reassembled. - if ( - isTerminalState(chunk.status?.state) && - this.messageLog.length === 0 && - this.artifacts.size === 0 && - chunk.history && - chunk.history.length > 0 - ) { - const lastAgentMsg = [...chunk.history] - .reverse() - .find((m) => m.role?.toLowerCase().includes('agent')); - if (lastAgentMsg) { - this.pushMessage(lastAgentMsg); - } - } - } else if (isMessage(chunk)) { - this.pushMessage(chunk); + break; + + case 'message': + this.pushMessage(chunk); + break; + default: + // Handle unknown kinds gracefully + break; } } @@ -214,28 +249,20 @@ export function normalizeAgentCard(card: unknown): AgentCard { throw new Error('Agent card is missing.'); } - // Double-cast to bypass strict linter while bootstrapping the object. + // Use Zod to validate and parse the card, ensuring safe defaults and narrowing types. + const parsed = AgentCardSchema.parse(card); + // Narrowing to AgentCard interface after runtime validation. // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const result = { ...card } as unknown as AgentCard; - - // Ensure mandatory fields exist with safe defaults. - if (typeof result.name !== 'string') result.name = 'unknown'; - if (typeof result.description !== 'string') result.description = ''; - if (typeof result.url !== 'string') result.url = ''; - if (typeof result.version !== 'string') result.version = ''; - if (typeof result.protocolVersion !== 'string') result.protocolVersion = ''; - if (!isObject(result.capabilities)) result.capabilities = {}; - if (!Array.isArray(result.skills)) result.skills = []; - if (!Array.isArray(result.defaultInputModes)) result.defaultInputModes = []; - if (!Array.isArray(result.defaultOutputModes)) result.defaultOutputModes = []; + const result = parsed as unknown as AgentCard; // Normalize interfaces and synchronize both interface fields. - const normalizedInterfaces = extractNormalizedInterfaces(card); + const normalizedInterfaces = extractNormalizedInterfaces(parsed); result.additionalInterfaces = normalizedInterfaces; + + // Sync supportedInterfaces for backward compatibility. // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (result as unknown as Record)[ - 'supportedInterfaces' - ] = normalizedInterfaces; + const legacyResult = result as unknown as Record; + legacyResult['supportedInterfaces'] = normalizedInterfaces; // Fallback preferredTransport: If not specified, default to GRPC if available. if ( @@ -387,26 +414,33 @@ export function extractIdsFromResponse(result: SendMessageResult): { let taskId: string | undefined; let clearTaskId = false; - if ('kind' in result) { - const kind = result.kind; - if (kind === 'message' || isArtifactUpdateEvent(result)) { + if (!('kind' in result)) return { contextId, taskId, clearTaskId }; + + switch (result.kind) { + case 'message': + case 'artifact-update': taskId = result.taskId; contextId = result.contextId; - } else if (kind === 'task') { + break; + + case 'task': taskId = result.id; contextId = result.contextId; if (isTerminalState(result.status?.state)) { clearTaskId = true; } - } else if (isStatusUpdateEvent(result)) { + break; + + case 'status-update': taskId = result.taskId; contextId = result.contextId; - // Note: We ignore the 'final' flag here per A2A protocol best practices, - // as a stream can close while a task is still in a 'working' state. if (isTerminalState(result.status?.state)) { clearTaskId = true; } - } + break; + default: + // Handle other kind values if any + break; } return { contextId, taskId, clearTaskId }; @@ -430,26 +464,20 @@ function extractNormalizedInterfaces( const mapped: AgentInterface[] = []; for (const i of rawInterfaces) { if (isObject(i)) { - // Create a copy to preserve all original fields. + // Use schema to validate interface object. + const parsed = AgentInterfaceSchema.parse(i); + // Narrowing to AgentInterface after runtime validation. // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const normalized = { ...i } as unknown as AgentInterface & { + const normalized = parsed as unknown as AgentInterface & { protocolBinding?: string; }; - // Ensure 'url' exists - if (typeof normalized.url !== 'string') { - normalized.url = ''; - } - - // Normalize 'transport' from 'protocolBinding' - const transport = normalized.transport || normalized.protocolBinding; - if (transport) { - normalized.transport = transport; + // Normalize 'transport' from 'protocolBinding' if missing. + if (!normalized.transport && normalized.protocolBinding) { + normalized.transport = normalized.protocolBinding; } // Robust URL: Ensure the URL has a scheme (except for gRPC). - // Some agent implementations (like a2a-go samples) may provide raw IP:port strings. - // gRPC targets MUST NOT have a scheme (e.g. 'http://'), or they will fail name resolution. if ( normalized.url && !normalized.url.includes('://') && @@ -460,7 +488,7 @@ function extractNormalizedInterfaces( normalized.url = `http://${normalized.url}`; } - mapped.push(normalized); + mapped.push(normalized as AgentInterface); } } return mapped; @@ -491,26 +519,6 @@ function isFilePart(part: Part): part is FilePart { return part.kind === 'file'; } -function isStatusUpdateEvent( - result: SendMessageResult, -): result is TaskStatusUpdateEvent { - return result.kind === 'status-update'; -} - -function isArtifactUpdateEvent( - result: SendMessageResult, -): result is TaskArtifactUpdateEvent { - return result.kind === 'artifact-update'; -} - -function isMessage(result: SendMessageResult): result is Message { - return result.kind === 'message'; -} - -function isTask(result: SendMessageResult): result is Task { - return result.kind === 'task'; -} - /** * Returns true if the given state is a terminal state for a task. */