From 60b01761b8a2cf1a3a1b12548ce7499e9f5510b7 Mon Sep 17 00:00:00 2001 From: Adam Weidman Date: Thu, 12 Feb 2026 17:17:08 -0500 Subject: [PATCH] feat: add GCS-backed session and conversation persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enable session resumability across Cloud Run restarts: - executor.ts: Save conversation history in task metadata during toSDKTask(), restore via setHistory() in reconstruct() - gcs.ts: Persist conversation history as separate GCS object (conversation.tar.gz) alongside metadata and workspace - session-store.ts: Add optional GCS-backed persistence with periodic flush and restore-on-startup for thread→session mappings - handler.ts: Restore persisted sessions on initialize() - types.ts: Add gcsBucket to ChatBridgeConfig - app.ts: Pass GCS_BUCKET_NAME to chat bridge config Validated end-to-end: message persists, Cloud Run restarts, follow-up message in same thread correctly recalls prior context. Different threads maintain isolation. --- packages/a2a-server/src/agent/executor.ts | 38 +++++- .../a2a-server/src/chat-bridge/handler.ts | 5 +- .../src/chat-bridge/session-store.ts | 128 +++++++++++++++++- packages/a2a-server/src/chat-bridge/types.ts | 2 + packages/a2a-server/src/http/app.ts | 1 + packages/a2a-server/src/persistence/gcs.ts | 47 ++++++- 6 files changed, 216 insertions(+), 5 deletions(-) diff --git a/packages/a2a-server/src/agent/executor.ts b/packages/a2a-server/src/agent/executor.ts index 9cde587118..c672954f99 100644 --- a/packages/a2a-server/src/agent/executor.ts +++ b/packages/a2a-server/src/agent/executor.ts @@ -77,6 +77,24 @@ class TaskWrapper { artifacts: [], }; sdkTask.metadata!['_contextId'] = this.task.contextId; + + // Persist conversation history for session resumability. + // GCSTaskStore saves this as a separate object and restores it on load. + try { + const conversationHistory = this.task.geminiClient.getHistory(); + if (conversationHistory.length > 0) { + sdkTask.metadata!['_conversationHistory'] = conversationHistory; + logger.info( + `Task ${this.task.id}: Persisting ${conversationHistory.length} conversation history entries.`, + ); + } + } catch { + // GeminiClient may not be initialized yet + logger.warn( + `Task ${this.task.id}: Could not get conversation history for persistence.`, + ); + } + return sdkTask; } } @@ -131,7 +149,25 @@ export class CoderAgentExecutor implements AgentExecutor { agentSettings.autoExecute, ); runtimeTask.taskState = persistedState._taskState; - await runtimeTask.geminiClient.initialize(); + + // Restore conversation history if available from the TaskStore. + // This enables session resumability — the LLM gets full context of + // prior interactions rather than starting with a blank slate. + const conversationHistory = metadata['_conversationHistory']; + if (Array.isArray(conversationHistory) && conversationHistory.length > 0) { + logger.info( + `Task ${sdkTask.id}: Resuming with ${conversationHistory.length} conversation history entries.`, + ); + // History was serialized from GeminiClient.getHistory() which returns + // Content[]. After JSON round-trip it's structurally identical. + await runtimeTask.geminiClient.initialize(); + runtimeTask.geminiClient.setHistory( + + conversationHistory, + ); + } else { + await runtimeTask.geminiClient.initialize(); + } const wrapper = new TaskWrapper(runtimeTask, agentSettings); this.tasks.set(sdkTask.id, wrapper); diff --git a/packages/a2a-server/src/chat-bridge/handler.ts b/packages/a2a-server/src/chat-bridge/handler.ts index 5d518705fe..a6ffdacd65 100644 --- a/packages/a2a-server/src/chat-bridge/handler.ts +++ b/packages/a2a-server/src/chat-bridge/handler.ts @@ -25,17 +25,18 @@ export class ChatBridgeHandler { private initialized = false; constructor(private config: ChatBridgeConfig) { - this.sessionStore = new SessionStore(); + this.sessionStore = new SessionStore(config.gcsBucket); this.a2aClient = new A2ABridgeClient(config.a2aServerUrl); } /** - * Initializes the A2A client connection. + * Initializes the A2A client connection and restores persisted sessions. * Must be called before handling events. */ async initialize(): Promise { if (this.initialized) return; await this.a2aClient.initialize(); + await this.sessionStore.restore(); this.initialized = true; logger.info( `[ChatBridge] Handler initialized, connected to ${this.config.a2aServerUrl}`, diff --git a/packages/a2a-server/src/chat-bridge/session-store.ts b/packages/a2a-server/src/chat-bridge/session-store.ts index ee18382aed..ec6dae46df 100644 --- a/packages/a2a-server/src/chat-bridge/session-store.ts +++ b/packages/a2a-server/src/chat-bridge/session-store.ts @@ -8,6 +8,9 @@ * Manages mapping between Google Chat threads and A2A sessions. * Each Google Chat thread maintains a persistent contextId (conversation) * and a transient taskId (active task within that conversation). + * + * Supports optional GCS persistence so session mappings survive + * Cloud Run instance restarts. */ import { v4 as uuidv4 } from 'uuid'; @@ -36,11 +39,111 @@ export interface SessionInfo { yoloMode?: boolean; } +/** Serializable subset of SessionInfo for GCS persistence. */ +interface PersistedSession { + contextId: string; + taskId?: string; + spaceName: string; + threadName: string; + lastActivity: number; + yoloMode?: boolean; +} + /** - * In-memory session store mapping Google Chat thread names to A2A sessions. + * Session store mapping Google Chat thread names to A2A sessions. + * Optionally backed by GCS for persistence across restarts. */ export class SessionStore { private sessions = new Map(); + private gcsBucket?: string; + private gcsObjectPath = 'chat-bridge/sessions.json'; + private dirty = false; + private flushTimer?: ReturnType; + + constructor(gcsBucket?: string) { + this.gcsBucket = gcsBucket; + if (gcsBucket) { + // Flush to GCS every 30 seconds if dirty + this.flushTimer = setInterval(() => { + if (this.dirty) { + this.persistToGCS().catch((err) => + logger.warn(`[ChatBridge] GCS session flush failed:`, err), + ); + } + }, 30000); + } + } + + /** + * Restores sessions from GCS on startup. + */ + async restore(): Promise { + if (!this.gcsBucket) return; + + try { + const { Storage } = await import('@google-cloud/storage'); + const storage = new Storage(); + const file = storage.bucket(this.gcsBucket).file(this.gcsObjectPath); + const [exists] = await file.exists(); + if (!exists) { + logger.info('[ChatBridge] No persisted sessions found in GCS.'); + return; + } + + const [contents] = await file.download(); + const persisted: PersistedSession[] = JSON.parse(contents.toString()); + for (const s of persisted) { + this.sessions.set(s.threadName, { + contextId: s.contextId, + taskId: s.taskId, + spaceName: s.spaceName, + threadName: s.threadName, + lastActivity: s.lastActivity, + yoloMode: s.yoloMode, + }); + } + logger.info( + `[ChatBridge] Restored ${persisted.length} sessions from GCS.`, + ); + } catch (err) { + logger.warn(`[ChatBridge] Could not restore sessions from GCS:`, err); + } + } + + /** + * Persists current sessions to GCS. + */ + private async persistToGCS(): Promise { + if (!this.gcsBucket) return; + + try { + const { Storage } = await import('@google-cloud/storage'); + const storage = new Storage(); + const file = storage.bucket(this.gcsBucket).file(this.gcsObjectPath); + + const persisted: PersistedSession[] = []; + for (const session of this.sessions.values()) { + persisted.push({ + contextId: session.contextId, + taskId: session.taskId, + spaceName: session.spaceName, + threadName: session.threadName, + lastActivity: session.lastActivity, + yoloMode: session.yoloMode, + }); + } + + await file.save(JSON.stringify(persisted), { + contentType: 'application/json', + }); + this.dirty = false; + logger.info( + `[ChatBridge] Persisted ${persisted.length} sessions to GCS.`, + ); + } catch (err) { + logger.warn(`[ChatBridge] Failed to persist sessions to GCS:`, err); + } + } /** * Gets or creates a session for a Google Chat thread. @@ -55,6 +158,7 @@ export class SessionStore { lastActivity: Date.now(), }; this.sessions.set(threadName, session); + this.dirty = true; logger.info( `[ChatBridge] New session for thread ${threadName}: contextId=${session.contextId}`, ); @@ -77,6 +181,7 @@ export class SessionStore { const session = this.sessions.get(threadName); if (session) { session.taskId = taskId; + this.dirty = true; logger.info( `[ChatBridge] Session ${threadName}: taskId=${taskId ?? 'cleared'}`, ); @@ -88,6 +193,7 @@ export class SessionStore { */ remove(threadName: string): void { this.sessions.delete(threadName); + this.dirty = true; } /** @@ -98,8 +204,28 @@ export class SessionStore { for (const [threadName, session] of this.sessions.entries()) { if (now - session.lastActivity > maxAgeMs) { this.sessions.delete(threadName); + this.dirty = true; logger.info(`[ChatBridge] Cleaned up stale session: ${threadName}`); } } } + + /** + * Forces an immediate flush to GCS. + */ + async flush(): Promise { + if (this.dirty) { + await this.persistToGCS(); + } + } + + /** + * Stops the periodic flush timer. + */ + dispose(): void { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = undefined; + } + } } diff --git a/packages/a2a-server/src/chat-bridge/types.ts b/packages/a2a-server/src/chat-bridge/types.ts index 7ef3daa24f..b38ee90991 100644 --- a/packages/a2a-server/src/chat-bridge/types.ts +++ b/packages/a2a-server/src/chat-bridge/types.ts @@ -136,4 +136,6 @@ export interface ChatBridgeConfig { projectNumber?: string; /** Whether to enable debug logging */ debug?: boolean; + /** GCS bucket name for session persistence (optional) */ + gcsBucket?: string; } diff --git a/packages/a2a-server/src/http/app.ts b/packages/a2a-server/src/http/app.ts index 2253cb55e5..1af6da3835 100644 --- a/packages/a2a-server/src/http/app.ts +++ b/packages/a2a-server/src/http/app.ts @@ -217,6 +217,7 @@ export async function createApp() { a2aServerUrl: chatBridgeUrl, projectNumber: process.env['CHAT_PROJECT_NUMBER'], debug: process.env['CHAT_BRIDGE_DEBUG'] === 'true', + gcsBucket: process.env['GCS_BUCKET_NAME'], }); expressApp.use(chatRoutes); logger.info( diff --git a/packages/a2a-server/src/persistence/gcs.ts b/packages/a2a-server/src/persistence/gcs.ts index ec6b86e56a..2747bc054b 100644 --- a/packages/a2a-server/src/persistence/gcs.ts +++ b/packages/a2a-server/src/persistence/gcs.ts @@ -18,7 +18,7 @@ import { setTargetDir } from '../config/config.js'; import { getPersistedState, type PersistedTaskMetadata } from '../types.js'; import { v4 as uuidv4 } from 'uuid'; -type ObjectType = 'metadata' | 'workspace'; +type ObjectType = 'metadata' | 'workspace' | 'conversation'; const getTmpArchiveFilename = (taskId: string): string => `task-${taskId}-workspace-${uuidv4()}.tar.gz`; @@ -224,6 +224,28 @@ export class GCSTaskStore implements TaskStore { `Workspace directory ${workDir} not found, skipping workspace save for task ${taskId}.`, ); } + // Save conversation history if present in metadata + const rawHistory = dataToStore?.['_conversationHistory']; + const conversationHistory = Array.isArray(rawHistory) + ? rawHistory + : undefined; + if (conversationHistory && conversationHistory.length > 0) { + const conversationObjectPath = this.getObjectPath( + taskId, + 'conversation', + ); + const historyJson = JSON.stringify(conversationHistory); + const compressedHistory = gzipSync(Buffer.from(historyJson)); + const conversationFile = this.storage + .bucket(this.bucketName) + .file(conversationObjectPath); + await conversationFile.save(compressedHistory, { + contentType: 'application/gzip', + }); + logger.info( + `Task ${taskId} conversation history saved to GCS: gs://${this.bucketName}/${conversationObjectPath} (${conversationHistory.length} entries)`, + ); + } } catch (error) { logger.error(`Failed to save task ${taskId} to GCS:`, error); throw error; @@ -280,6 +302,29 @@ export class GCSTaskStore implements TaskStore { logger.info(`Task ${taskId} workspace archive not found in GCS.`); } + // Restore conversation history if available + const conversationObjectPath = this.getObjectPath(taskId, 'conversation'); + const conversationFile = this.storage + .bucket(this.bucketName) + .file(conversationObjectPath); + const [conversationExists] = await conversationFile.exists(); + if (conversationExists) { + try { + const [compressedHistory] = await conversationFile.download(); + const historyJson = gunzipSync(compressedHistory).toString(); + const conversationHistory: unknown[] = JSON.parse(historyJson); + loadedMetadata['_conversationHistory'] = conversationHistory; + logger.info( + `Task ${taskId} conversation history restored from GCS (${conversationHistory.length} entries)`, + ); + } catch (historyError) { + logger.warn( + `Task ${taskId} conversation history could not be restored:`, + historyError, + ); + } + } + return { id: taskId, contextId: loadedMetadata._contextId || uuidv4(),