feat: add GCS-backed session and conversation persistence

Enable session resumability across Cloud Run restarts:

- executor.ts: Save conversation history in task metadata during
  toSDKTask(), restore via setHistory() in reconstruct()
- gcs.ts: Persist conversation history as separate GCS object
  (conversation.tar.gz) alongside metadata and workspace
- session-store.ts: Add optional GCS-backed persistence with periodic
  flush and restore-on-startup for thread→session mappings
- handler.ts: Restore persisted sessions on initialize()
- types.ts: Add gcsBucket to ChatBridgeConfig
- app.ts: Pass GCS_BUCKET_NAME to chat bridge config

Validated end-to-end: message persists, Cloud Run restarts, follow-up
message in same thread correctly recalls prior context. Different
threads maintain isolation.
This commit is contained in:
Adam Weidman
2026-02-12 17:17:08 -05:00
parent df81bfe1f2
commit 60b01761b8
6 changed files with 216 additions and 5 deletions

View File

@@ -77,6 +77,24 @@ class TaskWrapper {
artifacts: [],
};
sdkTask.metadata!['_contextId'] = this.task.contextId;
// Persist conversation history for session resumability.
// GCSTaskStore saves this as a separate object and restores it on load.
try {
const conversationHistory = this.task.geminiClient.getHistory();
if (conversationHistory.length > 0) {
sdkTask.metadata!['_conversationHistory'] = conversationHistory;
logger.info(
`Task ${this.task.id}: Persisting ${conversationHistory.length} conversation history entries.`,
);
}
} catch {
// GeminiClient may not be initialized yet
logger.warn(
`Task ${this.task.id}: Could not get conversation history for persistence.`,
);
}
return sdkTask;
}
}
@@ -131,7 +149,25 @@ export class CoderAgentExecutor implements AgentExecutor {
agentSettings.autoExecute,
);
runtimeTask.taskState = persistedState._taskState;
await runtimeTask.geminiClient.initialize();
// Restore conversation history if available from the TaskStore.
// This enables session resumability — the LLM gets full context of
// prior interactions rather than starting with a blank slate.
const conversationHistory = metadata['_conversationHistory'];
if (Array.isArray(conversationHistory) && conversationHistory.length > 0) {
logger.info(
`Task ${sdkTask.id}: Resuming with ${conversationHistory.length} conversation history entries.`,
);
// History was serialized from GeminiClient.getHistory() which returns
// Content[]. After JSON round-trip it's structurally identical.
await runtimeTask.geminiClient.initialize();
runtimeTask.geminiClient.setHistory(
conversationHistory,
);
} else {
await runtimeTask.geminiClient.initialize();
}
const wrapper = new TaskWrapper(runtimeTask, agentSettings);
this.tasks.set(sdkTask.id, wrapper);

View File

@@ -25,17 +25,18 @@ export class ChatBridgeHandler {
private initialized = false;
constructor(private config: ChatBridgeConfig) {
this.sessionStore = new SessionStore();
this.sessionStore = new SessionStore(config.gcsBucket);
this.a2aClient = new A2ABridgeClient(config.a2aServerUrl);
}
/**
* Initializes the A2A client connection.
* Initializes the A2A client connection and restores persisted sessions.
* Must be called before handling events.
*/
async initialize(): Promise<void> {
if (this.initialized) return;
await this.a2aClient.initialize();
await this.sessionStore.restore();
this.initialized = true;
logger.info(
`[ChatBridge] Handler initialized, connected to ${this.config.a2aServerUrl}`,

View File

@@ -8,6 +8,9 @@
* Manages mapping between Google Chat threads and A2A sessions.
* Each Google Chat thread maintains a persistent contextId (conversation)
* and a transient taskId (active task within that conversation).
*
* Supports optional GCS persistence so session mappings survive
* Cloud Run instance restarts.
*/
import { v4 as uuidv4 } from 'uuid';
@@ -36,11 +39,111 @@ export interface SessionInfo {
yoloMode?: boolean;
}
/** Serializable subset of SessionInfo for GCS persistence. */
interface PersistedSession {
contextId: string;
taskId?: string;
spaceName: string;
threadName: string;
lastActivity: number;
yoloMode?: boolean;
}
/**
* In-memory session store mapping Google Chat thread names to A2A sessions.
* Session store mapping Google Chat thread names to A2A sessions.
* Optionally backed by GCS for persistence across restarts.
*/
export class SessionStore {
private sessions = new Map<string, SessionInfo>();
private gcsBucket?: string;
private gcsObjectPath = 'chat-bridge/sessions.json';
private dirty = false;
private flushTimer?: ReturnType<typeof setInterval>;
constructor(gcsBucket?: string) {
this.gcsBucket = gcsBucket;
if (gcsBucket) {
// Flush to GCS every 30 seconds if dirty
this.flushTimer = setInterval(() => {
if (this.dirty) {
this.persistToGCS().catch((err) =>
logger.warn(`[ChatBridge] GCS session flush failed:`, err),
);
}
}, 30000);
}
}
/**
* Restores sessions from GCS on startup.
*/
async restore(): Promise<void> {
if (!this.gcsBucket) return;
try {
const { Storage } = await import('@google-cloud/storage');
const storage = new Storage();
const file = storage.bucket(this.gcsBucket).file(this.gcsObjectPath);
const [exists] = await file.exists();
if (!exists) {
logger.info('[ChatBridge] No persisted sessions found in GCS.');
return;
}
const [contents] = await file.download();
const persisted: PersistedSession[] = JSON.parse(contents.toString());
for (const s of persisted) {
this.sessions.set(s.threadName, {
contextId: s.contextId,
taskId: s.taskId,
spaceName: s.spaceName,
threadName: s.threadName,
lastActivity: s.lastActivity,
yoloMode: s.yoloMode,
});
}
logger.info(
`[ChatBridge] Restored ${persisted.length} sessions from GCS.`,
);
} catch (err) {
logger.warn(`[ChatBridge] Could not restore sessions from GCS:`, err);
}
}
/**
* Persists current sessions to GCS.
*/
private async persistToGCS(): Promise<void> {
if (!this.gcsBucket) return;
try {
const { Storage } = await import('@google-cloud/storage');
const storage = new Storage();
const file = storage.bucket(this.gcsBucket).file(this.gcsObjectPath);
const persisted: PersistedSession[] = [];
for (const session of this.sessions.values()) {
persisted.push({
contextId: session.contextId,
taskId: session.taskId,
spaceName: session.spaceName,
threadName: session.threadName,
lastActivity: session.lastActivity,
yoloMode: session.yoloMode,
});
}
await file.save(JSON.stringify(persisted), {
contentType: 'application/json',
});
this.dirty = false;
logger.info(
`[ChatBridge] Persisted ${persisted.length} sessions to GCS.`,
);
} catch (err) {
logger.warn(`[ChatBridge] Failed to persist sessions to GCS:`, err);
}
}
/**
* Gets or creates a session for a Google Chat thread.
@@ -55,6 +158,7 @@ export class SessionStore {
lastActivity: Date.now(),
};
this.sessions.set(threadName, session);
this.dirty = true;
logger.info(
`[ChatBridge] New session for thread ${threadName}: contextId=${session.contextId}`,
);
@@ -77,6 +181,7 @@ export class SessionStore {
const session = this.sessions.get(threadName);
if (session) {
session.taskId = taskId;
this.dirty = true;
logger.info(
`[ChatBridge] Session ${threadName}: taskId=${taskId ?? 'cleared'}`,
);
@@ -88,6 +193,7 @@ export class SessionStore {
*/
remove(threadName: string): void {
this.sessions.delete(threadName);
this.dirty = true;
}
/**
@@ -98,8 +204,28 @@ export class SessionStore {
for (const [threadName, session] of this.sessions.entries()) {
if (now - session.lastActivity > maxAgeMs) {
this.sessions.delete(threadName);
this.dirty = true;
logger.info(`[ChatBridge] Cleaned up stale session: ${threadName}`);
}
}
}
/**
* Forces an immediate flush to GCS.
*/
async flush(): Promise<void> {
if (this.dirty) {
await this.persistToGCS();
}
}
/**
* Stops the periodic flush timer.
*/
dispose(): void {
if (this.flushTimer) {
clearInterval(this.flushTimer);
this.flushTimer = undefined;
}
}
}

View File

@@ -136,4 +136,6 @@ export interface ChatBridgeConfig {
projectNumber?: string;
/** Whether to enable debug logging */
debug?: boolean;
/** GCS bucket name for session persistence (optional) */
gcsBucket?: string;
}

View File

@@ -217,6 +217,7 @@ export async function createApp() {
a2aServerUrl: chatBridgeUrl,
projectNumber: process.env['CHAT_PROJECT_NUMBER'],
debug: process.env['CHAT_BRIDGE_DEBUG'] === 'true',
gcsBucket: process.env['GCS_BUCKET_NAME'],
});
expressApp.use(chatRoutes);
logger.info(

View File

@@ -18,7 +18,7 @@ import { setTargetDir } from '../config/config.js';
import { getPersistedState, type PersistedTaskMetadata } from '../types.js';
import { v4 as uuidv4 } from 'uuid';
type ObjectType = 'metadata' | 'workspace';
type ObjectType = 'metadata' | 'workspace' | 'conversation';
const getTmpArchiveFilename = (taskId: string): string =>
`task-${taskId}-workspace-${uuidv4()}.tar.gz`;
@@ -224,6 +224,28 @@ export class GCSTaskStore implements TaskStore {
`Workspace directory ${workDir} not found, skipping workspace save for task ${taskId}.`,
);
}
// Save conversation history if present in metadata
const rawHistory = dataToStore?.['_conversationHistory'];
const conversationHistory = Array.isArray(rawHistory)
? rawHistory
: undefined;
if (conversationHistory && conversationHistory.length > 0) {
const conversationObjectPath = this.getObjectPath(
taskId,
'conversation',
);
const historyJson = JSON.stringify(conversationHistory);
const compressedHistory = gzipSync(Buffer.from(historyJson));
const conversationFile = this.storage
.bucket(this.bucketName)
.file(conversationObjectPath);
await conversationFile.save(compressedHistory, {
contentType: 'application/gzip',
});
logger.info(
`Task ${taskId} conversation history saved to GCS: gs://${this.bucketName}/${conversationObjectPath} (${conversationHistory.length} entries)`,
);
}
} catch (error) {
logger.error(`Failed to save task ${taskId} to GCS:`, error);
throw error;
@@ -280,6 +302,29 @@ export class GCSTaskStore implements TaskStore {
logger.info(`Task ${taskId} workspace archive not found in GCS.`);
}
// Restore conversation history if available
const conversationObjectPath = this.getObjectPath(taskId, 'conversation');
const conversationFile = this.storage
.bucket(this.bucketName)
.file(conversationObjectPath);
const [conversationExists] = await conversationFile.exists();
if (conversationExists) {
try {
const [compressedHistory] = await conversationFile.download();
const historyJson = gunzipSync(compressedHistory).toString();
const conversationHistory: unknown[] = JSON.parse(historyJson);
loadedMetadata['_conversationHistory'] = conversationHistory;
logger.info(
`Task ${taskId} conversation history restored from GCS (${conversationHistory.length} entries)`,
);
} catch (historyError) {
logger.warn(
`Task ${taskId} conversation history could not be restored:`,
historyError,
);
}
}
return {
id: taskId,
contextId: loadedMetadata._contextId || uuidv4(),