feat(memory): add Auto Memory inbox flow with canonical-patch contract (#26338)

This commit is contained in:
Sandy Tao
2026-05-04 12:07:13 -07:00
committed by GitHub
parent 60a6a47d56
commit a7beb890d0
26 changed files with 4279 additions and 115 deletions
+260 -11
View File
@@ -6,7 +6,7 @@
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
import { constants as fsConstants } from 'node:fs';
import { constants as fsConstants, type Dirent } from 'node:fs';
import { randomUUID } from 'node:crypto';
import * as Diff from 'diff';
import type { Config } from '../config/config.js';
@@ -45,6 +45,11 @@ import { sanitizeWorkflowSummaryForScratchpad } from './sessionScratchpadUtils.j
const LOCK_FILENAME = '.extraction.lock';
const STATE_FILENAME = '.extraction-state.json';
const LOCK_STALE_MS = 35 * 60 * 1000; // 35 minutes (exceeds agent's 30-min time limit)
// Throttle: skip background extraction if the most recent run finished less
// than this long ago. Pairs with the advisory lock — the lock prevents
// concurrent runs; this throttle prevents back-to-back runs across short
// CLI sessions on workspaces with a lot of session history.
const MIN_EXTRACTION_INTERVAL_MS = 30 * 60 * 1000; // 30 minutes
const MIN_USER_MESSAGES = 10;
const MIN_IDLE_MS = 3 * 60 * 60 * 1000; // 3 hours
const MAX_SESSION_INDEX_SIZE = 50;
@@ -78,6 +83,8 @@ export interface ExtractionRun {
sessionIds: string[];
candidateSessions?: SessionVersion[];
processedSessions?: SessionVersion[];
memoryCandidatesCreated?: string[];
memoryFilesUpdated?: string[];
skillsCreated: string[];
turnCount?: number;
durationMs?: number;
@@ -163,6 +170,8 @@ function isExtractionRunLike(value: unknown): value is {
sessionIds?: unknown;
candidateSessions?: unknown;
processedSessions?: unknown;
memoryCandidatesCreated?: unknown;
memoryFilesUpdated?: unknown;
skillsCreated: unknown;
turnCount?: unknown;
durationMs?: unknown;
@@ -194,22 +203,44 @@ function buildExtractionRun(value: unknown): ExtractionRun | null {
const candidateSessions = normalizeSessionVersions(value.candidateSessions);
const processedSessions = normalizeSessionVersions(value.processedSessions);
const sessionIds = normalizeStringArray(value.sessionIds);
return {
const run: ExtractionRun = {
runAt: value.runAt,
sessionIds:
sessionIds.length > 0
? sessionIds
: processedSessions.map((session) => session.sessionId),
candidateSessions:
candidateSessions.length > 0 ? candidateSessions : undefined,
processedSessions:
processedSessions.length > 0 ? processedSessions : undefined,
skillsCreated: normalizeStringArray(value.skillsCreated),
turnCount: normalizeOptionalNumber(value.turnCount),
durationMs: normalizeOptionalNumber(value.durationMs),
terminateReason: normalizeOptionalString(value.terminateReason),
};
if (candidateSessions.length > 0) {
run.candidateSessions = candidateSessions;
}
if (processedSessions.length > 0) {
run.processedSessions = processedSessions;
}
if ('memoryCandidatesCreated' in value) {
run.memoryCandidatesCreated = normalizeStringArray(
value.memoryCandidatesCreated,
);
}
if ('memoryFilesUpdated' in value) {
run.memoryFilesUpdated = normalizeStringArray(value.memoryFilesUpdated);
}
const turnCount = normalizeOptionalNumber(value.turnCount);
if (turnCount !== undefined) {
run.turnCount = turnCount;
}
const durationMs = normalizeOptionalNumber(value.durationMs);
if (durationMs !== undefined) {
run.durationMs = durationMs;
}
const terminateReason = normalizeOptionalString(value.terminateReason);
if (terminateReason !== undefined) {
run.terminateReason = terminateReason;
}
return run;
}
function getTimestampMs(timestamp: string): number {
@@ -897,6 +928,164 @@ export async function validatePatches(
return validPatches;
}
type FileSnapshot = Map<string, string>;
async function snapshotFiles(
rootDir: string,
shouldIncludeFile: (relativePath: string) => boolean = () => true,
shouldDescendDirectory: (relativePath: string) => boolean = () => true,
): Promise<FileSnapshot> {
const snapshot: FileSnapshot = new Map();
async function walk(currentDir: string): Promise<void> {
let entries: Array<Dirent<string>>;
try {
entries = await fs.readdir(currentDir, { withFileTypes: true });
} catch {
return;
}
for (const entry of entries) {
const absolutePath = path.join(currentDir, entry.name);
const relativePath = path.relative(rootDir, absolutePath);
if (!relativePath) {
continue;
}
if (entry.isDirectory()) {
if (shouldDescendDirectory(relativePath)) {
await walk(absolutePath);
}
continue;
}
if (!entry.isFile() || !shouldIncludeFile(relativePath)) {
continue;
}
try {
snapshot.set(relativePath, await fs.readFile(absolutePath, 'utf-8'));
} catch {
// Best-effort snapshot: ignore files that disappear or are unreadable.
}
}
}
await walk(rootDir);
return snapshot;
}
async function snapshotInboxCandidates(
memoryDir: string,
): Promise<FileSnapshot> {
return snapshotFiles(path.join(memoryDir, '.inbox'));
}
/**
* Builds a human-readable summary of the current memory inbox state, grouped
* by kind and showing the contents of each `.patch` file. Used as part of the
* extraction agent's initial context so the agent can extend existing
* canonical patches in-place rather than creating new files each session.
*
* Returns an empty string if the inbox is empty.
*/
async function buildPendingInboxSummary(memoryDir: string): Promise<string> {
const sections: string[] = [];
for (const kind of ['private', 'global'] as const) {
const kindRoot = path.join(memoryDir, '.inbox', kind);
let entries: Array<Dirent<string>>;
try {
entries = await fs.readdir(kindRoot, { withFileTypes: true });
} catch {
continue;
}
const patchFiles = entries
.filter((e) => e.isFile() && e.name.endsWith('.patch'))
.map((e) => e.name)
.sort();
if (patchFiles.length === 0) {
continue;
}
const filesSection: string[] = [`## ${kind} (${patchFiles.length})`];
for (const fileName of patchFiles) {
const fullPath = path.join(kindRoot, fileName);
let content = '';
try {
content = await fs.readFile(fullPath, 'utf-8');
} catch {
continue;
}
// Guard against indirect prompt injection: patch contents originate
// from past sessions (which may include user-pasted text), so a
// crafted payload could include a closing ``` fence to break out of
// the surrounding markdown block. Pick a fence longer than the
// longest backtick-run actually present in the content so the close
// is guaranteed to terminate the block.
const longestBacktickRun = (content.match(/`+/g) ?? []).reduce(
(max, run) => Math.max(max, run.length),
2, // never go below the standard 3-backtick fence
);
const fence = '`'.repeat(longestBacktickRun + 1);
filesSection.push('');
filesSection.push(`### ${fileName}`);
filesSection.push(fence);
filesSection.push(content.trimEnd());
filesSection.push(fence);
}
sections.push(filesSection.join('\n'));
}
return sections.join('\n\n');
}
interface FileSnapshotDiff {
added: string[];
updated: string[];
deleted: string[];
}
function diffFileSnapshots(
before: FileSnapshot,
after: FileSnapshot,
): FileSnapshotDiff {
const added: string[] = [];
const updated: string[] = [];
const deleted: string[] = [];
for (const [relativePath, content] of after) {
if (!before.has(relativePath)) {
added.push(relativePath);
} else if (before.get(relativePath) !== content) {
updated.push(relativePath);
}
}
for (const relativePath of before.keys()) {
if (!after.has(relativePath)) {
deleted.push(relativePath);
}
}
return {
added: added.sort(),
updated: updated.sort(),
deleted: deleted.sort(),
};
}
function getChangedSnapshotPaths(diff: FileSnapshotDiff): string[] {
return [...diff.added, ...diff.updated].sort();
}
function prefixRelativePaths(
prefix: string,
relativePaths: string[],
): string[] {
return relativePaths.map((relativePath) => path.join(prefix, relativePath));
}
/**
* Main entry point for the skill extraction background task.
* Designed to be called fire-and-forget on session startup.
@@ -947,6 +1136,24 @@ export async function startMemoryService(config: Config): Promise<void> {
`[MemoryService] State loaded: ${previousRuns} previous run(s), ${previouslyProcessed} session(s) already processed`,
);
// Throttle: short-circuit if the most recent run finished less than
// MIN_EXTRACTION_INTERVAL_MS ago. Avoids re-scanning session history on
// every CLI start when the user opens several short sessions in a row.
const lastRun = state.runs.at(-1);
if (lastRun?.runAt) {
const lastRunMs = Date.parse(lastRun.runAt);
if (
Number.isFinite(lastRunMs) &&
Date.now() - lastRunMs < MIN_EXTRACTION_INTERVAL_MS
) {
const minutesAgo = Math.round((Date.now() - lastRunMs) / 60000);
debugLogger.log(
`[MemoryService] Skipped: last run was ${minutesAgo} minute(s) ago (min interval ${MIN_EXTRACTION_INTERVAL_MS / 60000}m)`,
);
return;
}
}
// Build session index: all eligible sessions with summaries + file paths.
// The agent decides which to read in full via read_file.
const { sessionIndex, newSessionIds, candidateSessions } =
@@ -988,6 +1195,8 @@ export async function startMemoryService(config: Config): Promise<void> {
`[MemoryService] ${skillsBefore.size} existing skill(s) in memory`,
);
const inboxCandidatesBefore = await snapshotInboxCandidates(memoryDir);
// Read existing skills for context (memory-extracted + global/workspace)
const existingSkillsSummary = await buildExistingSkillsSummary(
skillsDir,
@@ -999,11 +1208,23 @@ export async function startMemoryService(config: Config): Promise<void> {
);
}
// Surface the current inbox state to the agent so it can rewrite
// existing canonical patches in place instead of accumulating new ones
// across sessions.
const pendingInboxSummary = await buildPendingInboxSummary(memoryDir);
if (pendingInboxSummary) {
debugLogger.log(
`[MemoryService] Pending inbox surfaced to agent:\n${pendingInboxSummary}`,
);
}
// Build agent definition and context
const agentDefinition = SkillExtractionAgent(
skillsDir,
sessionIndex,
existingSkillsSummary,
memoryDir,
pendingInboxSummary,
);
const context = buildAgentLoopContext(config);
@@ -1109,6 +1330,18 @@ export async function startMemoryService(config: Config): Promise<void> {
);
}
// Anything still in .inbox/ is reviewable; nothing is auto-applied.
const memoryFilesUpdated: string[] = [];
const memoryCandidatesCreated = prefixRelativePaths(
'.inbox',
getChangedSnapshotPaths(
diffFileSnapshots(
inboxCandidatesBefore,
await snapshotInboxCandidates(memoryDir),
),
),
);
const processedSessions = candidateSessions
.filter((session) =>
processedSessionKeys.has(getSessionVersionKey(session)),
@@ -1127,6 +1360,8 @@ export async function startMemoryService(config: Config): Promise<void> {
lastUpdated: session.lastUpdated,
})),
processedSessions,
memoryCandidatesCreated,
memoryFilesUpdated,
skillsCreated,
turnCount: normalizeOptionalNumber(executorResult?.turn_count),
durationMs: normalizeOptionalNumber(executorResult?.duration_ms),
@@ -1139,8 +1374,17 @@ export async function startMemoryService(config: Config): Promise<void> {
};
await writeExtractionState(statePath, updatedState);
if (skillsCreated.length > 0 || patchesCreatedThisRun.length > 0) {
if (
skillsCreated.length > 0 ||
patchesCreatedThisRun.length > 0 ||
memoryCandidatesCreated.length > 0
) {
const completionParts: string[] = [];
if (memoryCandidatesCreated.length > 0) {
completionParts.push(
`prepared ${memoryCandidatesCreated.length} memory candidate(s): ${memoryCandidatesCreated.join(', ')}`,
);
}
if (skillsCreated.length > 0) {
completionParts.push(
`created ${skillsCreated.length} skill(s): ${skillsCreated.join(', ')}`,
@@ -1155,6 +1399,11 @@ export async function startMemoryService(config: Config): Promise<void> {
`[MemoryService] Completed in ${elapsed}s. ${completionParts.join('; ')} (read ${processedSessions.length}/${candidateSessions.length} surfaced session(s))`,
);
const feedbackParts: string[] = [];
if (memoryCandidatesCreated.length > 0) {
feedbackParts.push(
`${memoryCandidatesCreated.length} memory candidate${memoryCandidatesCreated.length > 1 ? 's' : ''} extracted from past sessions`,
);
}
if (skillsCreated.length > 0) {
feedbackParts.push(
`${skillsCreated.length} new skill${skillsCreated.length > 1 ? 's' : ''} extracted from past sessions: ${skillsCreated.join(', ')}`,