From f74491358445365fb545ccb28d74e222c626f614 Mon Sep 17 00:00:00 2001 From: Spencer Date: Thu, 9 Apr 2026 17:13:55 -0400 Subject: [PATCH] feat(core): migrate chat recording to JSONL streaming (#23749) --- .../src/ui/hooks/useSessionBrowser.test.ts | 15 +- .../cli/src/ui/hooks/useSessionBrowser.ts | 13 +- packages/cli/src/utils/sessionUtils.ts | 37 +- .../core/src/agents/local-executor.test.ts | 2 + packages/core/src/agents/local-executor.ts | 5 +- packages/core/src/config/storage.ts | 4 +- packages/core/src/core/client.test.ts | 4 + packages/core/src/core/client.ts | 4 +- packages/core/src/core/geminiChat.test.ts | 28 +- packages/core/src/core/geminiChat.ts | 9 +- .../src/services/chatRecordingService.test.ts | 360 ++++--- .../core/src/services/chatRecordingService.ts | 954 +++++++++--------- .../core/src/services/chatRecordingTypes.ts | 124 +++ packages/core/src/utils/sessionOperations.ts | 7 +- packages/sdk/src/agent.ts | 5 +- 15 files changed, 906 insertions(+), 665 deletions(-) create mode 100644 packages/core/src/services/chatRecordingTypes.ts diff --git a/packages/cli/src/ui/hooks/useSessionBrowser.test.ts b/packages/cli/src/ui/hooks/useSessionBrowser.test.ts index 6ef39b7a5d..cb4e3bd17d 100644 --- a/packages/cli/src/ui/hooks/useSessionBrowser.test.ts +++ b/packages/cli/src/ui/hooks/useSessionBrowser.test.ts @@ -11,7 +11,6 @@ import { useSessionBrowser, convertSessionToHistoryFormats, } from './useSessionBrowser.js'; -import * as fs from 'node:fs/promises'; import path from 'node:path'; import { getSessionFiles, type SessionInfo } from '../../utils/sessionUtils.js'; import { @@ -19,6 +18,7 @@ import { type ConversationRecord, type MessageRecord, CoreToolCallStatus, + loadConversationRecord, } from '@google/gemini-cli-core'; import { coreEvents, @@ -46,6 +46,7 @@ vi.mock('@google/gemini-cli-core', async (importOriginal) => { clear: vi.fn(), hydrate: vi.fn(), }, + loadConversationRecord: vi.fn(), }; }); @@ -55,7 +56,6 @@ const MOCKED_SESSION_ID = 'test-session-123'; const MOCKED_CURRENT_SESSION_ID = 'current-session-id'; describe('useSessionBrowser', () => { - const mockedFs = vi.mocked(fs); const mockedPath = vi.mocked(path); const mockedGetSessionFiles = vi.mocked(getSessionFiles); @@ -98,7 +98,7 @@ describe('useSessionBrowser', () => { fileName: MOCKED_FILENAME, } as SessionInfo; mockedGetSessionFiles.mockResolvedValue([mockSession]); - mockedFs.readFile.mockResolvedValue(JSON.stringify(mockConversation)); + vi.mocked(loadConversationRecord).mockResolvedValue(mockConversation); const { result } = await renderHook(() => useSessionBrowser(mockConfig, mockOnLoadHistory), @@ -107,9 +107,8 @@ describe('useSessionBrowser', () => { await act(async () => { await result.current.handleResumeSession(mockSession); }); - expect(mockedFs.readFile).toHaveBeenCalledWith( + expect(loadConversationRecord).toHaveBeenCalledWith( `${MOCKED_CHATS_DIR}/${MOCKED_FILENAME}`, - 'utf8', ); expect(mockConfig.setSessionId).toHaveBeenCalledWith( 'existing-session-456', @@ -125,7 +124,9 @@ describe('useSessionBrowser', () => { id: MOCKED_SESSION_ID, fileName: MOCKED_FILENAME, } as SessionInfo; - mockedFs.readFile.mockRejectedValue(new Error('File not found')); + vi.mocked(loadConversationRecord).mockRejectedValue( + new Error('File not found'), + ); const { result } = await renderHook(() => useSessionBrowser(mockConfig, mockOnLoadHistory), @@ -149,7 +150,7 @@ describe('useSessionBrowser', () => { id: MOCKED_SESSION_ID, fileName: MOCKED_FILENAME, } as SessionInfo; - mockedFs.readFile.mockResolvedValue('invalid json'); + vi.mocked(loadConversationRecord).mockResolvedValue(null); const { result } = await renderHook(() => useSessionBrowser(mockConfig, mockOnLoadHistory), diff --git a/packages/cli/src/ui/hooks/useSessionBrowser.ts b/packages/cli/src/ui/hooks/useSessionBrowser.ts index 4e86c2d92e..b42e1c5a72 100644 --- a/packages/cli/src/ui/hooks/useSessionBrowser.ts +++ b/packages/cli/src/ui/hooks/useSessionBrowser.ts @@ -6,14 +6,13 @@ import { useState, useCallback } from 'react'; import type { HistoryItemWithoutId } from '../types.js'; -import * as fs from 'node:fs/promises'; import path from 'node:path'; import { coreEvents, convertSessionToClientHistory, uiTelemetryService, + loadConversationRecord, type Config, - type ConversationRecord, type ResumedSessionData, } from '@google/gemini-cli-core'; import { @@ -61,10 +60,12 @@ export const useSessionBrowser = ( const originalFilePath = path.join(chatsDir, fileName); // Load up the conversation. - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - const conversation: ConversationRecord = JSON.parse( - await fs.readFile(originalFilePath, 'utf8'), - ); + const conversation = await loadConversationRecord(originalFilePath); + if (!conversation) { + throw new Error( + `Failed to parse conversation from ${originalFilePath}`, + ); + } // Use the old session's ID to continue it. const existingSessionId = conversation.sessionId; diff --git a/packages/cli/src/utils/sessionUtils.ts b/packages/cli/src/utils/sessionUtils.ts index 6f72b20381..647ed77727 100644 --- a/packages/cli/src/utils/sessionUtils.ts +++ b/packages/cli/src/utils/sessionUtils.ts @@ -12,6 +12,7 @@ import { type Storage, type ConversationRecord, type MessageRecord, + loadConversationRecord, } from '@google/gemini-cli-core'; import * as fs from 'node:fs/promises'; import path from 'node:path'; @@ -250,23 +251,27 @@ export const getAllSessionFiles = async ( try { const files = await fs.readdir(chatsDir); const sessionFiles = files - .filter((f) => f.startsWith(SESSION_FILE_PREFIX) && f.endsWith('.json')) + .filter( + (f) => + f.startsWith(SESSION_FILE_PREFIX) && + (f.endsWith('.json') || f.endsWith('.jsonl')), + ) .sort(); // Sort by filename, which includes timestamp const sessionPromises = sessionFiles.map( async (file): Promise => { const filePath = path.join(chatsDir, file); try { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - const content: ConversationRecord = JSON.parse( - await fs.readFile(filePath, 'utf8'), - ); + const content = await loadConversationRecord(filePath, { + metadataOnly: !options.includeFullContent, + }); + if (!content) { + return { fileName: file, sessionInfo: null }; + } // Validate required fields if ( !content.sessionId || - !content.messages || - !Array.isArray(content.messages) || !content.startTime || !content.lastUpdated ) { @@ -275,7 +280,7 @@ export const getAllSessionFiles = async ( } // Skip sessions that only contain system messages (info, error, warning) - if (!hasUserOrAssistantMessage(content.messages)) { + if (!content.hasUserOrAssistantMessage) { return { fileName: file, sessionInfo: null }; } @@ -285,7 +290,9 @@ export const getAllSessionFiles = async ( return { fileName: file, sessionInfo: null }; } - const firstUserMessage = extractFirstUserMessage(content.messages); + const firstUserMessage = content.firstUserMessage + ? cleanMessage(content.firstUserMessage) + : extractFirstUserMessage(content.messages); const isCurrentSession = currentSessionId ? file.includes(currentSessionId.slice(0, 8)) : false; @@ -310,11 +317,11 @@ export const getAllSessionFiles = async ( const sessionInfo: SessionInfo = { id: content.sessionId, - file: file.replace('.json', ''), + file: file.replace(/\.jsonl?$/, ''), fileName: file, startTime: content.startTime, lastUpdated: content.lastUpdated, - messageCount: content.messages.length, + messageCount: content.messageCount ?? content.messages.length, displayName: content.summary ? stripUnsafeCharacters(content.summary) : firstUserMessage, @@ -505,10 +512,10 @@ export class SessionSelector { const sessionPath = path.join(chatsDir, sessionInfo.fileName); try { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - const sessionData: ConversationRecord = JSON.parse( - await fs.readFile(sessionPath, 'utf8'), - ); + const sessionData = await loadConversationRecord(sessionPath); + if (!sessionData) { + throw new Error('Failed to load session data'); + } const displayInfo = `Session ${sessionInfo.index}: ${sessionInfo.firstUserMessage} (${sessionInfo.messageCount} messages, ${formatRelativeTime(sessionInfo.lastUpdated)})`; diff --git a/packages/core/src/agents/local-executor.test.ts b/packages/core/src/agents/local-executor.test.ts index c824344cfe..26f0cc88e3 100644 --- a/packages/core/src/agents/local-executor.test.ts +++ b/packages/core/src/agents/local-executor.test.ts @@ -141,6 +141,7 @@ vi.mock('../core/geminiChat.js', () => ({ CHUNK: 'chunk', }, GeminiChat: vi.fn().mockImplementation(() => ({ + initialize: vi.fn(), sendMessageStream: mockSendMessageStream, getHistory: vi.fn((_curated?: boolean) => [...mockChatHistory]), setHistory: mockSetHistory, @@ -434,6 +435,7 @@ describe('LocalAgentExecutor', () => { MockedGeminiChat.mockImplementation( () => ({ + initialize: vi.fn(), sendMessageStream: mockSendMessageStream, setSystemInstruction: mockSetSystemInstruction, getHistory: vi.fn((_curated?: boolean) => [...mockChatHistory]), diff --git a/packages/core/src/agents/local-executor.ts b/packages/core/src/agents/local-executor.ts index f257018b1b..cfae92c870 100644 --- a/packages/core/src/agents/local-executor.ts +++ b/packages/core/src/agents/local-executor.ts @@ -1021,15 +1021,16 @@ export class LocalAgentExecutor { : undefined; try { - return new GeminiChat( + const chat = new GeminiChat( this.executionContext, systemInstruction, [{ functionDeclarations: tools }], startHistory, undefined, undefined, - 'subagent', ); + await chat.initialize(undefined, 'subagent'); + return chat; } catch (e: unknown) { await reportError( e, diff --git a/packages/core/src/config/storage.ts b/packages/core/src/config/storage.ts index 7d476f8135..d49e027369 100644 --- a/packages/core/src/config/storage.ts +++ b/packages/core/src/config/storage.ts @@ -353,7 +353,9 @@ export class Storage { const chatsDir = path.join(this.getProjectTempDir(), 'chats'); try { const files = await fs.promises.readdir(chatsDir); - const jsonFiles = files.filter((f) => f.endsWith('.json')); + const jsonFiles = files.filter( + (f) => f.endsWith('.json') || f.endsWith('.jsonl'), + ); const sessions = await Promise.all( jsonFiles.map(async (file) => { diff --git a/packages/core/src/core/client.test.ts b/packages/core/src/core/client.test.ts index f8178488bd..e28ea9cfa4 100644 --- a/packages/core/src/core/client.test.ts +++ b/packages/core/src/core/client.test.ts @@ -63,6 +63,10 @@ vi.mock('node:fs', () => { writeFileSync: vi.fn((path: string, data: string) => { mockFileSystem.set(path, data); }), + appendFileSync: vi.fn((path: string, data: string) => { + const current = mockFileSystem.get(path) || ''; + mockFileSystem.set(path, current + data); + }), readFileSync: vi.fn((path: string) => { if (mockFileSystem.has(path)) { return mockFileSystem.get(path); diff --git a/packages/core/src/core/client.ts b/packages/core/src/core/client.ts index 491758049d..25509862fb 100644 --- a/packages/core/src/core/client.ts +++ b/packages/core/src/core/client.ts @@ -378,7 +378,7 @@ export class GeminiClient { try { const systemMemory = this.config.getSystemInstructionMemory(); const systemInstruction = getCoreSystemPrompt(this.config, systemMemory); - return new GeminiChat( + const chat = new GeminiChat( this.config, systemInstruction, tools, @@ -392,6 +392,8 @@ export class GeminiClient { return [{ functionDeclarations: toolDeclarations }]; }, ); + await chat.initialize(resumedSessionData, 'main'); + return chat; } catch (error) { await reportError( error, diff --git a/packages/core/src/core/geminiChat.test.ts b/packages/core/src/core/geminiChat.test.ts index e822fd7fd6..d4a3f40aad 100644 --- a/packages/core/src/core/geminiChat.test.ts +++ b/packages/core/src/core/geminiChat.test.ts @@ -48,6 +48,10 @@ vi.mock('node:fs', () => { writeFileSync: vi.fn((path: string, data: string) => { mockFileSystem.set(path, data); }), + appendFileSync: vi.fn((path: string, data: string) => { + const current = mockFileSystem.get(path) || ''; + mockFileSystem.set(path, current + data); + }), readFileSync: vi.fn((path: string) => { if (mockFileSystem.has(path)) { return mockFileSystem.get(path); @@ -1082,8 +1086,10 @@ describe('GeminiChat', () => { ); const { default: fs } = await import('node:fs'); - const writeFileSync = vi.mocked(fs.writeFileSync); - const writeCountBefore = writeFileSync.mock.calls.length; + const appendFileSync = vi.mocked(fs.appendFileSync); + const writeCountBefore = appendFileSync.mock.calls.length; + + await chat.initialize(); const stream = await chat.sendMessageStream( { model: 'test-model' }, @@ -1096,17 +1102,19 @@ describe('GeminiChat', () => { // consume } - const newWrites = writeFileSync.mock.calls.slice(writeCountBefore); + const newWrites = appendFileSync.mock.calls.slice(writeCountBefore); expect(newWrites.length).toBeGreaterThan(0); - const lastWriteData = JSON.parse( - newWrites[newWrites.length - 1][1] as string, - ) as { messages: Array<{ type: string }> }; + const geminiWrite = newWrites.find((w) => { + try { + const data = JSON.parse(w[1] as string); + return data.type === 'gemini'; + } catch { + return false; + } + }); - const geminiMessages = lastWriteData.messages.filter( - (m) => m.type === 'gemini', - ); - expect(geminiMessages.length).toBeGreaterThan(0); + expect(geminiWrite).toBeDefined(); }); }); diff --git a/packages/core/src/core/geminiChat.ts b/packages/core/src/core/geminiChat.ts index b0efc9e1e4..f5ee37e565 100644 --- a/packages/core/src/core/geminiChat.ts +++ b/packages/core/src/core/geminiChat.ts @@ -256,16 +256,21 @@ export class GeminiChat { private history: Content[] = [], resumedSessionData?: ResumedSessionData, private readonly onModelChanged?: (modelId: string) => Promise, - kind: 'main' | 'subagent' = 'main', ) { validateHistory(history); this.chatRecordingService = new ChatRecordingService(context); - this.chatRecordingService.initialize(resumedSessionData, kind); this.lastPromptTokenCount = estimateTokenCountSync( this.history.flatMap((c) => c.parts || []), ); } + async initialize( + resumedSessionData?: ResumedSessionData, + kind: 'main' | 'subagent' = 'main', + ) { + await this.chatRecordingService.initialize(resumedSessionData, kind); + } + setSystemInstruction(sysInstr: string) { this.systemInstruction = sysInstr; } diff --git a/packages/core/src/services/chatRecordingService.test.ts b/packages/core/src/services/chatRecordingService.test.ts index d542b8c7cb..22ba6c2c03 100644 --- a/packages/core/src/services/chatRecordingService.test.ts +++ b/packages/core/src/services/chatRecordingService.test.ts @@ -5,11 +5,42 @@ */ import { expect, it, describe, vi, beforeEach, afterEach } from 'vitest'; -import fs from 'node:fs'; +import * as fs from 'node:fs'; import path from 'node:path'; import os from 'node:os'; + +vi.mock('node:fs', async (importOriginal) => { + const actual = await importOriginal(); + const fsModule = { + ...actual, + mkdirSync: vi.fn(actual.mkdirSync), + appendFileSync: vi.fn(actual.appendFileSync), + writeFileSync: vi.fn(actual.writeFileSync), + readFileSync: vi.fn(actual.readFileSync), + unlinkSync: vi.fn(actual.unlinkSync), + existsSync: vi.fn(actual.existsSync), + readdirSync: vi.fn(actual.readdirSync), + promises: { + ...actual.promises, + stat: vi.fn(actual.promises.stat), + readFile: vi.fn(actual.promises.readFile), + unlink: vi.fn(actual.promises.unlink), + readdir: vi.fn(actual.promises.readdir), + open: vi.fn(actual.promises.open), + rm: vi.fn(actual.promises.rm), + mkdir: vi.fn(actual.promises.mkdir), + writeFile: vi.fn(actual.promises.writeFile), + }, + }; + return { + ...fsModule, + default: fsModule, + }; +}); + import { ChatRecordingService, + loadConversationRecord, type ConversationRecord, type ToolCallRecord, type MessageRecord, @@ -21,9 +52,11 @@ import type { Config } from '../config/config.js'; import { getProjectHash } from '../utils/paths.js'; vi.mock('../utils/paths.js'); -vi.mock('node:crypto', () => { +vi.mock('node:crypto', async (importOriginal) => { + const actual = await importOriginal(); let count = 0; return { + ...actual, randomUUID: vi.fn(() => `test-uuid-${count++}`), createHash: vi.fn(() => ({ update: vi.fn(() => ({ @@ -38,6 +71,9 @@ describe('ChatRecordingService', () => { let mockConfig: Config; let testTempDir: string; + afterEach(() => { + vi.restoreAllMocks(); + }); beforeEach(async () => { testTempDir = await fs.promises.mkdtemp( path.join(os.tmpdir(), 'chat-recording-test-'), @@ -89,8 +125,8 @@ describe('ChatRecordingService', () => { }); describe('initialize', () => { - it('should create a new session if none is provided', () => { - chatRecordingService.initialize(); + it('should create a new session if none is provided', async () => { + await chatRecordingService.initialize(); chatRecordingService.recordMessage({ type: 'user', content: 'ping', @@ -101,11 +137,11 @@ describe('ChatRecordingService', () => { expect(fs.existsSync(chatsDir)).toBe(true); const files = fs.readdirSync(chatsDir); expect(files.length).toBeGreaterThan(0); - expect(files[0]).toMatch(/^session-.*-test-ses\.json$/); + expect(files[0]).toMatch(/^session-.*-test-ses\.jsonl$/); }); - it('should include the conversation kind when specified', () => { - chatRecordingService.initialize(undefined, 'subagent'); + it('should include the conversation kind when specified', async () => { + await chatRecordingService.initialize(undefined, 'subagent'); chatRecordingService.recordMessage({ type: 'user', content: 'ping', @@ -113,13 +149,13 @@ describe('ChatRecordingService', () => { }); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.kind).toBe('subagent'); }); - it('should create a subdirectory for subagents if parentSessionId is present', () => { + it('should create a subdirectory for subagents if parentSessionId is present', async () => { const parentSessionId = 'test-parent-uuid'; Object.defineProperty(mockConfig, 'parentSessionId', { value: parentSessionId, @@ -127,7 +163,7 @@ describe('ChatRecordingService', () => { configurable: true, }); - chatRecordingService.initialize(undefined, 'subagent'); + await chatRecordingService.initialize(undefined, 'subagent'); chatRecordingService.recordMessage({ type: 'user', content: 'ping', @@ -140,19 +176,19 @@ describe('ChatRecordingService', () => { const files = fs.readdirSync(subagentDir); expect(files.length).toBeGreaterThan(0); - expect(files[0]).toBe('test-session-id.json'); + expect(files[0]).toBe('test-session-id.jsonl'); }); - it('should inherit workspace directories for subagents during initialization', () => { + it('should inherit workspace directories for subagents during initialization', async () => { const mockDirectories = ['/project/dir1', '/project/dir2']; vi.mocked(mockConfig.getWorkspaceContext).mockReturnValue({ getDirectories: vi.fn().mockReturnValue(mockDirectories), } as unknown as WorkspaceContext); // Initialize as a subagent - chatRecordingService.initialize(undefined, 'subagent'); + await chatRecordingService.initialize(undefined, 'subagent'); - // Recording a message triggers the disk write (deferred until then) + // Recording a message triggers the disk write chatRecordingService.recordMessage({ type: 'user', content: 'ping', @@ -160,43 +196,53 @@ describe('ChatRecordingService', () => { }); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.kind).toBe('subagent'); expect(conversation.directories).toEqual(mockDirectories); }); - it('should resume from an existing session if provided', () => { + it('should resume from an existing session if provided', async () => { const chatsDir = path.join(testTempDir, 'chats'); fs.mkdirSync(chatsDir, { recursive: true }); - const sessionFile = path.join(chatsDir, 'session.json'); + const sessionFile = path.join(chatsDir, 'session.jsonl'); const initialData = { sessionId: 'old-session-id', projectHash: 'test-project-hash', messages: [], }; - fs.writeFileSync(sessionFile, JSON.stringify(initialData)); + fs.writeFileSync( + sessionFile, + JSON.stringify({ ...initialData, messages: undefined }) + + '\n' + + (initialData.messages || []) + .map((m: unknown) => JSON.stringify(m)) + .join('\n') + + '\n', + ); - chatRecordingService.initialize({ + await chatRecordingService.initialize({ filePath: sessionFile, conversation: { sessionId: 'old-session-id', } as ConversationRecord, }); - const conversation = JSON.parse(fs.readFileSync(sessionFile, 'utf8')); + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.sessionId).toBe('old-session-id'); }); }); describe('recordMessage', () => { - beforeEach(() => { - chatRecordingService.initialize(); + beforeEach(async () => { + await chatRecordingService.initialize(); }); - it('should record a new message', () => { + it('should record a new message', async () => { chatRecordingService.recordMessage({ type: 'user', content: 'Hello', @@ -205,9 +251,9 @@ describe('ChatRecordingService', () => { }); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.messages).toHaveLength(1); expect(conversation.messages[0].content).toBe('Hello'); @@ -215,7 +261,7 @@ describe('ChatRecordingService', () => { expect(conversation.messages[0].type).toBe('user'); }); - it('should create separate messages when recording multiple messages', () => { + it('should create separate messages when recording multiple messages', async () => { chatRecordingService.recordMessage({ type: 'user', content: 'World', @@ -223,17 +269,17 @@ describe('ChatRecordingService', () => { }); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.messages).toHaveLength(1); expect(conversation.messages[0].content).toBe('World'); }); }); describe('recordThought', () => { - it('should queue a thought', () => { - chatRecordingService.initialize(); + it('should queue a thought', async () => { + await chatRecordingService.initialize(); chatRecordingService.recordThought({ subject: 'Thinking', description: 'Thinking...', @@ -246,11 +292,11 @@ describe('ChatRecordingService', () => { }); describe('recordMessageTokens', () => { - beforeEach(() => { - chatRecordingService.initialize(); + beforeEach(async () => { + await chatRecordingService.initialize(); }); - it('should update the last message with token info', () => { + it('should update the last message with token info', async () => { chatRecordingService.recordMessage({ type: 'gemini', content: 'Response', @@ -265,9 +311,9 @@ describe('ChatRecordingService', () => { }); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; const geminiMsg = conversation.messages[0] as MessageRecord & { type: 'gemini'; }; @@ -281,7 +327,7 @@ describe('ChatRecordingService', () => { }); }); - it('should queue token info if the last message already has tokens', () => { + it('should queue token info if the last message already has tokens', async () => { chatRecordingService.recordMessage({ type: 'gemini', content: 'Response', @@ -313,11 +359,11 @@ describe('ChatRecordingService', () => { }); }); - it('should not write to disk when queuing tokens (no last gemini message)', () => { - const writeFileSyncSpy = vi.spyOn(fs, 'writeFileSync'); + it('should not write to disk when queuing tokens (no last gemini message)', async () => { + const appendFileSyncSpy = vi.mocked(fs.appendFileSync); // Clear spy call count after initialize writes the initial file - writeFileSyncSpy.mockClear(); + appendFileSyncSpy.mockClear(); // No gemini message recorded yet, so tokens should only be queued chatRecordingService.recordMessageTokens({ @@ -328,7 +374,7 @@ describe('ChatRecordingService', () => { }); // writeFileSync should NOT have been called since we only queued - expect(writeFileSyncSpy).not.toHaveBeenCalled(); + expect(appendFileSyncSpy).not.toHaveBeenCalled(); // @ts-expect-error private property expect(chatRecordingService.queuedTokens).toEqual({ @@ -339,11 +385,9 @@ describe('ChatRecordingService', () => { thoughts: 0, tool: 0, }); - - writeFileSyncSpy.mockRestore(); }); - it('should not write to disk when queuing tokens (last message already has tokens)', () => { + it('should not write to disk when queuing tokens (last message already has tokens)', async () => { chatRecordingService.recordMessage({ type: 'gemini', content: 'Response', @@ -358,8 +402,8 @@ describe('ChatRecordingService', () => { cachedContentTokenCount: 0, }); - const writeFileSyncSpy = vi.spyOn(fs, 'writeFileSync'); - writeFileSyncSpy.mockClear(); + const appendFileSyncSpy = vi.mocked(fs.appendFileSync); + appendFileSyncSpy.mockClear(); // Second call should only queue, NOT write to disk chatRecordingService.recordMessageTokens({ @@ -369,18 +413,17 @@ describe('ChatRecordingService', () => { cachedContentTokenCount: 0, }); - expect(writeFileSyncSpy).not.toHaveBeenCalled(); - writeFileSyncSpy.mockRestore(); + expect(appendFileSyncSpy).not.toHaveBeenCalled(); }); - it('should use in-memory cache and not re-read from disk on subsequent operations', () => { + it('should use in-memory cache and not re-read from disk on subsequent operations', async () => { chatRecordingService.recordMessage({ type: 'gemini', content: 'Response', model: 'gemini-pro', }); - const readFileSyncSpy = vi.spyOn(fs, 'readFileSync'); + const readFileSyncSpy = vi.mocked(fs.readFileSync); readFileSyncSpy.mockClear(); // These operations should all use the in-memory cache @@ -401,16 +444,15 @@ describe('ChatRecordingService', () => { // readFileSync should NOT have been called since we use the in-memory cache expect(readFileSyncSpy).not.toHaveBeenCalled(); - readFileSyncSpy.mockRestore(); }); }); describe('recordToolCalls', () => { - beforeEach(() => { - chatRecordingService.initialize(); + beforeEach(async () => { + await chatRecordingService.initialize(); }); - it('should add new tool calls to the last message', () => { + it('should add new tool calls to the last message', async () => { chatRecordingService.recordMessage({ type: 'gemini', content: '', @@ -427,9 +469,9 @@ describe('ChatRecordingService', () => { chatRecordingService.recordToolCalls('gemini-pro', [toolCall]); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; const geminiMsg = conversation.messages[0] as MessageRecord & { type: 'gemini'; }; @@ -437,7 +479,7 @@ describe('ChatRecordingService', () => { expect(geminiMsg.toolCalls![0].name).toBe('testTool'); }); - it('should preserve dynamic description and NOT overwrite with generic one', () => { + it('should preserve dynamic description and NOT overwrite with generic one', async () => { chatRecordingService.recordMessage({ type: 'gemini', content: '', @@ -457,9 +499,9 @@ describe('ChatRecordingService', () => { chatRecordingService.recordToolCalls('gemini-pro', [toolCall]); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; const geminiMsg = conversation.messages[0] as MessageRecord & { type: 'gemini'; }; @@ -467,7 +509,7 @@ describe('ChatRecordingService', () => { expect(geminiMsg.toolCalls![0].description).toBe(dynamicDescription); }); - it('should create a new message if the last message is not from gemini', () => { + it('should create a new message if the last message is not from gemini', async () => { chatRecordingService.recordMessage({ type: 'user', content: 'call a tool', @@ -484,9 +526,9 @@ describe('ChatRecordingService', () => { chatRecordingService.recordToolCalls('gemini-pro', [toolCall]); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.messages).toHaveLength(2); expect(conversation.messages[1].type).toBe('gemini'); expect( @@ -513,9 +555,9 @@ describe('ChatRecordingService', () => { // Create main session file with timestamp const sessionFile = path.join( chatsDir, - `session-2023-01-01T00-00-${shortId}.json`, + `session-2023-01-01T00-00-${shortId}.jsonl`, ); - fs.writeFileSync(sessionFile, JSON.stringify({ sessionId })); + fs.writeFileSync(sessionFile, JSON.stringify({ sessionId }) + '\n'); const logFile = path.join(logsDir, `session-${sessionId}.jsonl`); fs.writeFileSync(logFile, '{}'); @@ -547,20 +589,21 @@ describe('ChatRecordingService', () => { // Create parent session file const parentFile = path.join( chatsDir, - `session-2023-01-01T00-00-${shortId}.json`, + `session-2023-01-01T00-00-${shortId}.jsonl`, ); fs.writeFileSync( parentFile, - JSON.stringify({ sessionId: parentSessionId }), + JSON.stringify({ sessionId: parentSessionId }) + '\n', ); // Create subagent session file in subdirectory const subagentDir = path.join(chatsDir, parentSessionId); fs.mkdirSync(subagentDir, { recursive: true }); - const subagentFile = path.join(subagentDir, `${subagentSessionId}.json`); + const subagentFile = path.join(subagentDir, `${subagentSessionId}.jsonl`); fs.writeFileSync( subagentFile, - JSON.stringify({ sessionId: subagentSessionId, kind: 'subagent' }), + JSON.stringify({ sessionId: subagentSessionId, kind: 'subagent' }) + + '\n', ); // Create logs for both @@ -609,21 +652,22 @@ describe('ChatRecordingService', () => { // Create parent session file const parentFile = path.join( chatsDir, - `session-2023-01-01T00-00-${shortId}.json`, + `session-2023-01-01T00-00-${shortId}.jsonl`, ); fs.writeFileSync( parentFile, - JSON.stringify({ sessionId: parentSessionId }), + JSON.stringify({ sessionId: parentSessionId }) + '\n', ); // Create legacy subagent session file (flat in chatsDir) const subagentFile = path.join( chatsDir, - `session-2023-01-01T00-01-${shortId}.json`, + `session-2023-01-01T00-01-${shortId}.jsonl`, ); fs.writeFileSync( subagentFile, - JSON.stringify({ sessionId: subagentSessionId, kind: 'subagent' }), + JSON.stringify({ sessionId: subagentSessionId, kind: 'subagent' }) + + '\n', ); // Call with parent sessionId @@ -643,8 +687,8 @@ describe('ChatRecordingService', () => { fs.mkdirSync(logsDir, { recursive: true }); const basename = `session-2023-01-01T00-00-${shortId}`; - const sessionFile = path.join(chatsDir, `${basename}.json`); - fs.writeFileSync(sessionFile, JSON.stringify({ sessionId })); + const sessionFile = path.join(chatsDir, `${basename}.jsonl`); + fs.writeFileSync(sessionFile, JSON.stringify({ sessionId }) + '\n'); const logFile = path.join(logsDir, `session-${sessionId}.jsonl`); fs.writeFileSync(logFile, '{}'); @@ -664,11 +708,11 @@ describe('ChatRecordingService', () => { }); describe('recordDirectories', () => { - beforeEach(() => { - chatRecordingService.initialize(); + beforeEach(async () => { + await chatRecordingService.initialize(); }); - it('should save directories to the conversation', () => { + it('should save directories to the conversation', async () => { chatRecordingService.recordMessage({ type: 'user', content: 'ping', @@ -680,16 +724,16 @@ describe('ChatRecordingService', () => { ]); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.directories).toEqual([ '/path/to/dir1', '/path/to/dir2', ]); }); - it('should overwrite existing directories', () => { + it('should overwrite existing directories', async () => { chatRecordingService.recordMessage({ type: 'user', content: 'ping', @@ -699,16 +743,16 @@ describe('ChatRecordingService', () => { chatRecordingService.recordDirectories(['/new/dir1', '/new/dir2']); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.directories).toEqual(['/new/dir1', '/new/dir2']); }); }); describe('rewindTo', () => { - it('should rewind the conversation to a specific message ID', () => { - chatRecordingService.initialize(); + it('should rewind the conversation to a specific message ID', async () => { + await chatRecordingService.initialize(); // Record some messages chatRecordingService.recordMessage({ type: 'user', @@ -727,9 +771,9 @@ describe('ChatRecordingService', () => { }); const sessionFile = chatRecordingService.getConversationFilePath()!; - let conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + let conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; const secondMsgId = conversation.messages[1].id; const result = chatRecordingService.rewindTo(secondMsgId); @@ -738,14 +782,14 @@ describe('ChatRecordingService', () => { expect(result!.messages).toHaveLength(1); expect(result!.messages[0].content).toBe('msg1'); - conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; expect(conversation.messages).toHaveLength(1); }); - it('should return the original conversation if the message ID is not found', () => { - chatRecordingService.initialize(); + it('should return the original conversation if the message ID is not found', async () => { + await chatRecordingService.initialize(); chatRecordingService.recordMessage({ type: 'user', content: 'msg1', @@ -760,33 +804,31 @@ describe('ChatRecordingService', () => { }); describe('ENOSPC (disk full) graceful degradation - issue #16266', () => { - it('should disable recording and not throw when ENOSPC occurs during initialize', () => { + it('should disable recording and not throw when ENOSPC occurs during initialize', async () => { const enospcError = new Error('ENOSPC: no space left on device'); (enospcError as NodeJS.ErrnoException).code = 'ENOSPC'; - const mkdirSyncSpy = vi.spyOn(fs, 'mkdirSync').mockImplementation(() => { + const mkdirSyncSpy = vi.mocked(fs.mkdirSync).mockImplementation(() => { throw enospcError; }); // Should not throw - expect(() => chatRecordingService.initialize()).not.toThrow(); + await expect(chatRecordingService.initialize()).resolves.not.toThrow(); // Recording should be disabled (conversationFile set to null) expect(chatRecordingService.getConversationFilePath()).toBeNull(); mkdirSyncSpy.mockRestore(); }); - it('should disable recording and not throw when ENOSPC occurs during writeConversation', () => { - chatRecordingService.initialize(); + it('should disable recording and not throw when ENOSPC occurs during writeConversation', async () => { + await chatRecordingService.initialize(); const enospcError = new Error('ENOSPC: no space left on device'); (enospcError as NodeJS.ErrnoException).code = 'ENOSPC'; - const writeFileSyncSpy = vi - .spyOn(fs, 'writeFileSync') - .mockImplementation(() => { - throw enospcError; - }); + vi.mocked(fs.appendFileSync).mockImplementation(() => { + throw enospcError; + }); // Should not throw when recording a message expect(() => @@ -799,17 +841,16 @@ describe('ChatRecordingService', () => { // Recording should be disabled (conversationFile set to null) expect(chatRecordingService.getConversationFilePath()).toBeNull(); - writeFileSyncSpy.mockRestore(); }); - it('should skip recording operations when recording is disabled', () => { - chatRecordingService.initialize(); + it('should skip recording operations when recording is disabled', async () => { + await chatRecordingService.initialize(); const enospcError = new Error('ENOSPC: no space left on device'); (enospcError as NodeJS.ErrnoException).code = 'ENOSPC'; - const writeFileSyncSpy = vi - .spyOn(fs, 'writeFileSync') + const appendFileSyncSpy = vi + .mocked(fs.appendFileSync) .mockImplementationOnce(() => { throw enospcError; }); @@ -821,7 +862,7 @@ describe('ChatRecordingService', () => { }); // Reset mock to track subsequent calls - writeFileSyncSpy.mockClear(); + appendFileSyncSpy.mockClear(); // Subsequent calls should be no-ops (not call writeFileSync) chatRecordingService.recordMessage({ @@ -838,21 +879,18 @@ describe('ChatRecordingService', () => { chatRecordingService.saveSummary('Test summary'); // writeFileSync should not have been called for any of these - expect(writeFileSyncSpy).not.toHaveBeenCalled(); - writeFileSyncSpy.mockRestore(); + expect(appendFileSyncSpy).not.toHaveBeenCalled(); }); - it('should return null from getConversation when recording is disabled', () => { - chatRecordingService.initialize(); + it('should return null from getConversation when recording is disabled', async () => { + await chatRecordingService.initialize(); const enospcError = new Error('ENOSPC: no space left on device'); (enospcError as NodeJS.ErrnoException).code = 'ENOSPC'; - const writeFileSyncSpy = vi - .spyOn(fs, 'writeFileSync') - .mockImplementation(() => { - throw enospcError; - }); + vi.mocked(fs.appendFileSync).mockImplementation(() => { + throw enospcError; + }); // Trigger ENOSPC chatRecordingService.recordMessage({ @@ -864,20 +902,17 @@ describe('ChatRecordingService', () => { // getConversation should return null when disabled expect(chatRecordingService.getConversation()).toBeNull(); expect(chatRecordingService.getConversationFilePath()).toBeNull(); - writeFileSyncSpy.mockRestore(); }); - it('should still throw for non-ENOSPC errors', () => { - chatRecordingService.initialize(); + it('should still throw for non-ENOSPC errors', async () => { + await chatRecordingService.initialize(); const otherError = new Error('Permission denied'); (otherError as NodeJS.ErrnoException).code = 'EACCES'; - const writeFileSyncSpy = vi - .spyOn(fs, 'writeFileSync') - .mockImplementation(() => { - throw otherError; - }); + vi.mocked(fs.appendFileSync).mockImplementation(() => { + throw otherError; + }); // Should throw for non-ENOSPC errors expect(() => @@ -890,16 +925,15 @@ describe('ChatRecordingService', () => { // Recording should NOT be disabled for non-ENOSPC errors (file path still exists) expect(chatRecordingService.getConversationFilePath()).not.toBeNull(); - writeFileSyncSpy.mockRestore(); }); }); describe('updateMessagesFromHistory', () => { - beforeEach(() => { - chatRecordingService.initialize(); + beforeEach(async () => { + await chatRecordingService.initialize(); }); - it('should update tool results from API history (masking sync)', () => { + it('should update tool results from API history (masking sync)', async () => { // 1. Record an initial message and tool call chatRecordingService.recordMessage({ type: 'gemini', @@ -949,9 +983,9 @@ describe('ChatRecordingService', () => { // 4. Verify disk content const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; const geminiMsg = conversation.messages[0]; if (geminiMsg.type !== 'gemini') @@ -968,8 +1002,8 @@ describe('ChatRecordingService', () => { output: maskedSnippet, }); }); - it('should preserve multi-modal sibling parts during sync', () => { - chatRecordingService.initialize(); + it('should preserve multi-modal sibling parts during sync', async () => { + await chatRecordingService.initialize(); const callId = 'multi-modal-call'; const originalResult: Part[] = [ { @@ -1019,9 +1053,9 @@ describe('ChatRecordingService', () => { chatRecordingService.updateMessagesFromHistory(history); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; const lastMsg = conversation.messages[0] as MessageRecord & { type: 'gemini'; @@ -1035,8 +1069,8 @@ describe('ChatRecordingService', () => { expect(result[1].inlineData!.mimeType).toBe('image/png'); }); - it('should handle parts appearing BEFORE the functionResponse in a content block', () => { - chatRecordingService.initialize(); + it('should handle parts appearing BEFORE the functionResponse in a content block', async () => { + await chatRecordingService.initialize(); const callId = 'prefix-part-call'; chatRecordingService.recordMessage({ @@ -1075,9 +1109,9 @@ describe('ChatRecordingService', () => { chatRecordingService.updateMessagesFromHistory(history); const sessionFile = chatRecordingService.getConversationFilePath()!; - const conversation = JSON.parse( - fs.readFileSync(sessionFile, 'utf8'), - ) as ConversationRecord; + const conversation = (await loadConversationRecord( + sessionFile, + )) as ConversationRecord; const lastMsg = conversation.messages[0] as MessageRecord & { type: 'gemini'; @@ -1088,15 +1122,15 @@ describe('ChatRecordingService', () => { expect(result[1].functionResponse!.id).toBe(callId); }); - it('should not write to disk when no tool calls match', () => { + it('should not write to disk when no tool calls match', async () => { chatRecordingService.recordMessage({ type: 'gemini', content: 'Response with no tool calls', model: 'gemini-pro', }); - const writeFileSyncSpy = vi.spyOn(fs, 'writeFileSync'); - writeFileSyncSpy.mockClear(); + const appendFileSyncSpy = vi.mocked(fs.appendFileSync); + appendFileSyncSpy.mockClear(); // History with a tool call ID that doesn't exist in the conversation const history: Content[] = [ @@ -1117,17 +1151,16 @@ describe('ChatRecordingService', () => { chatRecordingService.updateMessagesFromHistory(history); // No tool calls matched, so writeFileSync should NOT have been called - expect(writeFileSyncSpy).not.toHaveBeenCalled(); - writeFileSyncSpy.mockRestore(); + expect(appendFileSyncSpy).not.toHaveBeenCalled(); }); }); describe('ENOENT (missing directory) handling', () => { - it('should ensure directory exists before writing conversation file', () => { - chatRecordingService.initialize(); + it('should ensure directory exists before writing conversation file', async () => { + await chatRecordingService.initialize(); - const mkdirSyncSpy = vi.spyOn(fs, 'mkdirSync'); - const writeFileSyncSpy = vi.spyOn(fs, 'writeFileSync'); + const mkdirSyncSpy = vi.mocked(fs.mkdirSync); + const appendFileSyncSpy = vi.mocked(fs.appendFileSync); chatRecordingService.recordMessage({ type: 'user', @@ -1144,13 +1177,12 @@ describe('ChatRecordingService', () => { // mkdirSync should be called before writeFileSync const mkdirCallOrder = mkdirSyncSpy.mock.invocationCallOrder; - const writeCallOrder = writeFileSyncSpy.mock.invocationCallOrder; + const writeCallOrder = appendFileSyncSpy.mock.invocationCallOrder; const lastMkdir = mkdirCallOrder[mkdirCallOrder.length - 1]; const lastWrite = writeCallOrder[writeCallOrder.length - 1]; expect(lastMkdir).toBeLessThan(lastWrite); mkdirSyncSpy.mockRestore(); - writeFileSyncSpy.mockRestore(); }); }); }); diff --git a/packages/core/src/services/chatRecordingService.ts b/packages/core/src/services/chatRecordingService.ts index c71519f858..cab67f80a1 100644 --- a/packages/core/src/services/chatRecordingService.ts +++ b/packages/core/src/services/chatRecordingService.ts @@ -4,16 +4,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { type Status } from '../scheduler/types.js'; import { type ThoughtSummary } from '../utils/thoughtUtils.js'; import { getProjectHash } from '../utils/paths.js'; import path from 'node:path'; -import fs from 'node:fs'; +import * as fs from 'node:fs'; import { sanitizeFilenamePart } from '../utils/fileUtils.js'; +import { isNodeError } from '../utils/errors.js'; import { deleteSessionArtifactsAsync, deleteSubagentSessionDirAndArtifactsAsync, } from '../utils/sessionOperations.js'; +import readline from 'node:readline'; import { randomUUID } from 'node:crypto'; import type { Content, @@ -22,10 +23,21 @@ import type { GenerateContentResponseUsageMetadata, } from '@google/genai'; import { debugLogger } from '../utils/debugLogger.js'; -import type { ToolResultDisplay } from '../tools/tools.js'; import type { AgentLoopContext } from '../config/agent-loop-context.js'; - -export const SESSION_FILE_PREFIX = 'session-'; +import { + SESSION_FILE_PREFIX, + type TokensSummary, + type ToolCallRecord, + type ConversationRecordExtra, + type MessageRecord, + type ConversationRecord, + type ResumedSessionData, + type LoadConversationOptions, + type RewindRecord, + type MetadataUpdateRecord, + type PartialMetadataRecord, +} from './chatRecordingTypes.js'; +export * from './chatRecordingTypes.js'; /** * Warning message shown when recording is disabled due to disk full. @@ -35,103 +47,207 @@ const ENOSPC_WARNING_MESSAGE = 'The conversation will continue but will not be saved to disk. ' + 'Free up disk space and restart to enable recording.'; -/** - * Token usage summary for a message or conversation. - */ -export interface TokensSummary { - input: number; // promptTokenCount - output: number; // candidatesTokenCount - cached: number; // cachedContentTokenCount - thoughts?: number; // thoughtsTokenCount - tool?: number; // toolUsePromptTokenCount - total: number; // totalTokenCount +function hasProperty( + obj: unknown, + prop: T, +): obj is { [key in T]: unknown } { + return obj !== null && typeof obj === 'object' && prop in obj; } -/** - * Base fields common to all messages. - */ -export interface BaseMessageRecord { - id: string; - timestamp: string; - content: PartListUnion; - displayContent?: PartListUnion; +function isStringProperty( + obj: unknown, + prop: T, +): obj is { [key in T]: string } { + return hasProperty(obj, prop) && typeof obj[prop] === 'string'; } -/** - * Record of a tool call execution within a conversation. - */ -export interface ToolCallRecord { - id: string; - name: string; - args: Record; - result?: PartListUnion | null; - status: Status; - timestamp: string; - // UI-specific fields for display purposes - displayName?: string; - description?: string; - resultDisplay?: ToolResultDisplay; - renderOutputAsMarkdown?: boolean; +function isObjectProperty( + obj: unknown, + prop: T, +): obj is { [key in T]: object } { + return ( + hasProperty(obj, prop) && + obj[prop] !== null && + typeof obj[prop] === 'object' + ); } -/** - * Message type and message type-specific fields. - */ -export type ConversationRecordExtra = - | { - type: 'user' | 'info' | 'error' | 'warning'; +function isRewindRecord(record: unknown): record is RewindRecord { + return isStringProperty(record, '$rewindTo'); +} + +function isMessageRecord(record: unknown): record is MessageRecord { + return isStringProperty(record, 'id'); +} + +function isMetadataUpdateRecord( + record: unknown, +): record is MetadataUpdateRecord { + return isObjectProperty(record, '$set'); +} + +function isPartialMetadataRecord( + record: unknown, +): record is PartialMetadataRecord { + return ( + isStringProperty(record, 'sessionId') && + isStringProperty(record, 'projectHash') + ); +} + +function isTextPart(part: unknown): part is { text: string } { + return isStringProperty(part, 'text'); +} + +function isSessionIdRecord(record: unknown): record is { sessionId: string } { + return isStringProperty(record, 'sessionId'); +} + +export async function loadConversationRecord( + filePath: string, + options?: LoadConversationOptions, +): Promise< + | (ConversationRecord & { + messageCount?: number; + firstUserMessage?: string; + hasUserOrAssistantMessage?: boolean; + }) + | null +> { + if (!fs.existsSync(filePath)) { + return null; + } + + try { + const fileStream = fs.createReadStream(filePath); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + let metadata: Partial = {}; + const messagesMap = new Map(); + const messageIds: string[] = []; + let firstUserMessageStr: string | undefined; + let hasUserOrAssistant = false; + + for await (const line of rl) { + if (!line.trim()) continue; + try { + const record = JSON.parse(line) as unknown; + if (isRewindRecord(record)) { + const rewindId = record.$rewindTo; + if (options?.metadataOnly) { + const idx = messageIds.indexOf(rewindId); + if (idx !== -1) { + messageIds.splice(idx); + } else { + messageIds.length = 0; + } + // For metadataOnly we can't perfectly un-track hasUserOrAssistant if it was rewinded, + // but we can assume false if messageIds is empty. + if (messageIds.length === 0) hasUserOrAssistant = false; + } else { + let found = false; + const idsToDelete: string[] = []; + for (const [id] of messagesMap) { + if (id === rewindId) found = true; + if (found) idsToDelete.push(id); + } + if (found) { + for (const id of idsToDelete) { + messagesMap.delete(id); + } + } else { + messagesMap.clear(); + } + } + } else if (isMessageRecord(record)) { + const id = record.id; + if ( + hasProperty(record, 'type') && + (record.type === 'user' || record.type === 'gemini') + ) { + hasUserOrAssistant = true; + } + // Track message count and first user message + if (options?.metadataOnly) { + messageIds.push(id); + } + if ( + !firstUserMessageStr && + hasProperty(record, 'type') && + record['type'] === 'user' && + hasProperty(record, 'content') && + record['content'] + ) { + // Basic extraction of first user message for display + const rawContent = record['content']; + if (Array.isArray(rawContent)) { + firstUserMessageStr = rawContent + .map((p: unknown) => (isTextPart(p) ? p['text'] : '')) + .join(''); + } else if (typeof rawContent === 'string') { + firstUserMessageStr = rawContent; + } + } + + if (!options?.metadataOnly) { + messagesMap.set(id, record); + if ( + options?.maxMessages && + messagesMap.size > options.maxMessages + ) { + const firstKey = messagesMap.keys().next().value; + if (typeof firstKey === 'string') messagesMap.delete(firstKey); + } + } + } else if (isMetadataUpdateRecord(record)) { + // Metadata update + metadata = { + ...metadata, + ...record.$set, + }; + } else if (isPartialMetadataRecord(record)) { + // Initial metadata line + metadata = { ...metadata, ...record }; + } + } catch { + // ignore parse errors on individual lines + } } - | { - type: 'gemini'; - toolCalls?: ToolCallRecord[]; - thoughts?: Array; - tokens?: TokensSummary | null; - model?: string; + + if (!metadata.sessionId || !metadata.projectHash) { + return await parseLegacyRecordFallback(filePath, options); + } + + return { + sessionId: metadata.sessionId, + projectHash: metadata.projectHash, + startTime: metadata.startTime || new Date().toISOString(), + lastUpdated: metadata.lastUpdated || new Date().toISOString(), + summary: metadata.summary, + directories: metadata.directories, + kind: metadata.kind, + messages: Array.from(messagesMap.values()), + messageCount: options?.metadataOnly + ? messageIds.length + : messagesMap.size, + firstUserMessage: firstUserMessageStr, + hasUserOrAssistantMessage: options?.metadataOnly + ? hasUserOrAssistant + : Array.from(messagesMap.values()).some( + (m) => m.type === 'user' || m.type === 'gemini', + ), }; - -/** - * A single message record in a conversation. - */ -export type MessageRecord = BaseMessageRecord & ConversationRecordExtra; - -/** - * Complete conversation record stored in session files. - */ -export interface ConversationRecord { - sessionId: string; - projectHash: string; - startTime: string; - lastUpdated: string; - messages: MessageRecord[]; - summary?: string; - /** Workspace directories added during the session via /dir add */ - directories?: string[]; - /** The kind of conversation (main agent or subagent) */ - kind?: 'main' | 'subagent'; + } catch (error) { + debugLogger.error('Error loading conversation record from JSONL:', error); + return null; + } } -/** - * Data structure for resuming an existing session. - */ -export interface ResumedSessionData { - conversation: ConversationRecord; - filePath: string; -} - -/** - * Service for automatically recording chat conversations to disk. - * - * This service provides comprehensive conversation recording that captures: - * - All user and assistant messages - * - Tool calls and their execution results - * - Token usage statistics - * - Assistant thoughts and reasoning - * - * Sessions are stored as JSON files in ~/.gemini/tmp//chats/ - */ export class ChatRecordingService { private conversationFile: string | null = null; - private cachedLastConvData: string | null = null; private cachedConversation: ConversationRecord | null = null; private sessionId: string; private projectHash: string; @@ -146,33 +262,48 @@ export class ChatRecordingService { this.projectHash = getProjectHash(context.config.getProjectRoot()); } - /** - * Initializes the chat recording service: creates a new conversation file and associates it with - * this service instance, or resumes from an existing session if resumedSessionData is provided. - * - * @param resumedSessionData Data from a previous session to resume from. - * @param kind The kind of conversation (main or subagent). - */ - initialize( + async initialize( resumedSessionData?: ResumedSessionData, kind?: 'main' | 'subagent', - ): void { + ): Promise { try { this.kind = kind; if (resumedSessionData) { - // Resume from existing session this.conversationFile = resumedSessionData.filePath; this.sessionId = resumedSessionData.conversation.sessionId; this.kind = resumedSessionData.conversation.kind; - // Update the session ID in the existing file - this.updateConversation((conversation) => { - conversation.sessionId = this.sessionId; - }); + const loadedRecord = await loadConversationRecord( + this.conversationFile, + ); + if (loadedRecord) { + this.cachedConversation = loadedRecord; + this.projectHash = this.cachedConversation.projectHash; - // Clear any cached data to force fresh reads - this.cachedLastConvData = null; - this.cachedConversation = null; + if (this.conversationFile.endsWith('.json')) { + this.conversationFile = this.conversationFile + 'l'; // e.g. session-foo.jsonl + + // Migrate the entire legacy record to the new file + const initialMetadata = { + sessionId: this.sessionId, + projectHash: this.projectHash, + startTime: this.cachedConversation.startTime, + lastUpdated: this.cachedConversation.lastUpdated, + kind: this.cachedConversation.kind, + directories: this.cachedConversation.directories, + summary: this.cachedConversation.summary, + }; + this.appendRecord(initialMetadata); + for (const msg of this.cachedConversation.messages) { + this.appendRecord(msg); + } + } + + // Update the session ID in the existing file + this.updateMetadata({ sessionId: this.sessionId }); + } else { + throw new Error('Failed to load resumed session data from file'); + } } else { // Create new session this.sessionId = this.context.promptId; @@ -209,12 +340,12 @@ export class ChatRecordingService { let filename: string; if (this.kind === 'subagent') { - filename = `${safeSessionId}.json`; + filename = `${safeSessionId}.jsonl`; } else { filename = `${SESSION_FILE_PREFIX}${timestamp}-${safeSessionId.slice( 0, 8, - )}.json`; + )}.jsonl`; } this.conversationFile = path.join(chatsDir, filename); @@ -227,37 +358,74 @@ export class ChatRecordingService { ] : undefined; - this.writeConversation({ + const initialMetadata = { sessionId: this.sessionId, projectHash: this.projectHash, startTime: new Date().toISOString(), lastUpdated: new Date().toISOString(), - messages: [], - directories, kind: this.kind, - }); + directories, + }; + + this.appendRecord(initialMetadata); + this.cachedConversation = { + ...initialMetadata, + messages: [], + }; } - // Clear any queued data since this is a fresh start this.queuedThoughts = []; this.queuedTokens = null; } catch (error) { - // Handle disk full (ENOSPC) gracefully - disable recording but allow CLI to continue - if ( - error instanceof Error && - 'code' in error && - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (error as NodeJS.ErrnoException).code === 'ENOSPC' - ) { + if (isNodeError(error) && error.code === 'ENOSPC') { this.conversationFile = null; debugLogger.warn(ENOSPC_WARNING_MESSAGE); - return; // Don't throw - allow the CLI to continue + return; } debugLogger.error('Error initializing chat recording service:', error); throw error; } } + private appendRecord(record: unknown): void { + if (!this.conversationFile) return; + try { + const line = JSON.stringify(record) + '\n'; + fs.mkdirSync(path.dirname(this.conversationFile), { recursive: true }); + fs.appendFileSync(this.conversationFile, line); + } catch (error) { + if (isNodeError(error) && error.code === 'ENOSPC') { + this.conversationFile = null; + debugLogger.warn(ENOSPC_WARNING_MESSAGE); + } else { + throw error; + } + } + } + + private updateMetadata(updates: Partial): void { + if (!this.cachedConversation) return; + Object.assign(this.cachedConversation, updates); + this.appendRecord({ $set: updates }); + } + + private pushMessage(msg: MessageRecord): void { + if (!this.cachedConversation) return; + + // We append the full message to the log + this.appendRecord(msg); + + // Now update memory + const index = this.cachedConversation.messages.findIndex( + (m) => m.id === msg.id, + ); + if (index !== -1) { + this.cachedConversation.messages[index] = msg; + } else { + this.cachedConversation.messages.push(msg); + } + } + private getLastMessage( conversation: ConversationRecord, ): MessageRecord | undefined { @@ -278,69 +446,47 @@ export class ChatRecordingService { }; } - /** - * Records a message in the conversation. - */ recordMessage(message: { model: string | undefined; type: ConversationRecordExtra['type']; content: PartListUnion; displayContent?: PartListUnion; }): void { - if (!this.conversationFile) return; + if (!this.conversationFile || !this.cachedConversation) return; try { - this.updateConversation((conversation) => { - const msg = this.newMessage( - message.type, - message.content, - message.displayContent, - ); - if (msg.type === 'gemini') { - // If it's a new Gemini message then incorporate any queued thoughts. - conversation.messages.push({ - ...msg, - thoughts: this.queuedThoughts, - tokens: this.queuedTokens, - model: message.model, - }); - this.queuedThoughts = []; - this.queuedTokens = null; - } else { - // Or else just add it. - conversation.messages.push(msg); - } - }); + const msg = this.newMessage( + message.type, + message.content, + message.displayContent, + ); + if (msg.type === 'gemini') { + msg.thoughts = this.queuedThoughts; + msg.tokens = this.queuedTokens; + msg.model = message.model; + this.queuedThoughts = []; + this.queuedTokens = null; + } + this.pushMessage(msg); + this.updateMetadata({ lastUpdated: new Date().toISOString() }); } catch (error) { debugLogger.error('Error saving message to chat history.', error); throw error; } } - /** - * Records a thought from the assistant's reasoning process. - */ recordThought(thought: ThoughtSummary): void { if (!this.conversationFile) return; - - try { - this.queuedThoughts.push({ - ...thought, - timestamp: new Date().toISOString(), - }); - } catch (error) { - debugLogger.error('Error saving thought to chat history.', error); - throw error; - } + this.queuedThoughts.push({ + ...thought, + timestamp: new Date().toISOString(), + }); } - /** - * Updates the tokens for the last message in the conversation (which should be by Gemini). - */ recordMessageTokens( respUsageMetadata: GenerateContentResponseUsageMetadata, ): void { - if (!this.conversationFile) return; + if (!this.conversationFile || !this.cachedConversation) return; try { const tokens = { @@ -351,17 +497,12 @@ export class ChatRecordingService { tool: respUsageMetadata.toolUsePromptTokenCount ?? 0, total: respUsageMetadata.totalTokenCount ?? 0, }; - const conversation = this.readConversation(); - const lastMsg = this.getLastMessage(conversation); - // If the last message already has token info, it's because this new token info is for a - // new message that hasn't been recorded yet. + const lastMsg = this.getLastMessage(this.cachedConversation); if (lastMsg && lastMsg.type === 'gemini' && !lastMsg.tokens) { lastMsg.tokens = tokens; this.queuedTokens = null; - this.writeConversation(conversation); + this.pushMessage(lastMsg); } else { - // Only queue tokens in memory; no disk I/O needed since the - // conversation record itself hasn't changed. this.queuedTokens = tokens; } } catch (error) { @@ -373,14 +514,9 @@ export class ChatRecordingService { } } - /** - * Adds tool calls to the last message in the conversation (which should be by Gemini). - * This method enriches tool calls with metadata from the ToolRegistry. - */ recordToolCalls(model: string, toolCalls: ToolCallRecord[]): void { - if (!this.conversationFile) return; + if (!this.conversationFile || !this.cachedConversation) return; - // Enrich tool calls with metadata from the ToolRegistry const toolRegistry = this.context.toolRegistry; const enrichedToolCalls = toolCalls.map((toolCall) => { const toolInstance = toolRegistry.getTool(toolCall.name); @@ -394,74 +530,52 @@ export class ChatRecordingService { }); try { - this.updateConversation((conversation) => { - const lastMsg = this.getLastMessage(conversation); - // If a tool call was made, but the last message isn't from Gemini, it's because Gemini is - // calling tools without starting the message with text. So the user submits a prompt, and - // Gemini immediately calls a tool (maybe with some thinking first). In that case, create - // a new empty Gemini message. - // Also if there are any queued thoughts, it means this tool call(s) is from a new Gemini - // message--because it's thought some more since we last, if ever, created a new Gemini - // message from tool calls, when we dequeued the thoughts. - if ( - !lastMsg || - lastMsg.type !== 'gemini' || - this.queuedThoughts.length > 0 - ) { - const newMsg: MessageRecord = { - ...this.newMessage('gemini' as const, ''), - // This isn't strictly necessary, but TypeScript apparently can't - // tell that the first parameter to newMessage() becomes the - // resulting message's type, and so it thinks that toolCalls may - // not be present. Confirming the type here satisfies it. - type: 'gemini' as const, - toolCalls: enrichedToolCalls, - thoughts: this.queuedThoughts, - model, - }; - // If there are any queued thoughts join them to this message. - if (this.queuedThoughts.length > 0) { - newMsg.thoughts = this.queuedThoughts; - this.queuedThoughts = []; - } - // If there's any queued tokens info join it to this message. - if (this.queuedTokens) { - newMsg.tokens = this.queuedTokens; - this.queuedTokens = null; - } - conversation.messages.push(newMsg); - } else { - // The last message is an existing Gemini message that we need to update. + const lastMsg = this.getLastMessage(this.cachedConversation); + if ( + !lastMsg || + lastMsg.type !== 'gemini' || + this.queuedThoughts.length > 0 + ) { + const newMsg: MessageRecord = { + ...this.newMessage('gemini' as const, ''), + type: 'gemini' as const, + toolCalls: enrichedToolCalls, + thoughts: this.queuedThoughts, + model, + }; + if (this.queuedThoughts.length > 0) { + newMsg.thoughts = this.queuedThoughts; + this.queuedThoughts = []; + } + if (this.queuedTokens) { + newMsg.tokens = this.queuedTokens; + this.queuedTokens = null; + } + this.pushMessage(newMsg); + } else { + if (!lastMsg.toolCalls) { + lastMsg.toolCalls = []; + } + // Deep clone toolCalls to avoid modifying memory references directly + const updatedToolCalls = [...lastMsg.toolCalls]; - // Update any existing tool call entries. - if (!lastMsg.toolCalls) { - lastMsg.toolCalls = []; - } - lastMsg.toolCalls = lastMsg.toolCalls.map((toolCall) => { - // If there are multiple tool calls with the same ID, this will take the first one. - const incomingToolCall = toolCalls.find( - (tc) => tc.id === toolCall.id, - ); - if (incomingToolCall) { - // Merge in the new data to keep preserve thoughts, etc., that were assigned to older - // versions of the tool call. - return { ...toolCall, ...incomingToolCall }; - } else { - return toolCall; - } - }); - - // Add any new tools calls that aren't in the message yet. - for (const toolCall of enrichedToolCalls) { - const existingToolCall = lastMsg.toolCalls.find( - (tc) => tc.id === toolCall.id, - ); - if (!existingToolCall) { - lastMsg.toolCalls.push(toolCall); - } + for (const toolCall of enrichedToolCalls) { + const index = updatedToolCalls.findIndex( + (tc) => tc.id === toolCall.id, + ); + if (index !== -1) { + updatedToolCalls[index] = { + ...updatedToolCalls[index], + ...toolCall, + }; + } else { + updatedToolCalls.push(toolCall); } } - }); + + lastMsg.toolCalls = updatedToolCalls; + this.pushMessage(lastMsg); + } } catch (error) { debugLogger.error( 'Error adding tool call to message in chat history.', @@ -471,166 +585,29 @@ export class ChatRecordingService { } } - /** - * Loads up the conversation record from disk. - * - * NOTE: The returned object is the live in-memory cache reference. - * Any mutations to it will be visible to all subsequent reads. - * Callers that mutate the result MUST call writeConversation() to - * persist the changes to disk. - */ - private readConversation(): ConversationRecord { - if (this.cachedConversation) { - return this.cachedConversation; - } - try { - this.cachedLastConvData = fs.readFileSync(this.conversationFile!, 'utf8'); - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - this.cachedConversation = JSON.parse(this.cachedLastConvData); - if (!this.cachedConversation) { - // File is corrupt or contains "null". Fallback to an empty conversation. - this.cachedConversation = { - sessionId: this.sessionId, - projectHash: this.projectHash, - startTime: new Date().toISOString(), - lastUpdated: new Date().toISOString(), - messages: [], - kind: this.kind, - }; - } - return this.cachedConversation; - } catch (error) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { - debugLogger.error('Error reading conversation file.', error); - throw error; - } - - // Placeholder empty conversation if file doesn't exist. - this.cachedConversation = { - sessionId: this.sessionId, - projectHash: this.projectHash, - startTime: new Date().toISOString(), - lastUpdated: new Date().toISOString(), - messages: [], - kind: this.kind, - }; - return this.cachedConversation; - } - } - - /** - * Saves the conversation record; overwrites the file. - */ - private writeConversation( - conversation: ConversationRecord, - { allowEmpty = false }: { allowEmpty?: boolean } = {}, - ): void { - try { - if (!this.conversationFile) return; - - // Cache the conversation state even if we don't write to disk yet. - // This ensures that subsequent reads (e.g. during recordMessage) - // see the initial state (like directories) instead of trying to - // read a non-existent file from disk. - this.cachedConversation = conversation; - - // Don't write the file yet until there's at least one message. - if (conversation.messages.length === 0 && !allowEmpty) return; - - const newContent = JSON.stringify(conversation, null, 2); - // Skip the disk write if nothing actually changed (e.g. - // updateMessagesFromHistory found no matching tool calls to update). - // Compare before updating lastUpdated so the timestamp doesn't - // cause a false diff. - if (this.cachedLastConvData === newContent) return; - conversation.lastUpdated = new Date().toISOString(); - const contentToWrite = JSON.stringify(conversation, null, 2); - this.cachedLastConvData = contentToWrite; - // Ensure directory exists before writing (handles cases where temp dir was cleaned) - fs.mkdirSync(path.dirname(this.conversationFile), { recursive: true }); - fs.writeFileSync(this.conversationFile, contentToWrite); - } catch (error) { - // Handle disk full (ENOSPC) gracefully - disable recording but allow conversation to continue - if ( - error instanceof Error && - 'code' in error && - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - (error as NodeJS.ErrnoException).code === 'ENOSPC' - ) { - this.conversationFile = null; - this.cachedConversation = null; - debugLogger.warn(ENOSPC_WARNING_MESSAGE); - return; // Don't throw - allow the conversation to continue - } - debugLogger.error('Error writing conversation file.', error); - throw error; - } - } - - /** - * Convenient helper for updating the conversation without file reading and writing and time - * updating boilerplate. - */ - private updateConversation( - updateFn: (conversation: ConversationRecord) => void, - ) { - const conversation = this.readConversation(); - updateFn(conversation); - this.writeConversation(conversation); - } - - /** - * Saves a summary for the current session. - */ saveSummary(summary: string): void { if (!this.conversationFile) return; - try { - this.updateConversation((conversation) => { - conversation.summary = summary; - }); + this.updateMetadata({ summary }); } catch (error) { debugLogger.error('Error saving summary to chat history.', error); - // Don't throw - we want graceful degradation } } - /** - * Records workspace directories to the session file. - * Called when directories are added via /dir add. - */ recordDirectories(directories: readonly string[]): void { if (!this.conversationFile) return; - try { - this.updateConversation((conversation) => { - conversation.directories = [...directories]; - }); + this.updateMetadata({ directories: [...directories] }); } catch (error) { debugLogger.error('Error saving directories to chat history.', error); - // Don't throw - we want graceful degradation } } - /** - * Gets the current conversation data (for summary generation). - */ getConversation(): ConversationRecord | null { if (!this.conversationFile) return null; - - try { - return this.readConversation(); - } catch (error) { - debugLogger.error('Error reading conversation for summary.', error); - return null; - } + return this.cachedConversation; } - /** - * Gets the path to the current conversation file. - * Returns null if the service hasn't been initialized yet or recording is disabled. - */ getConversationFilePath(): string | null { return this.conversationFile; } @@ -646,7 +623,6 @@ export class ChatRecordingService { try { const tempDir = this.context.config.storage.getProjectTempDir(); const chatsDir = path.join(tempDir, 'chats'); - const shortId = this.deriveShortId(sessionIdOrBasename); // Using stat instead of existsSync for async sanity @@ -654,8 +630,10 @@ export class ChatRecordingService { return; // Nothing to delete } - const matchingFiles = this.getMatchingSessionFiles(chatsDir, shortId); - + const matchingFiles = await this.getMatchingSessionFiles( + chatsDir, + shortId, + ); for (const file of matchingFiles) { await this.deleteSessionAndArtifacts(chatsDir, file, tempDir); } @@ -665,13 +643,10 @@ export class ChatRecordingService { } } - /** - * Derives an 8-character shortId from a sessionId, filename, or basename. - */ private deriveShortId(sessionIdOrBasename: string): string { let shortId = sessionIdOrBasename; if (sessionIdOrBasename.startsWith(SESSION_FILE_PREFIX)) { - const withoutExt = sessionIdOrBasename.replace('.json', ''); + const withoutExt = sessionIdOrBasename.replace(/\.jsonl?$/, ''); const parts = withoutExt.split('-'); shortId = parts[parts.length - 1]; } else if (sessionIdOrBasename.length >= 8) { @@ -687,14 +662,15 @@ export class ChatRecordingService { return shortId; } - /** - * Finds all session files matching the pattern session-*-.json - */ - private getMatchingSessionFiles(chatsDir: string, shortId: string): string[] { - const files = fs.readdirSync(chatsDir); + private async getMatchingSessionFiles( + chatsDir: string, + shortId: string, + ): Promise { + const files = await fs.promises.readdir(chatsDir); return files.filter( (f) => - f.startsWith(SESSION_FILE_PREFIX) && f.endsWith(`-${shortId}.json`), + f.startsWith(SESSION_FILE_PREFIX) && + (f.endsWith(`-${shortId}.json`) || f.endsWith(`-${shortId}.jsonl`)), ); } @@ -708,15 +684,34 @@ export class ChatRecordingService { ): Promise { const filePath = path.join(chatsDir, file); try { - const fileContent = await fs.promises.readFile(filePath, 'utf8'); - const content = JSON.parse(fileContent) as unknown; + const CHUNK_SIZE = 4096; + const buffer = Buffer.alloc(CHUNK_SIZE); + let firstLine: string; + let fd: fs.promises.FileHandle | undefined; + try { + fd = await fs.promises.open(filePath, 'r'); + const { bytesRead } = await fd.read(buffer, 0, CHUNK_SIZE, 0); + if (bytesRead === 0) { + await fd.close(); + await fs.promises.unlink(filePath); + return; + } + const contentChunk = buffer.toString('utf8', 0, bytesRead); + const newlineIndex = contentChunk.indexOf('\n'); + firstLine = + newlineIndex !== -1 + ? contentChunk.substring(0, newlineIndex) + : contentChunk; + } finally { + if (fd !== undefined) { + await fd.close(); + } + } + const content = JSON.parse(firstLine) as unknown; let fullSessionId: string | undefined; - if (content && typeof content === 'object' && 'sessionId' in content) { - const id = (content as Record)['sessionId']; - if (typeof id === 'string') { - fullSessionId = id; - } + if (isSessionIdRecord(content)) { + fullSessionId = content['sessionId']; } // Delete the session file @@ -741,11 +736,9 @@ export class ChatRecordingService { * All messages from (and including) the specified ID onwards are removed. */ rewindTo(messageId: string): ConversationRecord | null { - if (!this.conversationFile) { - return null; - } - const conversation = this.readConversation(); - const messageIndex = conversation.messages.findIndex( + if (!this.conversationFile || !this.cachedConversation) return null; + + const messageIndex = this.cachedConversation.messages.findIndex( (m) => m.id === messageId, ); @@ -753,67 +746,60 @@ export class ChatRecordingService { debugLogger.error( 'Message to rewind to not found in conversation history', ); - return conversation; + return this.cachedConversation; } - conversation.messages = conversation.messages.slice(0, messageIndex); - this.writeConversation(conversation, { allowEmpty: true }); - return conversation; + this.cachedConversation.messages = this.cachedConversation.messages.slice( + 0, + messageIndex, + ); + this.appendRecord({ $rewindTo: messageId }); + return this.cachedConversation; } - /** - * Updates the conversation history based on the provided API Content array. - * This is used to persist changes made to the history (like masking) back to disk. - */ updateMessagesFromHistory(history: readonly Content[]): void { - if (!this.conversationFile) return; + if (!this.conversationFile || !this.cachedConversation) return; try { - this.updateConversation((conversation) => { - // Create a map of tool results from the API history for quick lookup by call ID. - // We store the full list of parts associated with each tool call ID to preserve - // multi-modal data and proper trajectory structure. - const partsMap = new Map(); - for (const content of history) { - if (content.role === 'user' && content.parts) { - // Find all unique call IDs in this message - const callIds = content.parts - .map((p) => p.functionResponse?.id) - .filter((id): id is string => !!id); + const partsMap = new Map(); + for (const content of history) { + if (content.role === 'user' && content.parts) { + const callIds = content.parts + .map((p) => p.functionResponse?.id) + .filter((id): id is string => !!id); - if (callIds.length === 0) continue; + if (callIds.length === 0) continue; - // Use the first ID as a seed to capture any "leading" non-ID parts - // in this specific content block. - let currentCallId = callIds[0]; - for (const part of content.parts) { - if (part.functionResponse?.id) { - currentCallId = part.functionResponse.id; - } + let currentCallId = callIds[0]; + for (const part of content.parts) { + if (part.functionResponse?.id) { + currentCallId = part.functionResponse.id; + } - if (!partsMap.has(currentCallId)) { - partsMap.set(currentCallId, []); - } - partsMap.get(currentCallId)!.push(part); + if (!partsMap.has(currentCallId)) { + partsMap.set(currentCallId, []); + } + partsMap.get(currentCallId)!.push(part); + } + } + } + + for (const message of this.cachedConversation.messages) { + let msgChanged = false; + if (message.type === 'gemini' && message.toolCalls) { + for (const toolCall of message.toolCalls) { + const newParts = partsMap.get(toolCall.id); + if (newParts !== undefined) { + toolCall.result = newParts; + msgChanged = true; } } } - - // Update the conversation records tool results if they've changed. - for (const message of conversation.messages) { - if (message.type === 'gemini' && message.toolCalls) { - for (const toolCall of message.toolCalls) { - const newParts = partsMap.get(toolCall.id); - if (newParts !== undefined) { - // Store the results as proper Parts (including functionResponse) - // instead of stringifying them as text parts. This ensures the - // tool trajectory is correctly reconstructed upon session resumption. - toolCall.result = newParts; - } - } - } + if (msgChanged) { + // Push updated message to log + this.pushMessage(message); } - }); + } } catch (error) { debugLogger.error( 'Error updating conversation history from memory.', @@ -823,3 +809,63 @@ export class ChatRecordingService { } } } + +async function parseLegacyRecordFallback( + filePath: string, + options?: LoadConversationOptions, +): Promise< + | (ConversationRecord & { + messageCount?: number; + firstUserMessage?: string; + hasUserOrAssistantMessage?: boolean; + }) + | null +> { + try { + const fileContent = await fs.promises.readFile(filePath, 'utf8'); + const parsed = JSON.parse(fileContent) as unknown; + + const isLegacyRecord = (val: unknown): val is ConversationRecord => + typeof val === 'object' && val !== null && 'sessionId' in val; + + if (isLegacyRecord(parsed)) { + const legacyRecord = parsed; + if (options?.metadataOnly) { + let fallbackFirstUserMessageStr: string | undefined; + const firstUserMessage = legacyRecord.messages?.find( + (m) => m.type === 'user', + ); + if (firstUserMessage) { + const rawContent = firstUserMessage.content; + if (Array.isArray(rawContent)) { + fallbackFirstUserMessageStr = rawContent + .map((p: unknown) => (isTextPart(p) ? p['text'] : '')) + .join(''); + } else if (typeof rawContent === 'string') { + fallbackFirstUserMessageStr = rawContent; + } + } + return { + ...legacyRecord, + messages: [], + messageCount: legacyRecord.messages?.length || 0, + firstUserMessage: fallbackFirstUserMessageStr, + hasUserOrAssistantMessage: + legacyRecord.messages?.some( + (m) => m.type === 'user' || m.type === 'gemini', + ) || false, + }; + } + return { + ...legacyRecord, + hasUserOrAssistantMessage: + legacyRecord.messages?.some( + (m) => m.type === 'user' || m.type === 'gemini', + ) || false, + }; + } + } catch { + // ignore legacy fallback parse error + } + return null; +} diff --git a/packages/core/src/services/chatRecordingTypes.ts b/packages/core/src/services/chatRecordingTypes.ts new file mode 100644 index 0000000000..c2564c0eec --- /dev/null +++ b/packages/core/src/services/chatRecordingTypes.ts @@ -0,0 +1,124 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { PartListUnion } from '@google/genai'; +import type { Status } from '../scheduler/types.js'; +import type { ToolResultDisplay } from '../tools/tools.js'; +import { type ThoughtSummary } from '../utils/thoughtUtils.js'; + +export const SESSION_FILE_PREFIX = 'session-'; +export const MAX_HISTORY_MESSAGES = 50; +export const MAX_TOOL_OUTPUT_SIZE = 50 * 1024; // 50KB + +/** + * Token usage summary for a message or conversation. + */ +export interface TokensSummary { + input: number; // promptTokenCount + output: number; // candidatesTokenCount + cached: number; // cachedContentTokenCount + thoughts?: number; // thoughtsTokenCount + tool?: number; // toolUsePromptTokenCount + total: number; // totalTokenCount +} + +/** + * Base fields common to all messages. + */ +export interface BaseMessageRecord { + id: string; + timestamp: string; + content: PartListUnion; + displayContent?: PartListUnion; +} + +/** + * Record of a tool call execution within a conversation. + */ +export interface ToolCallRecord { + id: string; + name: string; + args: Record; + result?: PartListUnion | null; + status: Status; + timestamp: string; + // UI-specific fields for display purposes + displayName?: string; + description?: string; + resultDisplay?: ToolResultDisplay; + renderOutputAsMarkdown?: boolean; +} + +/** + * Message type and message type-specific fields. + */ +export type ConversationRecordExtra = + | { + type: 'user' | 'info' | 'error' | 'warning'; + } + | { + type: 'gemini'; + toolCalls?: ToolCallRecord[]; + thoughts?: Array; + tokens?: TokensSummary | null; + model?: string; + }; + +/** + * A single message record in a conversation. + */ +export type MessageRecord = BaseMessageRecord & ConversationRecordExtra; + +/** + * Complete conversation record stored in session files. + */ +export interface ConversationRecord { + sessionId: string; + projectHash: string; + startTime: string; + lastUpdated: string; + messages: MessageRecord[]; + summary?: string; + /** Workspace directories added during the session via /dir add */ + directories?: string[]; + /** The kind of conversation (main agent or subagent) */ + kind?: 'main' | 'subagent'; +} + +/** + * Data structure for resuming an existing session. + */ +export interface ResumedSessionData { + conversation: ConversationRecord; + filePath: string; +} + +/** + * Loads a ConversationRecord from a JSONL session file. + * Returns null if the file is invalid or cannot be read. + */ +export interface LoadConversationOptions { + maxMessages?: number; + metadataOnly?: boolean; +} + +export interface RewindRecord { + $rewindTo: string; +} + +export interface MetadataUpdateRecord { + $set: Partial; +} + +export interface PartialMetadataRecord { + sessionId: string; + projectHash: string; + startTime?: string; + lastUpdated?: string; + summary?: string; + directories?: string[]; + kind?: 'main' | 'subagent'; +} diff --git a/packages/core/src/utils/sessionOperations.ts b/packages/core/src/utils/sessionOperations.ts index 24ff43aa00..8a6da85d8e 100644 --- a/packages/core/src/utils/sessionOperations.ts +++ b/packages/core/src/utils/sessionOperations.ts @@ -98,8 +98,11 @@ export async function deleteSubagentSessionDirAndArtifactsAsync( }); for (const file of files) { - if (file.isFile() && file.name.endsWith('.json')) { - const agentId = path.basename(file.name, '.json'); + if ( + file.isFile() && + (file.name.endsWith('.json') || file.name.endsWith('.jsonl')) + ) { + const agentId = path.basename(file.name, path.extname(file.name)); await deleteSessionArtifactsAsync(agentId, tempDir); } } diff --git a/packages/sdk/src/agent.ts b/packages/sdk/src/agent.ts index 6e713c0fe1..dba25ca444 100644 --- a/packages/sdk/src/agent.ts +++ b/packages/sdk/src/agent.ts @@ -10,6 +10,7 @@ import { createSessionId, type ResumedSessionData, type ConversationRecord, + loadConversationRecord, } from '@google/gemini-cli-core'; import { GeminiCliSession } from './session.js'; @@ -55,9 +56,11 @@ export class GeminiCliAgent { const filesToCheck = candidates.length > 0 ? candidates : sessions; for (const sessionFile of filesToCheck) { - const loaded = await storage.loadProjectTempFile( + const absolutePath = path.join( + storage.getProjectTempDir(), sessionFile.filePath, ); + const loaded = await loadConversationRecord(absolutePath); if (loaded && loaded.sessionId === sessionId) { conversation = loaded; filePath = path.join(storage.getProjectTempDir(), sessionFile.filePath);