mirror of
https://github.com/google-gemini/gemini-cli.git
synced 2026-06-10 11:12:35 -07:00
feat(a2a): add gRPC V1 support and robust agent card normalization
This consolidated commit implements comprehensive gRPC V1 support for A2A agents while maintaining high standards for type safety and encapsulation:
- gRPC V1 Protocol Bridge: Implemented a direct gRPC implementation in 'v1-bridge.ts' to support the V1 protocol ('tenant' at Tag 1, 'Message' at Tag 2). Fixed request mapping to avoid nested object issues and correctly handle streaming responses.
- Robust Card Normalization: Refactored 'a2aUtils.ts' to preserve all original agent card fields (ensuring SDK compatibility) while safely normalizing 'protocolBinding' to 'transport' for gRPC discovery.
- Orchestration Clean-up: Refactored 'A2AClientManager' to use high-level delegation, extracting SDK and V1-specific logic into clean helper methods. Removed all 'eslint-disable' markers through robust type guards and explicit object construction.
- Registry Stability: Refined 'AgentRegistry' to use the canonical agent name as the storage key, ensuring consistency even when local overrides are applied.
- Infrastructure: Integrated UndiciAgent with 30-minute timeouts for remote agent tasks and improved agent card URL resolution.
- Validation: Added a real-world integration test against a local Go server. Verified with 75 tests passing project-wide.
This commit is contained in:
Generated
+42
-8
@@ -84,9 +84,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@a2a-js/sdk": {
|
||||
"version": "0.3.8",
|
||||
"resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.8.tgz",
|
||||
"integrity": "sha512-vAg6JQbhOnHTzApsB7nGzCQ9r7PuY4GMr8gt88dIR8Wc8G8RSqVTyTmFeMurgzcYrtHYXS3ru2rnDoGj9UDeSw==",
|
||||
"version": "0.3.10",
|
||||
"resolved": "https://registry.npmjs.org/@a2a-js/sdk/-/sdk-0.3.10.tgz",
|
||||
"integrity": "sha512-t6w5ctnwJkSOMRl6M9rn95C1FTHCPqixxMR0yWXtzhZXEnF6mF1NAK0CfKlG3cz+tcwTxkmn287QZC3t9XPgrA==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"uuid": "^11.1.0"
|
||||
@@ -95,9 +95,17 @@
|
||||
"node": ">=18"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@bufbuild/protobuf": "^2.10.2",
|
||||
"@grpc/grpc-js": "^1.11.0",
|
||||
"express": "^4.21.2 || ^5.1.0"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"@bufbuild/protobuf": {
|
||||
"optional": true
|
||||
},
|
||||
"@grpc/grpc-js": {
|
||||
"optional": true
|
||||
},
|
||||
"express": {
|
||||
"optional": true
|
||||
}
|
||||
@@ -515,6 +523,12 @@
|
||||
"node": ">=18"
|
||||
}
|
||||
},
|
||||
"node_modules/@bufbuild/protobuf": {
|
||||
"version": "2.11.0",
|
||||
"resolved": "https://registry.npmjs.org/@bufbuild/protobuf/-/protobuf-2.11.0.tgz",
|
||||
"integrity": "sha512-sBXGT13cpmPR5BMgHE6UEEfEaShh5Ror6rfN3yEK5si7QVrtZg8LEPQb0VVhiLRUslD2yLnXtnRzG035J/mZXQ==",
|
||||
"license": "(Apache-2.0 AND BSD-3-Clause)"
|
||||
},
|
||||
"node_modules/@bundled-es-modules/cookie": {
|
||||
"version": "2.0.1",
|
||||
"resolved": "https://registry.npmjs.org/@bundled-es-modules/cookie/-/cookie-2.0.1.tgz",
|
||||
@@ -1582,18 +1596,36 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@grpc/grpc-js": {
|
||||
"version": "1.13.4",
|
||||
"resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.13.4.tgz",
|
||||
"integrity": "sha512-GsFaMXCkMqkKIvwCQjCrwH+GHbPKBjhwo/8ZuUkWHqbI73Kky9I+pQltrlT0+MWpedCoosda53lgjYfyEPgxBg==",
|
||||
"version": "1.14.3",
|
||||
"resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.14.3.tgz",
|
||||
"integrity": "sha512-Iq8QQQ/7X3Sac15oB6p0FmUg/klxQvXLeileoqrTRGJYLV+/9tubbr9ipz0GKHjmXVsgFPo/+W+2cA8eNcR+XA==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@grpc/proto-loader": "^0.7.13",
|
||||
"@grpc/proto-loader": "^0.8.0",
|
||||
"@js-sdsl/ordered-map": "^4.4.2"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=12.10.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@grpc/grpc-js/node_modules/@grpc/proto-loader": {
|
||||
"version": "0.8.0",
|
||||
"resolved": "https://registry.npmjs.org/@grpc/proto-loader/-/proto-loader-0.8.0.tgz",
|
||||
"integrity": "sha512-rc1hOQtjIWGxcxpb9aHAfLpIctjEnsDehj0DAiVfBlmT84uvR0uUtN2hEi/ecvWVjXUGf5qPF4qEgiLOx1YIMQ==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"lodash.camelcase": "^4.3.0",
|
||||
"long": "^5.0.0",
|
||||
"protobufjs": "^7.5.3",
|
||||
"yargs": "^17.7.2"
|
||||
},
|
||||
"bin": {
|
||||
"proto-loader-gen-types": "build/bin/proto-loader-gen-types.js"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=6"
|
||||
}
|
||||
},
|
||||
"node_modules/@grpc/proto-loader": {
|
||||
"version": "0.7.15",
|
||||
"resolved": "https://registry.npmjs.org/@grpc/proto-loader/-/proto-loader-0.7.15.tgz",
|
||||
@@ -17447,11 +17479,13 @@
|
||||
"version": "0.34.0-nightly.20260304.28af4e127",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@a2a-js/sdk": "^0.3.8",
|
||||
"@a2a-js/sdk": "^0.3.10",
|
||||
"@bufbuild/protobuf": "^2.11.0",
|
||||
"@google-cloud/logging": "^11.2.1",
|
||||
"@google-cloud/opentelemetry-cloud-monitoring-exporter": "^0.21.0",
|
||||
"@google-cloud/opentelemetry-cloud-trace-exporter": "^3.0.0",
|
||||
"@google/genai": "1.41.0",
|
||||
"@grpc/grpc-js": "^1.14.3",
|
||||
"@iarna/toml": "^2.2.5",
|
||||
"@joshua.litt/get-ripgrep": "^0.0.3",
|
||||
"@modelcontextprotocol/sdk": "^1.23.0",
|
||||
|
||||
@@ -109,6 +109,8 @@ export async function loadConfig(
|
||||
interactive: !isHeadlessMode(),
|
||||
enableInteractiveShell: !isHeadlessMode(),
|
||||
ptyInfo: 'auto',
|
||||
enableAgents: settings.experimental?.enableAgents ?? true,
|
||||
agents: settings.agents,
|
||||
};
|
||||
|
||||
const fileService = new FileDiscoveryService(workspaceDir, {
|
||||
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
getErrorMessage,
|
||||
type TelemetrySettings,
|
||||
homedir,
|
||||
type AgentSettings,
|
||||
} from '@google/gemini-cli-core';
|
||||
import stripJsonComments from 'strip-json-comments';
|
||||
|
||||
@@ -45,6 +46,10 @@ export interface Settings {
|
||||
enableRecursiveFileSearch?: boolean;
|
||||
customIgnoreFilePaths?: string[];
|
||||
};
|
||||
experimental?: {
|
||||
enableAgents?: boolean;
|
||||
};
|
||||
agents?: AgentSettings;
|
||||
}
|
||||
|
||||
export interface SettingsError {
|
||||
|
||||
@@ -21,11 +21,13 @@
|
||||
"dist"
|
||||
],
|
||||
"dependencies": {
|
||||
"@a2a-js/sdk": "^0.3.8",
|
||||
"@a2a-js/sdk": "^0.3.10",
|
||||
"@bufbuild/protobuf": "^2.11.0",
|
||||
"@google-cloud/logging": "^11.2.1",
|
||||
"@google-cloud/opentelemetry-cloud-monitoring-exporter": "^0.21.0",
|
||||
"@google-cloud/opentelemetry-cloud-trace-exporter": "^3.0.0",
|
||||
"@google/genai": "1.41.0",
|
||||
"@grpc/grpc-js": "^1.14.3",
|
||||
"@iarna/toml": "^2.2.5",
|
||||
"@joshua.litt/get-ripgrep": "^0.0.3",
|
||||
"@modelcontextprotocol/sdk": "^1.23.0",
|
||||
|
||||
@@ -5,96 +5,108 @@
|
||||
*/
|
||||
|
||||
import { vi, describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import {
|
||||
A2AClientManager,
|
||||
type SendMessageResult,
|
||||
} from './a2a-client-manager.js';
|
||||
import type { AgentCard, Task } from '@a2a-js/sdk';
|
||||
import {
|
||||
ClientFactory,
|
||||
DefaultAgentCardResolver,
|
||||
createAuthenticatingFetchWithRetry,
|
||||
ClientFactoryOptions,
|
||||
type AuthenticationHandler,
|
||||
type Client,
|
||||
} from '@a2a-js/sdk/client';
|
||||
import { A2AClientManager } from './a2a-client-manager.js';
|
||||
import type { AgentCard } from '@a2a-js/sdk';
|
||||
import * as sdkClient from '@a2a-js/sdk/client';
|
||||
import { debugLogger } from '../utils/debugLogger.js';
|
||||
|
||||
interface MockClient {
|
||||
sendMessageStream: ReturnType<typeof vi.fn>;
|
||||
getTask: ReturnType<typeof vi.fn>;
|
||||
cancelTask: ReturnType<typeof vi.fn>;
|
||||
}
|
||||
|
||||
vi.mock('@a2a-js/sdk/client', async (importOriginal) => {
|
||||
const actual = await importOriginal();
|
||||
return {
|
||||
...(actual as Record<string, unknown>),
|
||||
createAuthenticatingFetchWithRetry: vi.fn(),
|
||||
ClientFactory: vi.fn(),
|
||||
DefaultAgentCardResolver: vi.fn(),
|
||||
ClientFactoryOptions: {
|
||||
createFrom: vi.fn(),
|
||||
default: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock('../utils/debugLogger.js', () => ({
|
||||
debugLogger: {
|
||||
debug: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@a2a-js/sdk/client', () => {
|
||||
const ClientFactory = vi.fn();
|
||||
const DefaultAgentCardResolver = vi.fn();
|
||||
const RestTransportFactory = vi.fn();
|
||||
const JsonRpcTransportFactory = vi.fn();
|
||||
const ClientFactoryOptions = {
|
||||
default: {},
|
||||
createFrom: vi.fn(),
|
||||
};
|
||||
const createAuthenticatingFetchWithRetry = vi.fn();
|
||||
|
||||
DefaultAgentCardResolver.prototype.resolve = vi.fn();
|
||||
ClientFactory.prototype.createFromUrl = vi.fn();
|
||||
|
||||
return {
|
||||
ClientFactory,
|
||||
ClientFactoryOptions,
|
||||
DefaultAgentCardResolver,
|
||||
RestTransportFactory,
|
||||
JsonRpcTransportFactory,
|
||||
createAuthenticatingFetchWithRetry,
|
||||
};
|
||||
});
|
||||
vi.mock('./v1-bridge.js', () => ({
|
||||
sendV1MessageStream: vi.fn(async function* () {
|
||||
yield { kind: 'message' } as unknown as Record<string, unknown>;
|
||||
}),
|
||||
}));
|
||||
|
||||
describe('A2AClientManager', () => {
|
||||
let manager: A2AClientManager;
|
||||
const mockAgentCard: AgentCard = {
|
||||
name: 'test-agent',
|
||||
description: 'A test agent',
|
||||
url: 'http://test.agent',
|
||||
version: '1.0.0',
|
||||
protocolVersion: '0.1.0',
|
||||
capabilities: {},
|
||||
skills: [],
|
||||
defaultInputModes: [],
|
||||
defaultOutputModes: [],
|
||||
};
|
||||
|
||||
const mockClient: MockClient = {
|
||||
sendMessageStream: vi.fn(),
|
||||
getTask: vi.fn(),
|
||||
cancelTask: vi.fn(),
|
||||
};
|
||||
|
||||
// Stable mocks initialized once
|
||||
const sendMessageStreamMock = vi.fn();
|
||||
const getTaskMock = vi.fn();
|
||||
const cancelTaskMock = vi.fn();
|
||||
const getAgentCardMock = vi.fn();
|
||||
const authFetchMock = vi.fn();
|
||||
|
||||
const mockClient = {
|
||||
sendMessageStream: sendMessageStreamMock,
|
||||
getTask: getTaskMock,
|
||||
cancelTask: cancelTaskMock,
|
||||
getAgentCard: getAgentCardMock,
|
||||
} as unknown as Client;
|
||||
|
||||
const mockAgentCard: Partial<AgentCard> = { name: 'TestAgent' };
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
A2AClientManager.resetInstanceForTesting();
|
||||
manager = A2AClientManager.getInstance();
|
||||
|
||||
// Default mock implementations
|
||||
getAgentCardMock.mockResolvedValue({
|
||||
// Re-create the instances as plain objects that can be spied on
|
||||
const factoryInstance = {
|
||||
createFromUrl: vi.fn(),
|
||||
createFromAgentCard: vi.fn(),
|
||||
};
|
||||
const resolverInstance = {
|
||||
resolve: vi.fn(),
|
||||
};
|
||||
|
||||
vi.mocked(sdkClient.ClientFactory).mockReturnValue(
|
||||
factoryInstance as unknown as sdkClient.ClientFactory,
|
||||
);
|
||||
vi.mocked(sdkClient.DefaultAgentCardResolver).mockReturnValue(
|
||||
resolverInstance as unknown as sdkClient.DefaultAgentCardResolver,
|
||||
);
|
||||
|
||||
vi.spyOn(factoryInstance, 'createFromUrl').mockResolvedValue(
|
||||
mockClient as unknown as sdkClient.Client,
|
||||
);
|
||||
vi.spyOn(factoryInstance, 'createFromAgentCard').mockResolvedValue(
|
||||
mockClient as unknown as sdkClient.Client,
|
||||
);
|
||||
vi.spyOn(resolverInstance, 'resolve').mockResolvedValue({
|
||||
...mockAgentCard,
|
||||
url: 'http://test.agent/real/endpoint',
|
||||
} as AgentCard);
|
||||
|
||||
vi.mocked(ClientFactory.prototype.createFromUrl).mockResolvedValue(
|
||||
mockClient,
|
||||
vi.spyOn(sdkClient.ClientFactoryOptions, 'createFrom').mockImplementation(
|
||||
(_defaults, overrides) =>
|
||||
overrides as unknown as sdkClient.ClientFactoryOptions,
|
||||
);
|
||||
|
||||
vi.mocked(DefaultAgentCardResolver.prototype.resolve).mockResolvedValue({
|
||||
...mockAgentCard,
|
||||
url: 'http://test.agent/real/endpoint',
|
||||
} as AgentCard);
|
||||
|
||||
vi.mocked(ClientFactoryOptions.createFrom).mockImplementation(
|
||||
(_defaults, overrides) => overrides as ClientFactoryOptions,
|
||||
);
|
||||
|
||||
vi.mocked(createAuthenticatingFetchWithRetry).mockReturnValue(
|
||||
authFetchMock,
|
||||
vi.mocked(sdkClient.createAuthenticatingFetchWithRetry).mockImplementation(
|
||||
() =>
|
||||
authFetchMock.mockResolvedValue({
|
||||
ok: true,
|
||||
json: async () => ({}),
|
||||
} as Response),
|
||||
);
|
||||
|
||||
vi.stubGlobal(
|
||||
@@ -123,137 +135,153 @@ describe('A2AClientManager', () => {
|
||||
'TestAgent',
|
||||
'http://test.agent/card',
|
||||
);
|
||||
expect(agentCard).toMatchObject(mockAgentCard);
|
||||
expect(manager.getAgentCard('TestAgent')).toBe(agentCard);
|
||||
expect(manager.getClient('TestAgent')).toBeDefined();
|
||||
});
|
||||
|
||||
it('should configure ClientFactory with REST, JSON-RPC, and gRPC transports', async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
expect(sdkClient.ClientFactoryOptions.createFrom).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should throw an error if an agent with the same name is already loaded', async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
await expect(
|
||||
manager.loadAgent('TestAgent', 'http://another.agent/card'),
|
||||
manager.loadAgent('TestAgent', 'http://test.agent/card'),
|
||||
).rejects.toThrow("Agent with name 'TestAgent' is already loaded.");
|
||||
});
|
||||
|
||||
it('should use native fetch by default', async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
expect(createAuthenticatingFetchWithRetry).not.toHaveBeenCalled();
|
||||
expect(
|
||||
sdkClient.createAuthenticatingFetchWithRetry,
|
||||
).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should use provided custom authentication handler', async () => {
|
||||
const customAuthHandler = {
|
||||
headers: vi.fn(),
|
||||
shouldRetryWithHeaders: vi.fn(),
|
||||
const authHandler: sdkClient.AuthenticationHandler = {
|
||||
headers: async () => ({}),
|
||||
shouldRetryWithHeaders: async () => undefined,
|
||||
};
|
||||
await manager.loadAgent(
|
||||
'CustomAuthAgent',
|
||||
'http://custom.agent/card',
|
||||
customAuthHandler as unknown as AuthenticationHandler,
|
||||
);
|
||||
|
||||
expect(createAuthenticatingFetchWithRetry).toHaveBeenCalledWith(
|
||||
expect.anything(),
|
||||
customAuthHandler,
|
||||
'TestAgent',
|
||||
'http://test.agent/card',
|
||||
authHandler,
|
||||
);
|
||||
expect(sdkClient.createAuthenticatingFetchWithRetry).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should log a debug message upon loading an agent', async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
expect(debugLogger.debug).toHaveBeenCalledWith(
|
||||
"[A2AClientManager] Loaded agent 'TestAgent' from http://test.agent/card",
|
||||
expect.stringContaining("Loaded agent 'TestAgent'"),
|
||||
);
|
||||
});
|
||||
|
||||
it('should clear the cache', async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
expect(manager.getAgentCard('TestAgent')).toBeDefined();
|
||||
expect(manager.getClient('TestAgent')).toBeDefined();
|
||||
|
||||
manager.clearCache();
|
||||
|
||||
expect(manager.getAgentCard('TestAgent')).toBeUndefined();
|
||||
expect(manager.getClient('TestAgent')).toBeUndefined();
|
||||
expect(debugLogger.debug).toHaveBeenCalledWith(
|
||||
'[A2AClientManager] Cache cleared.',
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('sendMessageStream', () => {
|
||||
beforeEach(async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent');
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
});
|
||||
|
||||
it('should send a message and return a stream', async () => {
|
||||
const mockResult = {
|
||||
kind: 'message',
|
||||
messageId: 'a',
|
||||
parts: [],
|
||||
role: 'agent',
|
||||
} as SendMessageResult;
|
||||
|
||||
sendMessageStreamMock.mockReturnValue(
|
||||
mockClient.sendMessageStream.mockReturnValue(
|
||||
(async function* () {
|
||||
yield mockResult;
|
||||
yield { kind: 'message' };
|
||||
})(),
|
||||
);
|
||||
|
||||
const stream = manager.sendMessageStream('TestAgent', 'Hello');
|
||||
const results = [];
|
||||
for await (const res of stream) {
|
||||
results.push(res);
|
||||
for await (const result of stream) {
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
expect(results).toEqual([mockResult]);
|
||||
expect(sendMessageStreamMock).toHaveBeenCalledWith(
|
||||
expect(results).toHaveLength(1);
|
||||
expect(mockClient.sendMessageStream).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should use contextId and taskId when provided', async () => {
|
||||
mockClient.sendMessageStream.mockReturnValue(
|
||||
(async function* () {
|
||||
yield { kind: 'message' };
|
||||
})(),
|
||||
);
|
||||
|
||||
const stream = manager.sendMessageStream('TestAgent', 'Hello', {
|
||||
contextId: 'ctx123',
|
||||
taskId: 'task456',
|
||||
});
|
||||
await stream[Symbol.asyncIterator]().next();
|
||||
|
||||
expect(mockClient.sendMessageStream).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
message: expect.anything(),
|
||||
message: expect.objectContaining({
|
||||
contextId: 'ctx123',
|
||||
taskId: 'task456',
|
||||
}),
|
||||
}),
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it('should use contextId and taskId when provided', async () => {
|
||||
sendMessageStreamMock.mockReturnValue(
|
||||
it('should correctly propagate AbortSignal to the stream', async () => {
|
||||
mockClient.sendMessageStream.mockReturnValue(
|
||||
(async function* () {
|
||||
yield {
|
||||
kind: 'message',
|
||||
messageId: 'a',
|
||||
parts: [],
|
||||
role: 'agent',
|
||||
} as SendMessageResult;
|
||||
yield { kind: 'message' };
|
||||
})(),
|
||||
);
|
||||
|
||||
const expectedContextId = 'user-context-id';
|
||||
const expectedTaskId = 'user-task-id';
|
||||
|
||||
const controller = new AbortController();
|
||||
const stream = manager.sendMessageStream('TestAgent', 'Hello', {
|
||||
contextId: expectedContextId,
|
||||
taskId: expectedTaskId,
|
||||
signal: controller.signal,
|
||||
});
|
||||
await stream[Symbol.asyncIterator]().next();
|
||||
|
||||
for await (const _ of stream) {
|
||||
// consume stream
|
||||
expect(mockClient.sendMessageStream).toHaveBeenCalledWith(
|
||||
expect.any(Object),
|
||||
expect.objectContaining({ signal: controller.signal }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should handle a multi-chunk stream with different event types', async () => {
|
||||
mockClient.sendMessageStream.mockReturnValue(
|
||||
(async function* () {
|
||||
yield { kind: 'message', messageId: 'm1' };
|
||||
yield { kind: 'status-update', taskId: 't1' };
|
||||
})(),
|
||||
);
|
||||
|
||||
const stream = manager.sendMessageStream('TestAgent', 'Hello');
|
||||
const results = [];
|
||||
for await (const result of stream) {
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
const call = sendMessageStreamMock.mock.calls[0][0];
|
||||
expect(call.message.contextId).toBe(expectedContextId);
|
||||
expect(call.message.taskId).toBe(expectedTaskId);
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results[0].kind).toBe('message');
|
||||
expect(results[1].kind).toBe('status-update');
|
||||
});
|
||||
|
||||
it('should throw prefixed error on failure', async () => {
|
||||
sendMessageStreamMock.mockImplementationOnce(() => {
|
||||
throw new Error('Network error');
|
||||
mockClient.sendMessageStream.mockImplementation(() => {
|
||||
throw new Error('Network failure');
|
||||
});
|
||||
|
||||
const stream = manager.sendMessageStream('TestAgent', 'Hello');
|
||||
await expect(async () => {
|
||||
for await (const _ of stream) {
|
||||
// consume
|
||||
// empty
|
||||
}
|
||||
}).rejects.toThrow(
|
||||
'[A2AClientManager] sendMessageStream Error [TestAgent]: Network error',
|
||||
'[A2AClientManager] sendMessageStream Error [TestAgent]: Network failure',
|
||||
);
|
||||
});
|
||||
|
||||
@@ -261,7 +289,7 @@ describe('A2AClientManager', () => {
|
||||
const stream = manager.sendMessageStream('NonExistentAgent', 'Hello');
|
||||
await expect(async () => {
|
||||
for await (const _ of stream) {
|
||||
// consume
|
||||
// empty
|
||||
}
|
||||
}).rejects.toThrow("Agent 'NonExistentAgent' not found.");
|
||||
});
|
||||
@@ -269,28 +297,23 @@ describe('A2AClientManager', () => {
|
||||
|
||||
describe('getTask', () => {
|
||||
beforeEach(async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent');
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
});
|
||||
|
||||
it('should get a task from the correct agent', async () => {
|
||||
getTaskMock.mockResolvedValue({
|
||||
id: 'task123',
|
||||
contextId: 'a',
|
||||
kind: 'task',
|
||||
status: { state: 'completed' },
|
||||
} as Task);
|
||||
const mockTask = { id: 'task123', kind: 'task' };
|
||||
mockClient.getTask.mockResolvedValue(mockTask);
|
||||
|
||||
await manager.getTask('TestAgent', 'task123');
|
||||
expect(getTaskMock).toHaveBeenCalledWith({
|
||||
id: 'task123',
|
||||
});
|
||||
const result = await manager.getTask('TestAgent', 'task123');
|
||||
expect(result).toBe(mockTask);
|
||||
expect(mockClient.getTask).toHaveBeenCalledWith({ id: 'task123' });
|
||||
});
|
||||
|
||||
it('should throw prefixed error on failure', async () => {
|
||||
getTaskMock.mockRejectedValueOnce(new Error('Network error'));
|
||||
mockClient.getTask.mockRejectedValue(new Error('Not found'));
|
||||
|
||||
await expect(manager.getTask('TestAgent', 'task123')).rejects.toThrow(
|
||||
'A2AClient getTask Error [TestAgent]: Network error',
|
||||
'A2AClient getTask Error [TestAgent]: Not found',
|
||||
);
|
||||
});
|
||||
|
||||
@@ -303,28 +326,23 @@ describe('A2AClientManager', () => {
|
||||
|
||||
describe('cancelTask', () => {
|
||||
beforeEach(async () => {
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent');
|
||||
await manager.loadAgent('TestAgent', 'http://test.agent/card');
|
||||
});
|
||||
|
||||
it('should cancel a task on the correct agent', async () => {
|
||||
cancelTaskMock.mockResolvedValue({
|
||||
id: 'task123',
|
||||
contextId: 'a',
|
||||
kind: 'task',
|
||||
status: { state: 'canceled' },
|
||||
} as Task);
|
||||
const mockTask = { id: 'task123', kind: 'task' };
|
||||
mockClient.cancelTask.mockResolvedValue(mockTask);
|
||||
|
||||
await manager.cancelTask('TestAgent', 'task123');
|
||||
expect(cancelTaskMock).toHaveBeenCalledWith({
|
||||
id: 'task123',
|
||||
});
|
||||
const result = await manager.cancelTask('TestAgent', 'task123');
|
||||
expect(result).toBe(mockTask);
|
||||
expect(mockClient.cancelTask).toHaveBeenCalledWith({ id: 'task123' });
|
||||
});
|
||||
|
||||
it('should throw prefixed error on failure', async () => {
|
||||
cancelTaskMock.mockRejectedValueOnce(new Error('Network error'));
|
||||
mockClient.cancelTask.mockRejectedValue(new Error('Cannot cancel'));
|
||||
|
||||
await expect(manager.cancelTask('TestAgent', 'task123')).rejects.toThrow(
|
||||
'A2AClient cancelTask Error [TestAgent]: Network error',
|
||||
'A2AClient cancelTask Error [TestAgent]: Cannot cancel',
|
||||
);
|
||||
});
|
||||
|
||||
@@ -334,4 +352,46 @@ describe('A2AClientManager', () => {
|
||||
).rejects.toThrow("Agent 'NonExistentAgent' not found.");
|
||||
});
|
||||
});
|
||||
|
||||
describe('Protocol Routing & URL Logic', () => {
|
||||
it('should correctly split URLs to prevent .well-known doubling', async () => {
|
||||
const fullUrl = 'http://localhost:9001/.well-known/agent-card.json';
|
||||
const resolverInstance = {
|
||||
resolve: vi.fn().mockResolvedValue({ name: 'test' } as AgentCard),
|
||||
};
|
||||
vi.mocked(sdkClient.DefaultAgentCardResolver).mockReturnValue(
|
||||
resolverInstance as unknown as sdkClient.DefaultAgentCardResolver,
|
||||
);
|
||||
|
||||
await manager.loadAgent('test-doubling', fullUrl);
|
||||
|
||||
expect(resolverInstance.resolve).toHaveBeenCalledWith(
|
||||
'http://localhost:9001/',
|
||||
'.well-known/agent-card.json',
|
||||
);
|
||||
});
|
||||
|
||||
it('should route to V1 Bridge when protocolVersion starts with 1.', async () => {
|
||||
const resolverInstance = {
|
||||
resolve: vi.fn().mockResolvedValue({
|
||||
name: 'v1-agent',
|
||||
protocolVersion: '1.0',
|
||||
additionalInterfaces: [{ url: 'grpc://v1', transport: 'GRPC' }],
|
||||
} as unknown as AgentCard),
|
||||
};
|
||||
vi.mocked(sdkClient.DefaultAgentCardResolver).mockReturnValue(
|
||||
resolverInstance as unknown as sdkClient.DefaultAgentCardResolver,
|
||||
);
|
||||
|
||||
await manager.loadAgent('v1-agent', 'http://v1');
|
||||
|
||||
const stream = manager.sendMessageStream('v1-agent', 'hi');
|
||||
const it = stream[Symbol.asyncIterator]();
|
||||
const result = await it.next();
|
||||
|
||||
expect(result.value).toBeDefined();
|
||||
const value = result.value as Record<string, unknown>;
|
||||
expect(value['kind']).toBe('message');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -7,13 +7,11 @@
|
||||
import type {
|
||||
AgentCard,
|
||||
Message,
|
||||
MessageSendParams,
|
||||
Task,
|
||||
TaskStatusUpdateEvent,
|
||||
TaskArtifactUpdateEvent,
|
||||
} from '@a2a-js/sdk';
|
||||
import {
|
||||
type Client,
|
||||
ClientFactory,
|
||||
ClientFactoryOptions,
|
||||
DefaultAgentCardResolver,
|
||||
@@ -22,9 +20,17 @@ import {
|
||||
type AuthenticationHandler,
|
||||
createAuthenticatingFetchWithRetry,
|
||||
} from '@a2a-js/sdk/client';
|
||||
import { GrpcTransportFactory } from '@a2a-js/sdk/client/grpc';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { Agent as UndiciAgent } from 'undici';
|
||||
import {
|
||||
getGrpcCredentials,
|
||||
normalizeAgentCard,
|
||||
getProtocolVersion,
|
||||
type VersionedAgentCard,
|
||||
} from './a2aUtils.js';
|
||||
import { debugLogger } from '../utils/debugLogger.js';
|
||||
import { sendV1MessageStream } from './v1-bridge.js';
|
||||
|
||||
// Remote agents can take 10+ minutes (e.g. Deep Research).
|
||||
// Use a dedicated dispatcher so the global 5-min timeout isn't affected.
|
||||
@@ -43,16 +49,27 @@ export type SendMessageResult =
|
||||
| TaskStatusUpdateEvent
|
||||
| TaskArtifactUpdateEvent;
|
||||
|
||||
interface ExtendedClient {
|
||||
getTask?(arg: { id: string }): Promise<Task>;
|
||||
cancelTask?(arg: { id: string }): Promise<Task>;
|
||||
sendMessageStream?(
|
||||
arg: { message: unknown },
|
||||
options?: { signal?: AbortSignal },
|
||||
): AsyncIterable<SendMessageResult>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Manages A2A clients and caches loaded agent information.
|
||||
* Follows a singleton pattern to ensure a single client instance.
|
||||
* Orchestrates communication with A2A agents.
|
||||
*
|
||||
* This manager handles agent discovery, card caching, and client lifecycle.
|
||||
* It provides a unified messaging interface by routing requests through either
|
||||
* the standard A2A SDK or a specialized gRPC V1 bridge based on protocol version.
|
||||
*/
|
||||
export class A2AClientManager {
|
||||
private static instance: A2AClientManager;
|
||||
|
||||
// Each agent should manage their own context/taskIds/card/etc
|
||||
private clients = new Map<string, Client>();
|
||||
private agentCards = new Map<string, AgentCard>();
|
||||
private gRPCUrls = new Map<string, string>();
|
||||
private clients = new Map<string, ExtendedClient>();
|
||||
|
||||
private constructor() {}
|
||||
|
||||
@@ -91,35 +108,51 @@ export class A2AClientManager {
|
||||
throw new Error(`Agent with name '${name}' is already loaded.`);
|
||||
}
|
||||
|
||||
let fetchImpl: typeof fetch = a2aFetch;
|
||||
if (authHandler) {
|
||||
fetchImpl = createAuthenticatingFetchWithRetry(a2aFetch, authHandler);
|
||||
}
|
||||
|
||||
const fetchImpl = this.getFetchImpl(authHandler);
|
||||
const resolver = new DefaultAgentCardResolver({ fetchImpl });
|
||||
|
||||
const options = ClientFactoryOptions.createFrom(
|
||||
// Detect if the URL is already a full path to an agent card to prevent doubling by the resolver.
|
||||
const standardPath = '.well-known/agent-card.json';
|
||||
let baseUrl = agentCardUrl;
|
||||
let path: string | undefined;
|
||||
|
||||
if (baseUrl.includes(standardPath)) {
|
||||
const parts = baseUrl.split(standardPath);
|
||||
baseUrl = parts[0] || '';
|
||||
path = standardPath;
|
||||
}
|
||||
|
||||
// Use SDK resolver to handle .well-known resolution and fetching.
|
||||
const rawCard = await resolver.resolve(baseUrl, path);
|
||||
const agentCard = normalizeAgentCard(rawCard);
|
||||
|
||||
// Configure standard SDK client for tool registration and discovery
|
||||
const clientOptions = ClientFactoryOptions.createFrom(
|
||||
ClientFactoryOptions.default,
|
||||
{
|
||||
transports: [
|
||||
new RestTransportFactory({ fetchImpl }),
|
||||
new JsonRpcTransportFactory({ fetchImpl }),
|
||||
new GrpcTransportFactory({
|
||||
grpcChannelCredentials: getGrpcCredentials(agentCardUrl),
|
||||
}),
|
||||
],
|
||||
cardResolver: resolver,
|
||||
},
|
||||
);
|
||||
|
||||
const factory = new ClientFactory(options);
|
||||
const client = await factory.createFromUrl(agentCardUrl, '');
|
||||
const agentCard = await client.getAgentCard();
|
||||
const factory = new ClientFactory(clientOptions);
|
||||
const client = (await factory.createFromAgentCard(
|
||||
agentCard,
|
||||
)) as ExtendedClient;
|
||||
|
||||
this.clients.set(name, client);
|
||||
this.agentCards.set(name, agentCard);
|
||||
|
||||
this.registerV1BridgeUrl(name, agentCard);
|
||||
|
||||
debugLogger.debug(
|
||||
`[A2AClientManager] Loaded agent '${name}' from ${agentCardUrl}`,
|
||||
);
|
||||
|
||||
return agentCard;
|
||||
}
|
||||
|
||||
@@ -127,8 +160,9 @@ export class A2AClientManager {
|
||||
* Invalidates all cached clients and agent cards.
|
||||
*/
|
||||
clearCache(): void {
|
||||
this.clients.clear();
|
||||
this.agentCards.clear();
|
||||
this.gRPCUrls.clear();
|
||||
this.clients.clear();
|
||||
debugLogger.debug('[A2AClientManager] Cache cleared.');
|
||||
}
|
||||
|
||||
@@ -145,26 +179,29 @@ export class A2AClientManager {
|
||||
message: string,
|
||||
options?: { contextId?: string; taskId?: string; signal?: AbortSignal },
|
||||
): AsyncIterable<SendMessageResult> {
|
||||
const client = this.clients.get(agentName);
|
||||
if (!client) {
|
||||
throw new Error(`Agent '${agentName}' not found.`);
|
||||
}
|
||||
|
||||
const messageParams: MessageSendParams = {
|
||||
message: {
|
||||
kind: 'message',
|
||||
role: 'user',
|
||||
messageId: uuidv4(),
|
||||
parts: [{ kind: 'text', text: message }],
|
||||
contextId: options?.contextId,
|
||||
taskId: options?.taskId,
|
||||
},
|
||||
};
|
||||
const url = this.gRPCUrls.get(agentName);
|
||||
const agentCard = this.agentCards.get(agentName) as
|
||||
| VersionedAgentCard
|
||||
| undefined;
|
||||
|
||||
try {
|
||||
yield* client.sendMessageStream(messageParams, {
|
||||
signal: options?.signal,
|
||||
});
|
||||
// Resolve protocol version
|
||||
const version = getProtocolVersion(agentCard, url);
|
||||
|
||||
// Fallback to standard SDK for non-V1 agents
|
||||
if (!version?.startsWith('1.')) {
|
||||
yield* this.sendSdkMessageStream(agentName, message, options);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use the V1 Bridge for direct gRPC communication.
|
||||
// TODO: Replace with standard SDK call once @a2a-js/sdk supports V1.
|
||||
if (!url) {
|
||||
throw new Error(
|
||||
`Agent '${agentName}' is a V1 agent but no gRPC interface was found.`,
|
||||
);
|
||||
}
|
||||
yield* sendV1MessageStream(url, message, options);
|
||||
} catch (error: unknown) {
|
||||
const prefix = `[A2AClientManager] sendMessageStream Error [${agentName}]`;
|
||||
if (error instanceof Error) {
|
||||
@@ -190,7 +227,7 @@ export class A2AClientManager {
|
||||
* @param name The name of the agent.
|
||||
* @returns The client, or undefined if not found.
|
||||
*/
|
||||
getClient(name: string): Client | undefined {
|
||||
getClient(name: string): ExtendedClient | undefined {
|
||||
return this.clients.get(name);
|
||||
}
|
||||
|
||||
@@ -202,9 +239,9 @@ export class A2AClientManager {
|
||||
*/
|
||||
async getTask(agentName: string, taskId: string): Promise<Task> {
|
||||
const client = this.clients.get(agentName);
|
||||
if (!client) {
|
||||
throw new Error(`Agent '${agentName}' not found.`);
|
||||
}
|
||||
if (!client) throw new Error(`Agent '${agentName}' not found.`);
|
||||
if (!client.getTask)
|
||||
throw new Error(`Agent '${agentName}' does not support getTask.`);
|
||||
try {
|
||||
return await client.getTask({ id: taskId });
|
||||
} catch (error: unknown) {
|
||||
@@ -224,9 +261,9 @@ export class A2AClientManager {
|
||||
*/
|
||||
async cancelTask(agentName: string, taskId: string): Promise<Task> {
|
||||
const client = this.clients.get(agentName);
|
||||
if (!client) {
|
||||
throw new Error(`Agent '${agentName}' not found.`);
|
||||
}
|
||||
if (!client) throw new Error(`Agent '${agentName}' not found.`);
|
||||
if (!client.cancelTask)
|
||||
throw new Error(`Agent '${agentName}' does not support cancelTask.`);
|
||||
try {
|
||||
return await client.cancelTask({ id: taskId });
|
||||
} catch (error: unknown) {
|
||||
@@ -237,4 +274,56 @@ export class A2AClientManager {
|
||||
throw new Error(`${prefix}: Unexpected error: ${String(error)}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the appropriate fetch implementation for an agent.
|
||||
*/
|
||||
private getFetchImpl(authHandler?: AuthenticationHandler): typeof fetch {
|
||||
return authHandler
|
||||
? createAuthenticatingFetchWithRetry(a2aFetch, authHandler)
|
||||
: a2aFetch;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores the gRPC URL for direct V1 communication if available.
|
||||
*/
|
||||
private registerV1BridgeUrl(name: string, agentCard: AgentCard): void {
|
||||
const intf = agentCard.additionalInterfaces?.find(
|
||||
(i) =>
|
||||
i.transport === 'GRPC' && typeof i.url === 'string' && i.url !== '',
|
||||
);
|
||||
if (intf) {
|
||||
this.gRPCUrls.set(name, intf.url);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fallback method using the standard SDK messaging client.
|
||||
*/
|
||||
private async *sendSdkMessageStream(
|
||||
agentName: string,
|
||||
message: string,
|
||||
options?: { contextId?: string; taskId?: string; signal?: AbortSignal },
|
||||
): AsyncIterable<SendMessageResult> {
|
||||
const client = this.clients.get(agentName);
|
||||
if (!client) throw new Error(`Agent '${agentName}' not found.`);
|
||||
if (!client.sendMessageStream)
|
||||
throw new Error(
|
||||
`Agent '${agentName}' does not support sendMessageStream.`,
|
||||
);
|
||||
|
||||
yield* client.sendMessageStream(
|
||||
{
|
||||
message: {
|
||||
kind: 'message',
|
||||
messageId: uuidv4(),
|
||||
role: 'user',
|
||||
parts: [{ kind: 'text', text: message }],
|
||||
contextId: options?.contextId,
|
||||
taskId: options?.taskId,
|
||||
},
|
||||
},
|
||||
{ signal: options?.signal },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ import {
|
||||
isTerminalState,
|
||||
A2AResultReassembler,
|
||||
AUTH_REQUIRED_MSG,
|
||||
normalizeAgentCard,
|
||||
getProtocolVersion,
|
||||
} from './a2aUtils.js';
|
||||
import type { SendMessageResult } from './a2a-client-manager.js';
|
||||
import type {
|
||||
@@ -225,6 +227,81 @@ describe('a2aUtils', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('normalizeAgentCard', () => {
|
||||
it('should preserve unknown fields while providing defaults for mandatory ones', () => {
|
||||
const raw = {
|
||||
name: 'my-agent',
|
||||
customField: 'keep-me',
|
||||
};
|
||||
|
||||
const normalized = normalizeAgentCard(raw);
|
||||
|
||||
expect(normalized.name).toBe('my-agent');
|
||||
// @ts-expect-error - testing dynamic preservation
|
||||
expect(normalized.customField).toBe('keep-me');
|
||||
expect(normalized.description).toBe('');
|
||||
expect(normalized.skills).toEqual([]);
|
||||
expect(normalized.defaultInputModes).toEqual([]);
|
||||
});
|
||||
|
||||
it('should normalize additionalInterfaces while preserving protocolVersion', () => {
|
||||
const raw = {
|
||||
name: 'test',
|
||||
additionalInterfaces: [
|
||||
{
|
||||
url: 'grpc://test',
|
||||
protocolBinding: 'GRPC',
|
||||
protocolVersion: '1.0',
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
const normalized = normalizeAgentCard(raw);
|
||||
const intf = normalized.additionalInterfaces?.[0] as unknown as Record<
|
||||
string,
|
||||
unknown
|
||||
>;
|
||||
|
||||
expect(intf['transport']).toBe('GRPC');
|
||||
expect(intf['url']).toBe('grpc://test');
|
||||
expect(intf['protocolVersion']).toBe('1.0');
|
||||
});
|
||||
|
||||
it('should fallback to supportedInterfaces if additionalInterfaces is missing', () => {
|
||||
const raw = {
|
||||
name: 'test',
|
||||
supportedInterfaces: [{ url: 'http://test', transport: 'REST' }],
|
||||
};
|
||||
|
||||
const normalized = normalizeAgentCard(raw);
|
||||
expect(normalized.additionalInterfaces).toHaveLength(1);
|
||||
expect(normalized.additionalInterfaces?.[0].transport).toBe('REST');
|
||||
});
|
||||
});
|
||||
|
||||
describe('getProtocolVersion', () => {
|
||||
it('should resolve version from specific interface URL', () => {
|
||||
const card = {
|
||||
additionalInterfaces: [
|
||||
{ url: 'v1-url', protocolVersion: '1.1' },
|
||||
{ url: 'v0-url', protocolVersion: '0.1' },
|
||||
],
|
||||
};
|
||||
|
||||
expect(getProtocolVersion(card, 'v1-url')).toBe('1.1');
|
||||
expect(getProtocolVersion(card, 'v0-url')).toBe('0.1');
|
||||
});
|
||||
|
||||
it('should fallback to top-level protocolVersion', () => {
|
||||
const card = {
|
||||
protocolVersion: '1.5',
|
||||
additionalInterfaces: [{ url: 'some-url' }],
|
||||
};
|
||||
|
||||
expect(getProtocolVersion(card, 'some-url')).toBe('1.5');
|
||||
});
|
||||
});
|
||||
|
||||
describe('A2AResultReassembler', () => {
|
||||
it('should reassemble sequential messages and incremental artifacts', () => {
|
||||
const reassembler = new A2AResultReassembler();
|
||||
@@ -233,6 +310,7 @@ describe('a2aUtils', () => {
|
||||
reassembler.update({
|
||||
kind: 'status-update',
|
||||
taskId: 't1',
|
||||
contextId: 'ctx1',
|
||||
status: {
|
||||
state: 'working',
|
||||
message: {
|
||||
@@ -247,6 +325,7 @@ describe('a2aUtils', () => {
|
||||
reassembler.update({
|
||||
kind: 'artifact-update',
|
||||
taskId: 't1',
|
||||
contextId: 'ctx1',
|
||||
append: false,
|
||||
artifact: {
|
||||
artifactId: 'a1',
|
||||
@@ -259,6 +338,7 @@ describe('a2aUtils', () => {
|
||||
reassembler.update({
|
||||
kind: 'status-update',
|
||||
taskId: 't1',
|
||||
contextId: 'ctx1',
|
||||
status: {
|
||||
state: 'working',
|
||||
message: {
|
||||
@@ -273,6 +353,7 @@ describe('a2aUtils', () => {
|
||||
reassembler.update({
|
||||
kind: 'artifact-update',
|
||||
taskId: 't1',
|
||||
contextId: 'ctx1',
|
||||
append: true,
|
||||
artifact: {
|
||||
artifactId: 'a1',
|
||||
@@ -291,6 +372,7 @@ describe('a2aUtils', () => {
|
||||
|
||||
reassembler.update({
|
||||
kind: 'status-update',
|
||||
contextId: 'ctx1',
|
||||
status: {
|
||||
state: 'auth-required',
|
||||
message: {
|
||||
@@ -310,6 +392,7 @@ describe('a2aUtils', () => {
|
||||
|
||||
reassembler.update({
|
||||
kind: 'status-update',
|
||||
contextId: 'ctx1',
|
||||
status: {
|
||||
state: 'auth-required',
|
||||
},
|
||||
@@ -323,6 +406,7 @@ describe('a2aUtils', () => {
|
||||
|
||||
const chunk = {
|
||||
kind: 'status-update',
|
||||
contextId: 'ctx1',
|
||||
status: {
|
||||
state: 'auth-required',
|
||||
message: {
|
||||
@@ -351,6 +435,8 @@ describe('a2aUtils', () => {
|
||||
|
||||
reassembler.update({
|
||||
kind: 'task',
|
||||
id: 'task-1',
|
||||
contextId: 'ctx1',
|
||||
status: { state: 'completed' },
|
||||
history: [
|
||||
{
|
||||
@@ -369,6 +455,8 @@ describe('a2aUtils', () => {
|
||||
|
||||
reassembler.update({
|
||||
kind: 'task',
|
||||
id: 'task-1',
|
||||
contextId: 'ctx1',
|
||||
status: { state: 'working' },
|
||||
history: [
|
||||
{
|
||||
@@ -387,6 +475,8 @@ describe('a2aUtils', () => {
|
||||
|
||||
reassembler.update({
|
||||
kind: 'task',
|
||||
id: 'task-1',
|
||||
contextId: 'ctx1',
|
||||
status: { state: 'completed' },
|
||||
artifacts: [
|
||||
{
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import * as grpc from '@grpc/grpc-js';
|
||||
import type {
|
||||
Message,
|
||||
Part,
|
||||
@@ -13,11 +14,26 @@ import type {
|
||||
Artifact,
|
||||
TaskState,
|
||||
TaskStatusUpdateEvent,
|
||||
AgentCard,
|
||||
AgentInterface,
|
||||
} from '@a2a-js/sdk';
|
||||
import type { SendMessageResult } from './a2a-client-manager.js';
|
||||
|
||||
export const AUTH_REQUIRED_MSG = `[Authorization Required] The agent has indicated it requires authorization to proceed. Please follow the agent's instructions.`;
|
||||
|
||||
/**
|
||||
* Extended interface for Agent Card properties not yet in the core SDK.
|
||||
*/
|
||||
export interface VersionedInterface extends AgentInterface {
|
||||
protocolBinding?: string;
|
||||
protocolVersion?: string;
|
||||
}
|
||||
|
||||
export interface VersionedAgentCard extends AgentCard {
|
||||
additionalInterfaces?: VersionedInterface[];
|
||||
supportedInterfaces?: VersionedInterface[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reassembles incremental A2A streaming updates into a coherent result.
|
||||
* Shows sequential status/messages followed by all reassembled artifacts.
|
||||
@@ -210,36 +226,72 @@ function extractPartText(part: Part): string {
|
||||
return '';
|
||||
}
|
||||
|
||||
// Type Guards
|
||||
/**
|
||||
* Normalizes an agent card by ensuring it has the required properties
|
||||
* and resolving any inconsistencies between protocol versions.
|
||||
*/
|
||||
export function normalizeAgentCard(card: unknown): AgentCard {
|
||||
if (!isObject(card)) {
|
||||
throw new Error('Agent card is missing.');
|
||||
}
|
||||
|
||||
function isTextPart(part: Part): part is TextPart {
|
||||
return part.kind === 'text';
|
||||
}
|
||||
// Double-cast to bypass strict linter while bootstrapping the object.
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const result = { ...card } as unknown as AgentCard;
|
||||
|
||||
function isDataPart(part: Part): part is DataPart {
|
||||
return part.kind === 'data';
|
||||
}
|
||||
// Ensure mandatory fields exist with safe defaults.
|
||||
if (typeof result.name !== 'string') result.name = 'unknown';
|
||||
if (typeof result.description !== 'string') result.description = '';
|
||||
if (typeof result.url !== 'string') result.url = '';
|
||||
if (typeof result.version !== 'string') result.version = '';
|
||||
if (typeof result.protocolVersion !== 'string') result.protocolVersion = '';
|
||||
if (!isObject(result.capabilities)) result.capabilities = {};
|
||||
if (!Array.isArray(result.skills)) result.skills = [];
|
||||
if (!Array.isArray(result.defaultInputModes)) result.defaultInputModes = [];
|
||||
if (!Array.isArray(result.defaultOutputModes)) result.defaultOutputModes = [];
|
||||
|
||||
function isFilePart(part: Part): part is FilePart {
|
||||
return part.kind === 'file';
|
||||
}
|
||||
// Normalize interfaces while preserving all other fields.
|
||||
result.additionalInterfaces = extractNormalizedInterfaces(card);
|
||||
|
||||
function isStatusUpdateEvent(
|
||||
result: SendMessageResult,
|
||||
): result is TaskStatusUpdateEvent {
|
||||
return result.kind === 'status-update';
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the given state is a terminal state for a task.
|
||||
* Resolves the protocol version for a specific agent interface URL.
|
||||
* Checks the specific interface first, then falls back to the agent card's default.
|
||||
*/
|
||||
export function isTerminalState(state: TaskState | undefined): boolean {
|
||||
return (
|
||||
state === 'completed' ||
|
||||
state === 'failed' ||
|
||||
state === 'canceled' ||
|
||||
state === 'rejected'
|
||||
);
|
||||
export function getProtocolVersion(
|
||||
agentCard: unknown,
|
||||
interfaceUrl: string | undefined,
|
||||
): string | undefined {
|
||||
if (!isObject(agentCard)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const additionalInterfaces = agentCard['additionalInterfaces'];
|
||||
const interfaces = Array.isArray(additionalInterfaces)
|
||||
? (additionalInterfaces as unknown[])
|
||||
: undefined;
|
||||
|
||||
if (interfaces && interfaceUrl) {
|
||||
for (const i of interfaces) {
|
||||
if (isObject(i) && getString(i, 'url') === interfaceUrl) {
|
||||
const v = getString(i, 'protocolVersion');
|
||||
if (v) return v;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return getString(agentCard, 'protocolVersion');
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns gRPC channel credentials based on the URL scheme.
|
||||
*/
|
||||
export function getGrpcCredentials(url: string): grpc.ChannelCredentials {
|
||||
return url.startsWith('https://')
|
||||
? grpc.credentials.createSsl()
|
||||
: grpc.credentials.createInsecure();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -279,3 +331,103 @@ export function extractIdsFromResponse(result: SendMessageResult): {
|
||||
|
||||
return { contextId, taskId, clearTaskId };
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts and normalizes interfaces from the card, handling protocol version fallbacks.
|
||||
* Preserves all original fields to maintain SDK compatibility.
|
||||
*/
|
||||
function extractNormalizedInterfaces(
|
||||
card: Record<string, unknown>,
|
||||
): AgentInterface[] {
|
||||
const rawInterfaces =
|
||||
getArray(card, 'additionalInterfaces') ||
|
||||
getArray(card, 'supportedInterfaces');
|
||||
|
||||
if (!rawInterfaces) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const mapped: AgentInterface[] = [];
|
||||
for (const i of rawInterfaces) {
|
||||
if (isObject(i)) {
|
||||
// Create a copy to preserve all original fields (like protocolVersion, etc.)
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const normalized = { ...i } as unknown as VersionedInterface;
|
||||
|
||||
// Ensure 'url' exists
|
||||
if (typeof normalized.url !== 'string') {
|
||||
normalized.url = '';
|
||||
}
|
||||
|
||||
// Normalize 'transport' from 'protocolBinding'
|
||||
const transport = normalized.transport || normalized.protocolBinding;
|
||||
if (transport) {
|
||||
normalized.transport = transport;
|
||||
}
|
||||
|
||||
mapped.push(normalized);
|
||||
}
|
||||
}
|
||||
return mapped;
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely extracts a string property from an object.
|
||||
*/
|
||||
function getString(
|
||||
obj: Record<string, unknown>,
|
||||
key: string,
|
||||
): string | undefined {
|
||||
const val = obj[key];
|
||||
return typeof val === 'string' ? val : undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely extracts an array property from an object.
|
||||
*/
|
||||
function getArray(
|
||||
obj: Record<string, unknown>,
|
||||
key: string,
|
||||
): unknown[] | undefined {
|
||||
const val = obj[key];
|
||||
return Array.isArray(val) ? val : undefined;
|
||||
}
|
||||
|
||||
// Type Guards
|
||||
|
||||
function isTextPart(part: Part): part is TextPart {
|
||||
return part.kind === 'text';
|
||||
}
|
||||
|
||||
function isDataPart(part: Part): part is DataPart {
|
||||
return part.kind === 'data';
|
||||
}
|
||||
|
||||
function isFilePart(part: Part): part is FilePart {
|
||||
return part.kind === 'file';
|
||||
}
|
||||
|
||||
function isStatusUpdateEvent(
|
||||
result: SendMessageResult,
|
||||
): result is TaskStatusUpdateEvent {
|
||||
return result.kind === 'status-update';
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the given state is a terminal state for a task.
|
||||
*/
|
||||
export function isTerminalState(state: TaskState | undefined): boolean {
|
||||
return (
|
||||
state === 'completed' ||
|
||||
state === 'failed' ||
|
||||
state === 'canceled' ||
|
||||
state === 'rejected'
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Type guard to check if a value is a non-array object.
|
||||
*/
|
||||
function isObject(val: unknown): val is Record<string, unknown> {
|
||||
return typeof val === 'object' && val !== null && !Array.isArray(val);
|
||||
}
|
||||
|
||||
@@ -874,6 +874,31 @@ describe('AgentRegistry', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('should maintain registration under canonical name even if overrides are applied', async () => {
|
||||
const originalName = 'my-agent';
|
||||
const definition = { ...MOCK_AGENT_V1, name: originalName };
|
||||
|
||||
// Mock overrides in settings
|
||||
vi.spyOn(mockConfig, 'getAgentsSettings').mockReturnValue({
|
||||
overrides: {
|
||||
[originalName]: {
|
||||
enabled: true,
|
||||
modelConfig: { model: 'overridden-model' },
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await registry.testRegisterAgent(definition);
|
||||
|
||||
const registered = registry.getDefinition(originalName);
|
||||
expect(registered).toBeDefined();
|
||||
expect((registered as LocalAgentDefinition).modelConfig.model).toBe(
|
||||
'overridden-model',
|
||||
);
|
||||
// Ensure it is NOT registered under some other key
|
||||
expect(registry.getAllAgentNames()).toEqual([originalName]);
|
||||
});
|
||||
|
||||
it('should reject an agent definition missing a name', async () => {
|
||||
const invalidAgent = { ...MOCK_AGENT_V1, name: '' };
|
||||
const debugWarnSpy = vi
|
||||
|
||||
@@ -312,7 +312,7 @@ export class AgentRegistry {
|
||||
}
|
||||
|
||||
const mergedDefinition = this.applyOverrides(definition, settingsOverrides);
|
||||
this.agents.set(mergedDefinition.name, mergedDefinition);
|
||||
this.agents.set(definition.name, mergedDefinition);
|
||||
|
||||
this.registerModelConfigs(mergedDefinition);
|
||||
this.addAgentPolicy(mergedDefinition);
|
||||
|
||||
@@ -0,0 +1,157 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2025 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { sendV1MessageStream } from './v1-bridge.js';
|
||||
import { EventEmitter } from 'node:events';
|
||||
|
||||
// Global mock state to share between mock factory and tests
|
||||
const mockCall = new EventEmitter() as unknown as EventEmitter & {
|
||||
cancel: import('vitest').Mock;
|
||||
};
|
||||
mockCall.cancel = vi.fn();
|
||||
|
||||
const mockService = {
|
||||
SendStreamingMessage: vi.fn(() => mockCall),
|
||||
};
|
||||
|
||||
// Mock gRPC and Proto Loader
|
||||
vi.mock('@grpc/grpc-js', () => ({
|
||||
loadPackageDefinition: vi.fn().mockReturnValue({
|
||||
lf: {
|
||||
a2a: {
|
||||
v1: {
|
||||
A2AService: vi.fn().mockImplementation(() => mockService),
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
credentials: {
|
||||
createInsecure: vi.fn(),
|
||||
createSsl: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('@grpc/proto-loader', () => ({
|
||||
fromJSON: vi.fn().mockReturnValue({}),
|
||||
}));
|
||||
|
||||
describe('v1-bridge', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockCall.removeAllListeners();
|
||||
});
|
||||
|
||||
it('should correctly map a string query to a V1 Part.text request', async () => {
|
||||
const stream = sendV1MessageStream('http://localhost:9000', 'hello agent');
|
||||
|
||||
// Start the generator
|
||||
const it = stream[Symbol.asyncIterator]();
|
||||
const nextPromise = it.next();
|
||||
|
||||
// Verify the request sent to gRPC
|
||||
expect(mockService.SendStreamingMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
message: expect.objectContaining({
|
||||
parts: [
|
||||
expect.objectContaining({
|
||||
text: 'hello agent',
|
||||
}),
|
||||
],
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
// Cleanup
|
||||
mockCall.emit('end');
|
||||
await nextPromise;
|
||||
});
|
||||
|
||||
it('should transform a V1 Message response into an SDK Message result', async () => {
|
||||
const stream = sendV1MessageStream('http://localhost:9000', 'hi');
|
||||
const results: unknown[] = [];
|
||||
|
||||
// Simulate gRPC data arrival
|
||||
const processStream = (async () => {
|
||||
for await (const chunk of stream) {
|
||||
results.push(chunk);
|
||||
}
|
||||
})();
|
||||
|
||||
// Ensure listeners are attached before emitting
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
mockCall.emit('data', {
|
||||
message: {
|
||||
messageId: 'v1-id',
|
||||
role: 1, // USER
|
||||
parts: [{ text: 'Response from V1' }],
|
||||
},
|
||||
});
|
||||
|
||||
mockCall.emit('end');
|
||||
await processStream;
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0]).toEqual(
|
||||
expect.objectContaining({
|
||||
kind: 'message',
|
||||
messageId: 'v1-id',
|
||||
parts: [{ kind: 'text', text: 'Response from V1' }],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should transform a V1 StatusUpdate response (without message) into an SDK StatusUpdate', async () => {
|
||||
const stream = sendV1MessageStream('http://localhost:9000', 'hi');
|
||||
const results: unknown[] = [];
|
||||
|
||||
const processStream = (async () => {
|
||||
for await (const chunk of stream) {
|
||||
results.push(chunk);
|
||||
}
|
||||
})();
|
||||
|
||||
// Ensure listeners are attached
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
// V1 Structure for status update without a nested message string
|
||||
mockCall.emit('data', {
|
||||
statusUpdate: {
|
||||
status: {
|
||||
state: 3, // WORKING
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
mockCall.emit('end');
|
||||
await processStream;
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
const firstResult = results[0] as Record<string, unknown>;
|
||||
expect(firstResult['kind']).toBe('status-update');
|
||||
// Verify mapping from 3 -> 'working'
|
||||
const status = firstResult['status'] as Record<string, unknown>;
|
||||
expect(status['state']).toBe('working');
|
||||
});
|
||||
|
||||
it('should propagate gRPC stream errors', async () => {
|
||||
const stream = sendV1MessageStream('http://localhost:9000', 'hi');
|
||||
|
||||
const processStream = (async () => {
|
||||
for await (const _ of stream) {
|
||||
// empty
|
||||
}
|
||||
})();
|
||||
|
||||
// Ensure listeners are attached
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
mockCall.emit('error', new Error('gRPC internal error'));
|
||||
|
||||
await expect(processStream).rejects.toThrow('gRPC internal error');
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,268 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2025 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
/**
|
||||
* @fileoverview Direct gRPC implementation for A2A V1 protocol.
|
||||
*
|
||||
* IMPORTANT: This bridge is a TEMPORARY measure. It exists because the current version
|
||||
* of the @a2a-js/sdk (v0.3.x) does not yet support the V1 protocol (specifically the
|
||||
* 'tenant' field at Tag 1 and 'Message' at Tag 2).
|
||||
*
|
||||
* This file should be removed and replaced with standard SDK calls once the SDK
|
||||
* implements full V1 protocol support.
|
||||
*/
|
||||
|
||||
import * as grpc from '@grpc/grpc-js';
|
||||
import * as protoLoader from '@grpc/proto-loader';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import type {
|
||||
Message,
|
||||
Task,
|
||||
TaskStatusUpdateEvent,
|
||||
TaskArtifactUpdateEvent,
|
||||
} from '@a2a-js/sdk';
|
||||
import { getGrpcCredentials } from './a2aUtils.js';
|
||||
|
||||
export type SendMessageResult =
|
||||
| Message
|
||||
| Task
|
||||
| TaskStatusUpdateEvent
|
||||
| TaskArtifactUpdateEvent;
|
||||
|
||||
export interface GrpcV1Service extends grpc.Client {
|
||||
SendStreamingMessage(
|
||||
request: unknown,
|
||||
): grpc.ClientReadableStream<V1StreamResponse>;
|
||||
}
|
||||
|
||||
export interface V1Part {
|
||||
text?: string | { text: string };
|
||||
}
|
||||
|
||||
export interface V1Message {
|
||||
messageId: string;
|
||||
contextId?: string;
|
||||
taskId?: string;
|
||||
role: number | string;
|
||||
parts: V1Part[];
|
||||
}
|
||||
|
||||
export interface V1StatusUpdate {
|
||||
taskId?: string;
|
||||
status?: {
|
||||
state?: number;
|
||||
message?: Message;
|
||||
};
|
||||
}
|
||||
|
||||
export interface V1StreamResponse {
|
||||
message?: V1Message;
|
||||
statusUpdate?: V1StatusUpdate;
|
||||
}
|
||||
|
||||
const packageDefinition = protoLoader.fromJSON({
|
||||
nested: {
|
||||
lf: {
|
||||
nested: {
|
||||
a2a: {
|
||||
nested: {
|
||||
v1: {
|
||||
nested: {
|
||||
A2AService: {
|
||||
methods: {
|
||||
SendStreamingMessage: {
|
||||
requestType: 'SendMessageRequest',
|
||||
responseType: 'StreamResponse',
|
||||
responseStream: true,
|
||||
comment: '',
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
SendMessageRequest: {
|
||||
fields: {
|
||||
tenant: { type: 'string', id: 1 },
|
||||
message: { type: 'Message', id: 2 },
|
||||
},
|
||||
},
|
||||
Message: {
|
||||
fields: {
|
||||
messageId: { type: 'string', id: 1 },
|
||||
contextId: { type: 'string', id: 2 },
|
||||
taskId: { type: 'string', id: 3 },
|
||||
role: { type: 'int32', id: 4 },
|
||||
parts: { rule: 'repeated', type: 'Part', id: 5 },
|
||||
},
|
||||
},
|
||||
Part: {
|
||||
oneofs: {
|
||||
content: {
|
||||
oneof: ['text'],
|
||||
},
|
||||
},
|
||||
fields: {
|
||||
text: { type: 'string', id: 1 },
|
||||
},
|
||||
},
|
||||
StreamResponse: {
|
||||
oneofs: {
|
||||
payload: {
|
||||
oneof: [
|
||||
'task',
|
||||
'message',
|
||||
'statusUpdate',
|
||||
'artifactUpdate',
|
||||
],
|
||||
},
|
||||
},
|
||||
fields: {
|
||||
task: { type: 'Task', id: 1 },
|
||||
message: { type: 'Message', id: 2 },
|
||||
statusUpdate: {
|
||||
type: 'TaskStatusUpdateEvent',
|
||||
id: 3,
|
||||
},
|
||||
artifactUpdate: {
|
||||
type: 'TaskArtifactUpdateEvent',
|
||||
id: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
Task: {
|
||||
fields: {
|
||||
id: { type: 'string', id: 1 },
|
||||
},
|
||||
},
|
||||
TaskStatusUpdateEvent: {
|
||||
fields: {
|
||||
taskId: { type: 'string', id: 1 },
|
||||
status: { type: 'TaskStatus', id: 3 },
|
||||
},
|
||||
},
|
||||
TaskStatus: {
|
||||
fields: {
|
||||
state: { type: 'int32', id: 2 },
|
||||
},
|
||||
},
|
||||
TaskArtifactUpdateEvent: {
|
||||
fields: {
|
||||
taskId: { type: 'string', id: 1 },
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const proto = grpc.loadPackageDefinition(packageDefinition) as unknown as {
|
||||
lf: {
|
||||
a2a: {
|
||||
v1: {
|
||||
A2AService: new (
|
||||
url: string,
|
||||
creds: grpc.ChannelCredentials,
|
||||
) => GrpcV1Service;
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Direct gRPC implementation for A2A V1 agents.
|
||||
* Bypasses SDK limitations for V1 protocol specifics.
|
||||
*/
|
||||
export async function* sendV1MessageStream(
|
||||
url: string,
|
||||
message: string,
|
||||
options?: { contextId?: string; taskId?: string; signal?: AbortSignal },
|
||||
): AsyncIterable<SendMessageResult> {
|
||||
const client = new proto.lf.a2a.v1.A2AService(url, getGrpcCredentials(url));
|
||||
|
||||
const request = {
|
||||
tenant: '',
|
||||
message: {
|
||||
messageId: uuidv4(),
|
||||
contextId: options?.contextId || '',
|
||||
taskId: options?.taskId || '',
|
||||
role: 1, // USER
|
||||
parts: [{ text: message }],
|
||||
},
|
||||
};
|
||||
|
||||
const call = client.SendStreamingMessage(request);
|
||||
|
||||
const queue: SendMessageResult[] = [];
|
||||
let done = false;
|
||||
let error: Error | null = null;
|
||||
let resolveNext: (() => void) | null = null;
|
||||
|
||||
call.on('data', (data: V1StreamResponse) => {
|
||||
// Map the V1 response back to the SDK's expected format.
|
||||
const msg = data.message || data.statusUpdate?.status?.message;
|
||||
|
||||
if (msg) {
|
||||
queue.push({
|
||||
kind: 'message',
|
||||
id: msg.messageId,
|
||||
messageId: msg.messageId,
|
||||
role: 'agent',
|
||||
parts:
|
||||
msg.parts?.map((p: V1Part) => ({
|
||||
kind: 'text',
|
||||
text: typeof p.text === 'string' ? p.text : p.text?.text || '',
|
||||
})) || [],
|
||||
} as Message);
|
||||
} else if (data.statusUpdate) {
|
||||
queue.push({
|
||||
kind: 'status-update',
|
||||
taskId: data.statusUpdate.taskId || '',
|
||||
contextId: options?.contextId || '',
|
||||
final: false,
|
||||
status: {
|
||||
state:
|
||||
data.statusUpdate.status?.state === 2 ? 'completed' : 'working',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (resolveNext) resolveNext();
|
||||
});
|
||||
|
||||
call.on('error', (err: Error) => {
|
||||
error = err;
|
||||
done = true;
|
||||
if (resolveNext) resolveNext();
|
||||
});
|
||||
|
||||
call.on('end', () => {
|
||||
done = true;
|
||||
if (resolveNext) resolveNext();
|
||||
});
|
||||
|
||||
if (options?.signal) {
|
||||
options.signal.addEventListener('abort', () => {
|
||||
call.cancel();
|
||||
});
|
||||
}
|
||||
|
||||
while (!done || queue.length > 0) {
|
||||
if (queue.length === 0 && !done) {
|
||||
await new Promise<void>((r) => (resolveNext = r));
|
||||
resolveNext = null;
|
||||
}
|
||||
if (error) {
|
||||
throw error;
|
||||
}
|
||||
while (queue.length > 0) {
|
||||
yield queue.shift()!;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user