fix(core): support jsonl session logs in memory and summary services (#25816)

This commit is contained in:
Sandy Tao
2026-04-22 16:07:39 -07:00
committed by GitHub
parent 9c0a6864da
commit 5318610c1d
13 changed files with 1396 additions and 256 deletions
@@ -117,6 +117,35 @@ function createConversation(
};
}
async function writeConversationJsonl(
filePath: string,
conversation: ConversationRecord,
): Promise<void> {
const metadata = {
sessionId: conversation.sessionId,
projectHash: conversation.projectHash,
startTime: conversation.startTime,
lastUpdated: conversation.lastUpdated,
summary: conversation.summary,
directories: conversation.directories,
kind: conversation.kind,
};
const records = [metadata, ...conversation.messages];
await fs.writeFile(
filePath,
records.map((record) => JSON.stringify(record)).join('\n') + '\n',
);
}
async function setSessionMtime(
filePath: string,
timestamp: string,
): Promise<void> {
const date = new Date(timestamp);
await fs.utimes(filePath, date, date);
}
describe('memoryService', () => {
let tmpDir: string;
@@ -535,6 +564,150 @@ describe('memoryService', () => {
expect.stringContaining('/memory inbox'),
);
});
it('records only sessions whose read_file calls succeed as processed', async () => {
const { startMemoryService, readExtractionState } = await import(
'./memoryService.js'
);
const { LocalAgentExecutor } = await import(
'../agents/local-executor.js'
);
vi.mocked(LocalAgentExecutor.create).mockReset();
const memoryDir = path.join(tmpDir, 'memory-read-tracking');
const skillsDir = path.join(tmpDir, 'skills-read-tracking');
const projectTempDir = path.join(tmpDir, 'temp-read-tracking');
const chatsDir = path.join(projectTempDir, 'chats');
await fs.mkdir(memoryDir, { recursive: true });
await fs.mkdir(skillsDir, { recursive: true });
await fs.mkdir(chatsDir, { recursive: true });
const openedConversation = createConversation({
sessionId: 'opened-session',
summary: 'Read this one',
messageCount: 20,
lastUpdated: '2025-01-02T01:00:00Z',
});
const skippedConversation = createConversation({
sessionId: 'skipped-session',
summary: 'Do not read this one',
messageCount: 20,
lastUpdated: '2025-01-01T01:00:00Z',
});
const openedPath = path.join(
chatsDir,
`${SESSION_FILE_PREFIX}2025-01-02T00-00-opened.jsonl`,
);
const skippedPath = path.join(
chatsDir,
`${SESSION_FILE_PREFIX}2025-01-01T00-00-skipped.jsonl`,
);
await writeConversationJsonl(openedPath, openedConversation);
await writeConversationJsonl(skippedPath, skippedConversation);
vi.mocked(LocalAgentExecutor.create).mockImplementationOnce(
async (_definition, _context, onActivity) =>
({
run: vi.fn().mockImplementation(async () => {
onActivity?.({
isSubagentActivityEvent: true,
agentName: 'Skill Extractor',
type: 'TOOL_CALL_START',
data: {
name: 'read_file',
args: { file_path: openedPath },
callId: 'call-opened',
},
});
onActivity?.({
isSubagentActivityEvent: true,
agentName: 'Skill Extractor',
type: 'TOOL_CALL_START',
data: {
name: 'read_file',
args: { file_path: skippedPath },
callId: 'call-skipped',
},
});
onActivity?.({
isSubagentActivityEvent: true,
agentName: 'Skill Extractor',
type: 'ERROR',
data: {
name: 'read_file',
callId: 'call-skipped',
error: 'access denied',
},
});
onActivity?.({
isSubagentActivityEvent: true,
agentName: 'Skill Extractor',
type: 'TOOL_CALL_END',
data: {
name: 'read_file',
id: 'call-opened',
data: { content: 'Read this one' },
},
});
onActivity?.({
isSubagentActivityEvent: true,
agentName: 'Skill Extractor',
type: 'TOOL_CALL_START',
data: {
name: 'read_file',
args: { file_path: path.join(chatsDir, 'unrelated.jsonl') },
callId: 'call-unrelated',
},
});
return undefined;
}),
}) as never,
);
const mockConfig = {
storage: {
getProjectMemoryDir: vi.fn().mockReturnValue(memoryDir),
getProjectMemoryTempDir: vi.fn().mockReturnValue(memoryDir),
getProjectSkillsMemoryDir: vi.fn().mockReturnValue(skillsDir),
getProjectTempDir: vi.fn().mockReturnValue(projectTempDir),
},
getToolRegistry: vi.fn(),
getMessageBus: vi.fn(),
getGeminiClient: vi.fn(),
getSkillManager: vi.fn().mockReturnValue({ getSkills: () => [] }),
modelConfigService: {
registerRuntimeModelConfig: vi.fn(),
},
getTargetDir: vi.fn().mockReturnValue(tmpDir),
sandboxManager: undefined,
} as unknown as Parameters<typeof startMemoryService>[0];
await startMemoryService(mockConfig);
const state = await readExtractionState(
path.join(memoryDir, '.extraction-state.json'),
);
expect(state.runs).toHaveLength(1);
expect(state.runs[0].candidateSessions).toEqual([
{
sessionId: 'opened-session',
lastUpdated: '2025-01-02T01:00:00Z',
},
{
sessionId: 'skipped-session',
lastUpdated: '2025-01-01T01:00:00Z',
},
]);
expect(state.runs[0].processedSessions).toEqual([
{
sessionId: 'opened-session',
lastUpdated: '2025-01-02T01:00:00Z',
},
]);
expect(state.runs[0].sessionIds).toEqual(['opened-session']);
});
});
describe('getProcessedSessionIds', () => {
@@ -663,7 +836,7 @@ describe('memoryService', () => {
const state: ExtractionState = {
runs: [
{
runAt: '2025-01-01T00:00:00Z',
runAt: '2025-01-01T02:00:00Z',
sessionIds: ['old-session'],
skillsCreated: [],
},
@@ -676,6 +849,39 @@ describe('memoryService', () => {
expect(result.sessionIndex).not.toContain('[NEW]');
});
it('treats resumed legacy sessions as [NEW] when lastUpdated moved past the old run', async () => {
const { buildSessionIndex } = await import('./memoryService.js');
const conversation = createConversation({
sessionId: 'resumed-session',
summary: 'Resumed after extraction',
messageCount: 20,
lastUpdated: '2025-01-01T03:00:00Z',
});
await fs.writeFile(
path.join(
chatsDir,
`${SESSION_FILE_PREFIX}2025-01-01T00-00-resumed01.json`,
),
JSON.stringify(conversation),
);
const state: ExtractionState = {
runs: [
{
runAt: '2025-01-01T02:00:00Z',
sessionIds: ['resumed-session'],
skillsCreated: [],
},
],
};
const result = await buildSessionIndex(chatsDir, state);
expect(result.sessionIndex).toContain('[NEW]');
expect(result.newSessionIds).toEqual(['resumed-session']);
});
it('includes file path and summary in each line', async () => {
const { buildSessionIndex } = await import('./memoryService.js');
@@ -800,7 +1006,7 @@ describe('memoryService', () => {
const state: ExtractionState = {
runs: [
{
runAt: '2025-01-01T00:00:00Z',
runAt: '2025-01-01T02:00:00Z',
sessionIds: ['processed-one'],
skillsCreated: [],
},
@@ -815,6 +1021,136 @@ describe('memoryService', () => {
expect(result.sessionIndex).toContain('[NEW]');
expect(result.sessionIndex).toContain('[old]');
});
it('reads JSONL sessions and sorts by actual lastUpdated instead of filename', async () => {
const { buildSessionIndex } = await import('./memoryService.js');
const olderByName = createConversation({
sessionId: 'older-by-name',
summary: 'Filename looks newer',
messageCount: 20,
lastUpdated: '2025-01-01T01:00:00Z',
});
const newerByActivity = createConversation({
sessionId: 'newer-by-activity',
summary: 'Actually most recent',
messageCount: 20,
lastUpdated: '2025-02-01T01:00:00Z',
});
await writeConversationJsonl(
path.join(
chatsDir,
`${SESSION_FILE_PREFIX}2025-02-01T00-00-oldername.jsonl`,
),
olderByName,
);
await writeConversationJsonl(
path.join(
chatsDir,
`${SESSION_FILE_PREFIX}2025-01-01T00-00-neweractv.jsonl`,
),
newerByActivity,
);
const result = await buildSessionIndex(chatsDir, { runs: [] });
const firstLine = result.sessionIndex.split('\n')[0];
expect(firstLine).toContain('Actually most recent');
expect(firstLine).not.toContain('Filename looks newer');
});
it('rotates in older unprocessed sessions instead of starving them behind retried recent ones', async () => {
const { buildSessionIndex } = await import('./memoryService.js');
for (let i = 0; i < 11; i++) {
const day = String(11 - i).padStart(2, '0');
const conversation = createConversation({
sessionId: `backlog-${i}`,
summary: `Backlog ${i}`,
messageCount: 20,
lastUpdated: `2025-01-${day}T01:00:00Z`,
});
await fs.writeFile(
path.join(
chatsDir,
`${SESSION_FILE_PREFIX}2025-01-${day}T00-00-backlog${i}.json`,
),
JSON.stringify(conversation),
);
}
const state: ExtractionState = {
runs: [
{
runAt: '2025-02-01T00:00:00Z',
sessionIds: [],
candidateSessions: Array.from({ length: 10 }, (_, i) => ({
sessionId: `backlog-${i}`,
lastUpdated: `2025-01-${String(11 - i).padStart(2, '0')}T01:00:00Z`,
})),
skillsCreated: [],
},
],
};
const result = await buildSessionIndex(chatsDir, state);
expect(result.newSessionIds).toContain('backlog-10');
expect(result.newSessionIds).not.toContain('backlog-9');
});
it('surfaces older unprocessed sessions even when the newest 100 files were already processed', async () => {
const { buildSessionIndex } = await import('./memoryService.js');
const processedSessions: ExtractionRun['processedSessions'] = [];
for (let i = 0; i < 105; i++) {
const timestamp = new Date(
Date.UTC(2025, 0, 1, 0, 0, 105 - i),
).toISOString();
const conversation = createConversation({
sessionId: `backlog-${i}`,
summary: `Backlog ${i}`,
messageCount: 20,
lastUpdated: timestamp,
});
const filePath = path.join(
chatsDir,
`${SESSION_FILE_PREFIX}2025-01-01T00-00-backlog${String(i).padStart(3, '0')}.json`,
);
await fs.writeFile(filePath, JSON.stringify(conversation));
await setSessionMtime(filePath, timestamp);
if (i < 100) {
processedSessions.push({
sessionId: conversation.sessionId,
lastUpdated: conversation.lastUpdated,
});
}
}
const result = await buildSessionIndex(chatsDir, {
runs: [
{
runAt: '2025-02-01T00:00:00Z',
sessionIds: processedSessions.map((session) => session.sessionId),
processedSessions,
skillsCreated: [],
},
],
});
expect(result.newSessionIds).toEqual([
'backlog-100',
'backlog-101',
'backlog-102',
'backlog-103',
'backlog-104',
]);
expect(result.sessionIndex).toContain('Backlog 100');
expect(result.sessionIndex).toContain('Backlog 104');
});
});
describe('ExtractionState runs tracking', () => {
@@ -827,6 +1163,18 @@ describe('memoryService', () => {
{
runAt: '2025-06-01T00:00:00Z',
sessionIds: ['s1'],
candidateSessions: [
{
sessionId: 's1',
lastUpdated: '2025-05-31T12:00:00Z',
},
],
processedSessions: [
{
sessionId: 's1',
lastUpdated: '2025-05-31T12:00:00Z',
},
],
skillsCreated: ['debug-helper', 'test-gen'],
},
],
@@ -840,6 +1188,18 @@ describe('memoryService', () => {
'debug-helper',
'test-gen',
]);
expect(result.runs[0].candidateSessions).toEqual([
{
sessionId: 's1',
lastUpdated: '2025-05-31T12:00:00Z',
},
]);
expect(result.runs[0].processedSessions).toEqual([
{
sessionId: 's1',
lastUpdated: '2025-05-31T12:00:00Z',
},
]);
expect(result.runs[0].sessionIds).toEqual(['s1']);
expect(result.runs[0].runAt).toBe('2025-06-01T00:00:00Z');
});
@@ -854,6 +1214,26 @@ describe('memoryService', () => {
{
runAt: '2025-01-01T00:00:00Z',
sessionIds: ['a', 'b'],
candidateSessions: [
{
sessionId: 'a',
lastUpdated: '2024-12-31T23:00:00Z',
},
{
sessionId: 'b',
lastUpdated: '2024-12-31T22:00:00Z',
},
],
processedSessions: [
{
sessionId: 'a',
lastUpdated: '2024-12-31T23:00:00Z',
},
{
sessionId: 'b',
lastUpdated: '2024-12-31T22:00:00Z',
},
],
skillsCreated: ['skill-x'],
},
{