chore(a2a-server): refactor a2a-server src directory (#7593)

This commit is contained in:
Sam McCauley
2025-09-03 10:24:48 -04:00
committed by GitHub
parent 1ca388019f
commit ab751618f4
17 changed files with 253 additions and 229 deletions
@@ -0,0 +1,342 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { Storage } from '@google-cloud/storage';
import * as fse from 'fs-extra';
import { promises as fsPromises, createReadStream } from 'node:fs';
import * as tar from 'tar';
import { gzipSync, gunzipSync } from 'node:zlib';
import { v4 as uuidv4 } from 'uuid';
import type { Task as SDKTask } from '@a2a-js/sdk';
import type { TaskStore } from '@a2a-js/sdk/server';
import type { Mocked, MockedClass, Mock } from 'vitest';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { GCSTaskStore, NoOpTaskStore } from './gcs.js';
import { logger } from '../utils/logger.js';
import * as configModule from '../config/config.js';
import * as metadataModule from '../metadata_types.js';
// Mock dependencies
vi.mock('@google-cloud/storage');
vi.mock('fs-extra', () => ({
pathExists: vi.fn(),
readdir: vi.fn(),
remove: vi.fn(),
ensureDir: vi.fn(),
}));
vi.mock('node:fs', async () => {
const actual = await vi.importActual<typeof import('node:fs')>('node:fs');
return {
...actual,
promises: {
...actual.promises,
readdir: vi.fn(),
},
createReadStream: vi.fn(),
};
});
vi.mock('tar');
vi.mock('zlib');
vi.mock('uuid');
vi.mock('../utils/logger.js', () => ({
logger: {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
},
}));
vi.mock('../config/config.js', () => ({
setTargetDir: vi.fn(),
}));
vi.mock('../metadata_types');
vi.mock('node:stream/promises', () => ({
pipeline: vi.fn(),
}));
const mockStorage = Storage as MockedClass<typeof Storage>;
const mockFse = fse as Mocked<typeof fse>;
const mockCreateReadStream = createReadStream as Mock;
const mockTar = tar as Mocked<typeof tar>;
const mockGzipSync = gzipSync as Mock;
const mockGunzipSync = gunzipSync as Mock;
const mockUuidv4 = uuidv4 as Mock;
const mockSetTargetDir = configModule.setTargetDir as Mock;
const mockGetPersistedState = metadataModule.getPersistedState as Mock;
const METADATA_KEY = metadataModule.METADATA_KEY || '__persistedState';
type MockWriteStream = {
on: Mock<
(event: string, cb: (error?: Error | null) => void) => MockWriteStream
>;
destroy: Mock<() => void>;
destroyed: boolean;
};
type MockFile = {
save: Mock<(data: Buffer | string) => Promise<void>>;
download: Mock<() => Promise<[Buffer]>>;
exists: Mock<() => Promise<[boolean]>>;
createWriteStream: Mock<() => MockWriteStream>;
};
type MockBucket = {
exists: Mock<() => Promise<[boolean]>>;
file: Mock<(path: string) => MockFile>;
name: string;
};
type MockStorageInstance = {
bucket: Mock<(name: string) => MockBucket>;
getBuckets: Mock<() => Promise<[Array<{ name: string }>]>>;
createBucket: Mock<(name: string) => Promise<[MockBucket]>>;
};
describe('GCSTaskStore', () => {
let bucketName: string;
let mockBucket: MockBucket;
let mockFile: MockFile;
let mockWriteStream: MockWriteStream;
let mockStorageInstance: MockStorageInstance;
beforeEach(() => {
vi.clearAllMocks();
bucketName = 'test-bucket';
mockWriteStream = {
on: vi.fn((event, cb) => {
if (event === 'finish') setTimeout(cb, 0); // Simulate async finish
return mockWriteStream;
}),
destroy: vi.fn(),
destroyed: false,
};
mockFile = {
save: vi.fn().mockResolvedValue(undefined),
download: vi.fn().mockResolvedValue([Buffer.from('')]),
exists: vi.fn().mockResolvedValue([true]),
createWriteStream: vi.fn().mockReturnValue(mockWriteStream),
};
mockBucket = {
exists: vi.fn().mockResolvedValue([true]),
file: vi.fn().mockReturnValue(mockFile),
name: bucketName,
};
mockStorageInstance = {
bucket: vi.fn().mockReturnValue(mockBucket),
getBuckets: vi.fn().mockResolvedValue([[{ name: bucketName }]]),
createBucket: vi.fn().mockResolvedValue([mockBucket]),
};
mockStorage.mockReturnValue(mockStorageInstance as unknown as Storage);
mockUuidv4.mockReturnValue('test-uuid');
mockSetTargetDir.mockReturnValue('/tmp/workdir');
mockGetPersistedState.mockReturnValue({
_agentSettings: {},
_taskState: 'submitted',
});
(fse.pathExists as Mock).mockResolvedValue(true);
(fsPromises.readdir as Mock).mockResolvedValue(['file1.txt']);
mockTar.c.mockResolvedValue(undefined);
mockTar.x.mockResolvedValue(undefined);
mockFse.remove.mockResolvedValue(undefined);
mockFse.ensureDir.mockResolvedValue(undefined);
mockGzipSync.mockReturnValue(Buffer.from('compressed'));
mockGunzipSync.mockReturnValue(Buffer.from('{}'));
mockCreateReadStream.mockReturnValue({ on: vi.fn(), pipe: vi.fn() });
});
describe('Constructor & Initialization', () => {
it('should initialize and check bucket existence', async () => {
const store = new GCSTaskStore(bucketName);
await store['ensureBucketInitialized']();
expect(mockStorage).toHaveBeenCalledTimes(1);
expect(mockStorageInstance.getBuckets).toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith(
expect.stringContaining('Bucket test-bucket exists'),
);
});
it('should create bucket if it does not exist', async () => {
mockStorageInstance.getBuckets.mockResolvedValue([[]]);
const store = new GCSTaskStore(bucketName);
await store['ensureBucketInitialized']();
expect(mockStorageInstance.createBucket).toHaveBeenCalledWith(bucketName);
expect(logger.info).toHaveBeenCalledWith(
expect.stringContaining('Bucket test-bucket created successfully'),
);
});
it('should throw if bucket creation fails', async () => {
mockStorageInstance.getBuckets.mockResolvedValue([[]]);
mockStorageInstance.createBucket.mockRejectedValue(
new Error('Create failed'),
);
const store = new GCSTaskStore(bucketName);
await expect(store['ensureBucketInitialized']()).rejects.toThrow(
'Failed to create GCS bucket test-bucket: Error: Create failed',
);
});
});
describe('save', () => {
const mockTask: SDKTask = {
id: 'task1',
contextId: 'ctx1',
kind: 'task',
status: { state: 'working' },
metadata: {},
};
it('should save metadata and workspace', async () => {
const store = new GCSTaskStore(bucketName);
await store.save(mockTask);
expect(mockFile.save).toHaveBeenCalledTimes(1);
expect(mockTar.c).toHaveBeenCalledTimes(1);
expect(mockCreateReadStream).toHaveBeenCalledTimes(1);
expect(mockFse.remove).toHaveBeenCalledTimes(1);
expect(logger.info).toHaveBeenCalledWith(
expect.stringContaining('metadata saved to GCS'),
);
expect(logger.info).toHaveBeenCalledWith(
expect.stringContaining('workspace saved to GCS'),
);
});
it('should handle tar creation failure', async () => {
mockFse.pathExists.mockImplementation(
async (path) =>
!path.toString().includes('task-task1-workspace-test-uuid.tar.gz'),
);
const store = new GCSTaskStore(bucketName);
await expect(store.save(mockTask)).rejects.toThrow(
'tar.c command failed to create',
);
});
});
describe('load', () => {
it('should load task metadata and workspace', async () => {
mockGunzipSync.mockReturnValue(
Buffer.from(
JSON.stringify({
[METADATA_KEY]: { _agentSettings: {}, _taskState: 'submitted' },
_contextId: 'ctx1',
}),
),
);
mockFile.download.mockResolvedValue([Buffer.from('compressed metadata')]);
mockFile.download.mockResolvedValueOnce([
Buffer.from('compressed metadata'),
]);
mockBucket.file = vi.fn((path) => {
const newMockFile = { ...mockFile };
if (path.includes('metadata')) {
newMockFile.download = vi
.fn()
.mockResolvedValue([Buffer.from('compressed metadata')]);
newMockFile.exists = vi.fn().mockResolvedValue([true]);
} else {
newMockFile.download = vi
.fn()
.mockResolvedValue([Buffer.from('compressed workspace')]);
newMockFile.exists = vi.fn().mockResolvedValue([true]);
}
return newMockFile;
});
const store = new GCSTaskStore(bucketName);
const task = await store.load('task1');
expect(task).toBeDefined();
expect(task?.id).toBe('task1');
expect(mockBucket.file).toHaveBeenCalledWith(
'tasks/task1/metadata.tar.gz',
);
expect(mockBucket.file).toHaveBeenCalledWith(
'tasks/task1/workspace.tar.gz',
);
expect(mockTar.x).toHaveBeenCalledTimes(1);
expect(mockFse.remove).toHaveBeenCalledTimes(1);
});
it('should return undefined if metadata not found', async () => {
mockFile.exists.mockResolvedValue([false]);
const store = new GCSTaskStore(bucketName);
const task = await store.load('task1');
expect(task).toBeUndefined();
expect(mockBucket.file).toHaveBeenCalledWith(
'tasks/task1/metadata.tar.gz',
);
});
it('should load metadata even if workspace not found', async () => {
mockGunzipSync.mockReturnValue(
Buffer.from(
JSON.stringify({
[METADATA_KEY]: { _agentSettings: {}, _taskState: 'submitted' },
_contextId: 'ctx1',
}),
),
);
mockBucket.file = vi.fn((path) => {
const newMockFile = { ...mockFile };
if (path.includes('workspace.tar.gz')) {
newMockFile.exists = vi.fn().mockResolvedValue([false]);
} else {
newMockFile.exists = vi.fn().mockResolvedValue([true]);
newMockFile.download = vi
.fn()
.mockResolvedValue([Buffer.from('compressed metadata')]);
}
return newMockFile;
});
const store = new GCSTaskStore(bucketName);
const task = await store.load('task1');
expect(task).toBeDefined();
expect(mockTar.x).not.toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith(
expect.stringContaining('workspace archive not found'),
);
});
});
});
describe('NoOpTaskStore', () => {
let realStore: TaskStore;
let noOpStore: NoOpTaskStore;
beforeEach(() => {
// Create a mock of the real store to delegate to
realStore = {
save: vi.fn(),
load: vi.fn().mockResolvedValue({ id: 'task-123' } as SDKTask),
};
noOpStore = new NoOpTaskStore(realStore);
});
it("should not call the real store's save method", async () => {
const mockTask: SDKTask = { id: 'test-task' } as SDKTask;
await noOpStore.save(mockTask);
expect(realStore.save).not.toHaveBeenCalled();
});
it('should delegate the load method to the real store', async () => {
const taskId = 'task-123';
const result = await noOpStore.load(taskId);
expect(realStore.load).toHaveBeenCalledWith(taskId);
expect(result).toBeDefined();
expect(result?.id).toBe(taskId);
});
});
+308
View File
@@ -0,0 +1,308 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { Storage } from '@google-cloud/storage';
import { gzipSync, gunzipSync } from 'node:zlib';
import * as tar from 'tar';
import * as fse from 'fs-extra';
import { promises as fsPromises, createReadStream } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { Task as SDKTask } from '@a2a-js/sdk';
import type { TaskStore } from '@a2a-js/sdk/server';
import { logger } from '../utils/logger.js';
import { setTargetDir } from '../config/config.js';
import {
getPersistedState,
type PersistedTaskMetadata,
} from '../metadata_types.js';
import { v4 as uuidv4 } from 'uuid';
type ObjectType = 'metadata' | 'workspace';
const getTmpArchiveFilename = (taskId: string): string =>
`task-${taskId}-workspace-${uuidv4()}.tar.gz`;
export class GCSTaskStore implements TaskStore {
private storage: Storage;
private bucketName: string;
private bucketInitialized: Promise<void>;
constructor(bucketName: string) {
if (!bucketName) {
throw new Error('GCS bucket name is required.');
}
this.storage = new Storage();
this.bucketName = bucketName;
logger.info(`GCSTaskStore initializing with bucket: ${this.bucketName}`);
// Prerequisites: user account or service account must have storage admin IAM role
// and the bucket name must be unique.
this.bucketInitialized = this.initializeBucket();
}
private async initializeBucket(): Promise<void> {
try {
const [buckets] = await this.storage.getBuckets();
const exists = buckets.some((bucket) => bucket.name === this.bucketName);
if (!exists) {
logger.info(
`Bucket ${this.bucketName} does not exist in the list. Attempting to create...`,
);
try {
await this.storage.createBucket(this.bucketName);
logger.info(`Bucket ${this.bucketName} created successfully.`);
} catch (createError) {
logger.info(
`Failed to create bucket ${this.bucketName}: ${createError}`,
);
throw new Error(
`Failed to create GCS bucket ${this.bucketName}: ${createError}`,
);
}
} else {
logger.info(`Bucket ${this.bucketName} exists.`);
}
} catch (error) {
logger.info(
`Error during bucket initialization for ${this.bucketName}: ${error}`,
);
throw new Error(
`Failed to initialize GCS bucket ${this.bucketName}: ${error}`,
);
}
}
private async ensureBucketInitialized(): Promise<void> {
await this.bucketInitialized;
}
private getObjectPath(taskId: string, type: ObjectType): string {
return `tasks/${taskId}/${type}.tar.gz`;
}
async save(task: SDKTask): Promise<void> {
await this.ensureBucketInitialized();
const taskId = task.id;
const persistedState = getPersistedState(
task.metadata as PersistedTaskMetadata,
);
if (!persistedState) {
throw new Error(`Task ${taskId} is missing persisted state in metadata.`);
}
const workDir = process.cwd();
const metadataObjectPath = this.getObjectPath(taskId, 'metadata');
const workspaceObjectPath = this.getObjectPath(taskId, 'workspace');
const dataToStore = task.metadata;
try {
const jsonString = JSON.stringify(dataToStore);
const compressedMetadata = gzipSync(Buffer.from(jsonString));
const metadataFile = this.storage
.bucket(this.bucketName)
.file(metadataObjectPath);
await metadataFile.save(compressedMetadata, {
contentType: 'application/gzip',
});
logger.info(
`Task ${taskId} metadata saved to GCS: gs://${this.bucketName}/${metadataObjectPath}`,
);
if (await fse.pathExists(workDir)) {
const entries = await fsPromises.readdir(workDir);
if (entries.length > 0) {
const tmpArchiveFile = join(tmpdir(), getTmpArchiveFilename(taskId));
try {
await tar.c(
{
gzip: true,
file: tmpArchiveFile,
cwd: workDir,
portable: true,
},
entries,
);
if (!(await fse.pathExists(tmpArchiveFile))) {
throw new Error(
`tar.c command failed to create ${tmpArchiveFile}`,
);
}
const workspaceFile = this.storage
.bucket(this.bucketName)
.file(workspaceObjectPath);
const sourceStream = createReadStream(tmpArchiveFile);
const destStream = workspaceFile.createWriteStream({
contentType: 'application/gzip',
resumable: true,
});
await new Promise<void>((resolve, reject) => {
sourceStream.on('error', (err) => {
logger.error(
`Error in source stream for ${tmpArchiveFile}:`,
err,
);
// Attempt to close destStream if source fails
if (!destStream.destroyed) {
destStream.destroy(err);
}
reject(err);
});
destStream.on('error', (err) => {
logger.error(
`Error in GCS dest stream for ${workspaceObjectPath}:`,
err,
);
reject(err);
});
destStream.on('finish', () => {
logger.info(
`GCS destStream finished for ${workspaceObjectPath}`,
);
resolve();
});
logger.info(
`Piping ${tmpArchiveFile} to GCS object ${workspaceObjectPath}`,
);
sourceStream.pipe(destStream);
});
logger.info(
`Task ${taskId} workspace saved to GCS: gs://${this.bucketName}/${workspaceObjectPath}`,
);
} catch (error) {
logger.error(
`Error during workspace save process for ${taskId}:`,
error,
);
throw error;
} finally {
logger.info(`Cleaning up temporary file: ${tmpArchiveFile}`);
try {
if (await fse.pathExists(tmpArchiveFile)) {
await fse.remove(tmpArchiveFile);
logger.info(
`Successfully removed temporary file: ${tmpArchiveFile}`,
);
} else {
logger.warn(
`Temporary file not found for cleanup: ${tmpArchiveFile}`,
);
}
} catch (removeError) {
logger.error(
`Error removing temporary file ${tmpArchiveFile}:`,
removeError,
);
}
}
} else {
logger.info(
`Workspace directory ${workDir} is empty, skipping workspace save for task ${taskId}.`,
);
}
} else {
logger.info(
`Workspace directory ${workDir} not found, skipping workspace save for task ${taskId}.`,
);
}
} catch (error) {
logger.error(`Failed to save task ${taskId} to GCS:`, error);
throw error;
}
}
async load(taskId: string): Promise<SDKTask | undefined> {
await this.ensureBucketInitialized();
const metadataObjectPath = this.getObjectPath(taskId, 'metadata');
const workspaceObjectPath = this.getObjectPath(taskId, 'workspace');
try {
const metadataFile = this.storage
.bucket(this.bucketName)
.file(metadataObjectPath);
const [metadataExists] = await metadataFile.exists();
if (!metadataExists) {
logger.info(`Task ${taskId} metadata not found in GCS.`);
return undefined;
}
const [compressedMetadata] = await metadataFile.download();
const jsonData = gunzipSync(compressedMetadata).toString();
const loadedMetadata = JSON.parse(jsonData);
logger.info(`Task ${taskId} metadata loaded from GCS.`);
const persistedState = getPersistedState(loadedMetadata);
if (!persistedState) {
throw new Error(
`Loaded metadata for task ${taskId} is missing internal persisted state.`,
);
}
const agentSettings = persistedState._agentSettings;
const workDir = setTargetDir(agentSettings);
await fse.ensureDir(workDir);
const workspaceFile = this.storage
.bucket(this.bucketName)
.file(workspaceObjectPath);
const [workspaceExists] = await workspaceFile.exists();
if (workspaceExists) {
const tmpArchiveFile = join(tmpdir(), getTmpArchiveFilename(taskId));
try {
await workspaceFile.download({ destination: tmpArchiveFile });
await tar.x({ file: tmpArchiveFile, cwd: workDir });
logger.info(
`Task ${taskId} workspace restored from GCS to ${workDir}`,
);
} finally {
if (await fse.pathExists(tmpArchiveFile)) {
await fse.remove(tmpArchiveFile);
}
}
} else {
logger.info(`Task ${taskId} workspace archive not found in GCS.`);
}
return {
id: taskId,
contextId: loadedMetadata._contextId || uuidv4(),
kind: 'task',
status: {
state: persistedState._taskState,
timestamp: new Date().toISOString(),
},
metadata: loadedMetadata,
history: [],
artifacts: [],
};
} catch (error) {
logger.error(`Failed to load task ${taskId} from GCS:`, error);
throw error;
}
}
}
export class NoOpTaskStore implements TaskStore {
constructor(private realStore: TaskStore) {}
async save(task: SDKTask): Promise<void> {
logger.info(`[NoOpTaskStore] save called for task ${task.id} - IGNORED`);
return Promise.resolve();
}
async load(taskId: string): Promise<SDKTask | undefined> {
logger.info(
`[NoOpTaskStore] load called for task ${taskId}, delegating to real store.`,
);
return this.realStore.load(taskId);
}
}