From c68303c55357d53a8ddd59d12a6a0cb78f13e050 Mon Sep 17 00:00:00 2001 From: Adam Weidman <65992621+adamfweidman@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:25:51 -0400 Subject: [PATCH] fix(core): add proxy routing support for remote A2A subagents (#22199) --- .../src/agents/a2a-client-manager.test.ts | 47 ++++++++++++++++ .../core/src/agents/a2a-client-manager.ts | 53 +++++++++++++------ packages/core/src/agents/registry.ts | 4 +- 3 files changed, 85 insertions(+), 19 deletions(-) diff --git a/packages/core/src/agents/a2a-client-manager.test.ts b/packages/core/src/agents/a2a-client-manager.test.ts index 8cd3cc0830..aab0de5506 100644 --- a/packages/core/src/agents/a2a-client-manager.test.ts +++ b/packages/core/src/agents/a2a-client-manager.test.ts @@ -18,6 +18,8 @@ import { type AuthenticationHandler, type Client, } from '@a2a-js/sdk/client'; +import type { Config } from '../config/config.js'; +import { Agent as UndiciAgent, ProxyAgent } from 'undici'; import { debugLogger } from '../utils/debugLogger.js'; vi.mock('../utils/debugLogger.js', () => ({ @@ -117,6 +119,51 @@ describe('A2AClientManager', () => { expect(instance1).toBe(instance2); }); + describe('getInstance / dispatcher initialization', () => { + it('should use UndiciAgent when no proxy is configured', async () => { + await manager.loadAgent('TestAgent', 'http://test.agent/card'); + + const resolverOptions = vi.mocked(DefaultAgentCardResolver).mock + .calls[0][0]; + const cardFetch = resolverOptions?.fetchImpl as typeof fetch; + await cardFetch('http://test.agent/card'); + + const fetchCall = vi + .mocked(fetch) + .mock.calls.find((call) => call[0] === 'http://test.agent/card'); + expect(fetchCall).toBeDefined(); + expect( + (fetchCall![1] as { dispatcher?: unknown })?.dispatcher, + ).toBeInstanceOf(UndiciAgent); + expect( + (fetchCall![1] as { dispatcher?: unknown })?.dispatcher, + ).not.toBeInstanceOf(ProxyAgent); + }); + + it('should use ProxyAgent when a proxy is configured via Config', async () => { + A2AClientManager.resetInstanceForTesting(); + const mockConfig = { + getProxy: () => 'http://my-proxy:8080', + } as Config; + + manager = A2AClientManager.getInstance(mockConfig); + await manager.loadAgent('TestProxyAgent', 'http://test.proxy.agent/card'); + + const resolverOptions = vi.mocked(DefaultAgentCardResolver).mock + .calls[0][0]; + const cardFetch = resolverOptions?.fetchImpl as typeof fetch; + await cardFetch('http://test.proxy.agent/card'); + + const fetchCall = vi + .mocked(fetch) + .mock.calls.find((call) => call[0] === 'http://test.proxy.agent/card'); + expect(fetchCall).toBeDefined(); + expect( + (fetchCall![1] as { dispatcher?: unknown })?.dispatcher, + ).toBeInstanceOf(ProxyAgent); + }); + }); + describe('loadAgent', () => { it('should create and cache an A2AClient', async () => { const agentCard = await manager.loadAgent( diff --git a/packages/core/src/agents/a2a-client-manager.ts b/packages/core/src/agents/a2a-client-manager.ts index 1597502c80..7d558e7dbe 100644 --- a/packages/core/src/agents/a2a-client-manager.ts +++ b/packages/core/src/agents/a2a-client-manager.ts @@ -23,7 +23,8 @@ import { createAuthenticatingFetchWithRetry, } from '@a2a-js/sdk/client'; import { v4 as uuidv4 } from 'uuid'; -import { Agent as UndiciAgent } from 'undici'; +import { Agent as UndiciAgent, ProxyAgent } from 'undici'; +import type { Config } from '../config/config.js'; import { debugLogger } from '../utils/debugLogger.js'; import { safeLookup } from '../utils/fetch.js'; import { classifyAgentError } from './a2a-errors.js'; @@ -31,16 +32,6 @@ import { classifyAgentError } from './a2a-errors.js'; // Remote agents can take 10+ minutes (e.g. Deep Research). // Use a dedicated dispatcher so the global 5-min timeout isn't affected. const A2A_TIMEOUT = 1800000; // 30 minutes -const a2aDispatcher = new UndiciAgent({ - headersTimeout: A2A_TIMEOUT, - bodyTimeout: A2A_TIMEOUT, - connect: { - lookup: safeLookup, // SSRF protection at connection level - }, -}); -const a2aFetch: typeof fetch = (input, init) => - // eslint-disable-next-line no-restricted-syntax -- TODO: Migrate to safeFetch for SSRF protection - fetch(input, { ...init, dispatcher: a2aDispatcher } as RequestInit); export type SendMessageResult = | Message @@ -59,14 +50,39 @@ export class A2AClientManager { private clients = new Map(); private agentCards = new Map(); - private constructor() {} + private a2aDispatcher: UndiciAgent | ProxyAgent; + private a2aFetch: typeof fetch; + + private constructor(config?: Config) { + const proxyUrl = config?.getProxy(); + const agentOptions = { + headersTimeout: A2A_TIMEOUT, + bodyTimeout: A2A_TIMEOUT, + connect: { + lookup: safeLookup, // SSRF protection at connection level + }, + }; + + if (proxyUrl) { + this.a2aDispatcher = new ProxyAgent({ + uri: proxyUrl, + ...agentOptions, + }); + } else { + this.a2aDispatcher = new UndiciAgent(agentOptions); + } + + this.a2aFetch = (input, init) => + // eslint-disable-next-line no-restricted-syntax -- TODO: Migrate to safeFetch for SSRF protection + fetch(input, { ...init, dispatcher: this.a2aDispatcher } as RequestInit); + } /** * Gets the singleton instance of the A2AClientManager. */ - static getInstance(): A2AClientManager { + static getInstance(config?: Config): A2AClientManager { if (!A2AClientManager.instance) { - A2AClientManager.instance = new A2AClientManager(); + A2AClientManager.instance = new A2AClientManager(config); } return A2AClientManager.instance; } @@ -97,9 +113,12 @@ export class A2AClientManager { } // Authenticated fetch for API calls (transports). - let authFetch: typeof fetch = a2aFetch; + let authFetch: typeof fetch = this.a2aFetch; if (authHandler) { - authFetch = createAuthenticatingFetchWithRetry(a2aFetch, authHandler); + authFetch = createAuthenticatingFetchWithRetry( + this.a2aFetch, + authHandler, + ); } // Use unauthenticated fetch for the agent card unless explicitly required. @@ -109,7 +128,7 @@ export class A2AClientManager { init?: RequestInit, ): Promise => { // Try without auth first - const response = await a2aFetch(input, init); + const response = await this.a2aFetch(input, init); // Retry with auth if we hit a 401/403 if ((response.status === 401 || response.status === 403) && authFetch) { diff --git a/packages/core/src/agents/registry.ts b/packages/core/src/agents/registry.ts index b91fcad3ed..6eb642da72 100644 --- a/packages/core/src/agents/registry.ts +++ b/packages/core/src/agents/registry.ts @@ -69,7 +69,7 @@ export class AgentRegistry { * Clears the current registry and re-scans for agents. */ async reload(): Promise { - A2AClientManager.getInstance().clearCache(); + A2AClientManager.getInstance(this.config).clearCache(); await this.config.reloadAgents(); this.agents.clear(); this.allDefinitions.clear(); @@ -414,7 +414,7 @@ export class AgentRegistry { // Load the remote A2A agent card and register. try { - const clientManager = A2AClientManager.getInstance(); + const clientManager = A2AClientManager.getInstance(this.config); let authHandler: AuthenticationHandler | undefined; if (definition.auth) { const provider = await A2AAuthProviderFactory.create({