/** * @license * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ import * as fs from 'node:fs/promises'; import * as path from 'node:path'; import { constants as fsConstants, type Dirent } from 'node:fs'; import { randomUUID } from 'node:crypto'; import * as Diff from 'diff'; import type { Config } from '../config/config.js'; import { SESSION_FILE_PREFIX, loadConversationRecord, type ConversationRecord, type MemoryScratchpad, } from './chatRecordingService.js'; import { debugLogger } from '../utils/debugLogger.js'; import { coreEvents } from '../utils/events.js'; import { isNodeError } from '../utils/errors.js'; import { FRONTMATTER_REGEX, parseFrontmatter } from '../skills/skillLoader.js'; import { LocalAgentExecutor } from '../agents/local-executor.js'; import { SkillExtractionAgent } from '../agents/skill-extraction-agent.js'; import { getModelConfigAlias } from '../agents/registry.js'; import { isToolActivityError, type SubagentActivityEvent, } from '../agents/types.js'; import { ExecutionLifecycleService } from './executionLifecycleService.js'; import { PromptRegistry } from '../prompts/prompt-registry.js'; import { ResourceRegistry } from '../resources/resource-registry.js'; import { PolicyEngine } from '../policy/policy-engine.js'; import { PolicyDecision } from '../policy/types.js'; import { MessageBus } from '../confirmation-bus/message-bus.js'; import { Storage } from '../config/storage.js'; import type { AgentLoopContext } from '../config/agent-loop-context.js'; import { READ_FILE_TOOL_NAME } from '../tools/tool-names.js'; import { applyParsedSkillPatches, hasParsedPatchHunks, type InboxMemoryPatchKind, listInboxPatchFiles, validateInboxMemoryPatchFile, } from './memoryPatchUtils.js'; import { sanitizeWorkflowSummaryForScratchpad } from './sessionScratchpadUtils.js'; const LOCK_FILENAME = '.extraction.lock'; const STATE_FILENAME = '.extraction-state.json'; const LOCK_STALE_MS = 35 * 60 * 1000; // 35 minutes (exceeds agent's 30-min time limit) // Throttle: skip background extraction if the most recent run finished less // than this long ago. Pairs with the advisory lock — the lock prevents // concurrent runs; this throttle prevents back-to-back runs across short // CLI sessions on workspaces with a lot of session history. const MIN_EXTRACTION_INTERVAL_MS = 30 * 60 * 1000; // 30 minutes const MIN_USER_MESSAGES = 10; const MIN_IDLE_MS = 3 * 60 * 60 * 1000; // 3 hours const MAX_SESSION_INDEX_SIZE = 50; const MAX_NEW_SESSION_BATCH_SIZE = 10; /** * Lock file content for coordinating across CLI instances. */ interface LockInfo { pid: number; startedAt: string; } interface SessionVersion { sessionId: string; lastUpdated: string; } interface IndexedSession extends SessionVersion { filePath: string; summary?: string; memoryScratchpad?: MemoryScratchpad; userMessageCount: number; } /** * Metadata for a single extraction run. */ export interface ExtractionRun { runAt: string; sessionIds: string[]; candidateSessions?: SessionVersion[]; processedSessions?: SessionVersion[]; memoryCandidatesCreated?: string[]; memoryFilesUpdated?: string[]; skillsCreated: string[]; turnCount?: number; durationMs?: number; terminateReason?: string; } /** * Tracks extraction history with per-run metadata. */ export interface ExtractionState { runs: ExtractionRun[]; } /** * Returns all session IDs that have been processed across all runs. */ export function getProcessedSessionIds(state: ExtractionState): Set { const ids = new Set(); for (const run of state.runs) { const processedSessionIds = run.processedSessions?.map((session) => session.sessionId) ?? run.sessionIds; for (const id of processedSessionIds) { ids.add(id); } } return ids; } function isLockInfo(value: unknown): value is LockInfo { return ( typeof value === 'object' && value !== null && 'pid' in value && typeof value.pid === 'number' && 'startedAt' in value && typeof value.startedAt === 'string' ); } function isSessionVersion(value: unknown): value is SessionVersion { return ( typeof value === 'object' && value !== null && 'sessionId' in value && typeof value.sessionId === 'string' && 'lastUpdated' in value && typeof value.lastUpdated === 'string' ); } function normalizeSessionVersions(value: unknown): SessionVersion[] { if (!Array.isArray(value)) { return []; } return value.filter(isSessionVersion).map((session) => ({ sessionId: session.sessionId, lastUpdated: session.lastUpdated, })); } function normalizeStringArray(value: unknown): string[] { if (!Array.isArray(value)) { return []; } return value.filter((item): item is string => typeof item === 'string'); } function normalizeOptionalNumber(value: unknown): number | undefined { return typeof value === 'number' && Number.isFinite(value) ? value : undefined; } function normalizeOptionalString(value: unknown): string | undefined { return typeof value === 'string' ? value : undefined; } function isExtractionRunLike(value: unknown): value is { runAt: string; sessionIds?: unknown; candidateSessions?: unknown; processedSessions?: unknown; memoryCandidatesCreated?: unknown; memoryFilesUpdated?: unknown; skillsCreated: unknown; turnCount?: unknown; durationMs?: unknown; terminateReason?: unknown; } { return ( typeof value === 'object' && value !== null && 'runAt' in value && typeof value.runAt === 'string' && 'skillsCreated' in value ); } function isExtractionState(value: unknown): value is { runs: unknown[] } { return ( typeof value === 'object' && value !== null && 'runs' in value && Array.isArray(value.runs) ); } function buildExtractionRun(value: unknown): ExtractionRun | null { if (!isExtractionRunLike(value)) { return null; } const candidateSessions = normalizeSessionVersions(value.candidateSessions); const processedSessions = normalizeSessionVersions(value.processedSessions); const sessionIds = normalizeStringArray(value.sessionIds); const run: ExtractionRun = { runAt: value.runAt, sessionIds: sessionIds.length > 0 ? sessionIds : processedSessions.map((session) => session.sessionId), skillsCreated: normalizeStringArray(value.skillsCreated), }; if (candidateSessions.length > 0) { run.candidateSessions = candidateSessions; } if (processedSessions.length > 0) { run.processedSessions = processedSessions; } if ('memoryCandidatesCreated' in value) { run.memoryCandidatesCreated = normalizeStringArray( value.memoryCandidatesCreated, ); } if ('memoryFilesUpdated' in value) { run.memoryFilesUpdated = normalizeStringArray(value.memoryFilesUpdated); } const turnCount = normalizeOptionalNumber(value.turnCount); if (turnCount !== undefined) { run.turnCount = turnCount; } const durationMs = normalizeOptionalNumber(value.durationMs); if (durationMs !== undefined) { run.durationMs = durationMs; } const terminateReason = normalizeOptionalString(value.terminateReason); if (terminateReason !== undefined) { run.terminateReason = terminateReason; } return run; } function getTimestampMs(timestamp: string): number { const parsed = Date.parse(timestamp); return Number.isNaN(parsed) ? 0 : parsed; } function getSessionVersionKey(session: SessionVersion): string { return `${session.sessionId}\u0000${session.lastUpdated}`; } function hasLegacyRunProcessedSession( run: ExtractionRun, session: SessionVersion, ): boolean { return ( run.sessionIds.includes(session.sessionId) && getTimestampMs(run.runAt) >= getTimestampMs(session.lastUpdated) ); } function isSessionVersionProcessed( state: ExtractionState, session: SessionVersion, ): boolean { const sessionKey = getSessionVersionKey(session); for (const run of state.runs) { if ( run.processedSessions?.some( (processed) => getSessionVersionKey(processed) === sessionKey, ) ) { return true; } if (!run.processedSessions && hasLegacyRunProcessedSession(run, session)) { return true; } } return false; } function getSessionAttemptCount( state: ExtractionState, session: SessionVersion, ): number { const sessionKey = getSessionVersionKey(session); let attempts = 0; for (const run of state.runs) { if (run.candidateSessions) { if ( run.candidateSessions.some( (candidate) => getSessionVersionKey(candidate) === sessionKey, ) ) { attempts++; } continue; } if (hasLegacyRunProcessedSession(run, session)) { attempts++; } } return attempts; } function compareIndexedSessions(a: IndexedSession, b: IndexedSession): number { const timestampDelta = getTimestampMs(b.lastUpdated) - getTimestampMs(a.lastUpdated); if (timestampDelta !== 0) { return timestampDelta; } if (a.filePath.endsWith('.jsonl') !== b.filePath.endsWith('.jsonl')) { return a.filePath.endsWith('.jsonl') ? -1 : 1; } return b.filePath.localeCompare(a.filePath); } function shouldReplaceIndexedSession( existing: IndexedSession, candidate: IndexedSession, ): boolean { return compareIndexedSessions(candidate, existing) < 0; } function isReadFileActivity( activity: SubagentActivityEvent, ): activity is SubagentActivityEvent & { data: { name: string; args?: { file_path?: unknown }; callId?: unknown }; } { return ( activity.type === 'TOOL_CALL_START' && activity.data['name'] === READ_FILE_TOOL_NAME ); } function getReadFileCallId(activity: SubagentActivityEvent): string | null { if (isReadFileActivity(activity)) { const { callId } = activity.data; return typeof callId === 'string' ? callId : null; } if ( activity.type === 'TOOL_CALL_END' && activity.data['name'] === READ_FILE_TOOL_NAME ) { const id = activity.data['id']; return typeof id === 'string' ? id : null; } if ( activity.type === 'ERROR' && activity.data['name'] === READ_FILE_TOOL_NAME ) { const callId = activity.data['callId']; return typeof callId === 'string' ? callId : null; } return null; } function getResolvedActivityFilePath( config: Config, activity: SubagentActivityEvent, ): string | null { if (!isReadFileActivity(activity)) { return null; } const args = activity.data.args; if ( typeof args !== 'object' || args === null || !('file_path' in args) || typeof args.file_path !== 'string' ) { return null; } const targetDir = 'getTargetDir' in config && typeof config.getTargetDir === 'function' ? config.getTargetDir() : process.cwd(); return path.resolve(targetDir, args.file_path); } function getUserMessageCount( conversation: ConversationRecord & { userMessageCount?: number }, ): number { return ( conversation.userMessageCount ?? conversation.messages.filter((message) => message.type === 'user').length ); } function isSupportedSessionFile(fileName: string): boolean { return ( fileName.startsWith(SESSION_FILE_PREFIX) && (fileName.endsWith('.json') || fileName.endsWith('.jsonl')) ); } /** * Attempts to acquire an exclusive lock file using O_CREAT | O_EXCL. * Returns true if the lock was acquired, false if another instance owns it. */ export async function tryAcquireLock( lockPath: string, retries = 1, ): Promise { const lockInfo: LockInfo = { pid: process.pid, startedAt: new Date().toISOString(), }; try { // Atomic create-if-not-exists const fd = await fs.open( lockPath, fsConstants.O_CREAT | fsConstants.O_EXCL | fsConstants.O_WRONLY, ); try { await fd.writeFile(JSON.stringify(lockInfo)); } finally { await fd.close(); } return true; } catch (error: unknown) { if (isNodeError(error) && error.code === 'EEXIST') { // Lock exists — check if it's stale if (retries > 0 && (await isLockStale(lockPath))) { debugLogger.debug('[MemoryService] Cleaning up stale lock file'); await releaseLock(lockPath); return tryAcquireLock(lockPath, retries - 1); } debugLogger.debug( '[MemoryService] Lock held by another instance, skipping', ); return false; } throw error; } } /** * Checks if a lock file is stale (owner PID is dead or lock is too old). */ export async function isLockStale(lockPath: string): Promise { try { const content = await fs.readFile(lockPath, 'utf-8'); const parsed: unknown = JSON.parse(content); if (!isLockInfo(parsed)) { return true; // Invalid lock data — treat as stale } const lockInfo = parsed; // Check if PID is still alive try { process.kill(lockInfo.pid, 0); } catch { // PID is dead — lock is stale return true; } // Check if lock is too old const lockAge = Date.now() - new Date(lockInfo.startedAt).getTime(); if (lockAge > LOCK_STALE_MS) { return true; } return false; } catch { // Can't read lock — treat as stale return true; } } /** * Releases the lock file. */ export async function releaseLock(lockPath: string): Promise { try { await fs.unlink(lockPath); } catch (error: unknown) { if (isNodeError(error) && error.code === 'ENOENT') { return; // Already removed } debugLogger.warn( `[MemoryService] Failed to release lock: ${error instanceof Error ? error.message : String(error)}`, ); } } /** * Reads the extraction state file, or returns a default state. */ export async function readExtractionState( statePath: string, ): Promise { try { const content = await fs.readFile(statePath, 'utf-8'); const parsed: unknown = JSON.parse(content); if (!isExtractionState(parsed)) { return { runs: [] }; } const runs: ExtractionRun[] = []; for (const run of parsed.runs) { const normalizedRun = buildExtractionRun(run); if (!normalizedRun) continue; runs.push(normalizedRun); } return { runs }; } catch (error) { debugLogger.debug( '[MemoryService] Failed to read extraction state:', error, ); return { runs: [] }; } } /** * Writes the extraction state atomically (temp file + rename). */ export async function writeExtractionState( statePath: string, state: ExtractionState, ): Promise { const tmpPath = `${statePath}.tmp`; await fs.writeFile(tmpPath, JSON.stringify(state, null, 2)); await fs.rename(tmpPath, statePath); } /** * Determines if a conversation record should be considered for processing. * Filters out subagent sessions, sessions that haven't been idle long enough, * and sessions with too few user messages. */ function shouldProcessConversation( parsed: ConversationRecord & { userMessageCount?: number }, ): boolean { // Skip subagent sessions if (parsed.kind === 'subagent') return false; // Skip sessions that are still active (not idle for 3+ hours) const lastUpdated = getTimestampMs(parsed.lastUpdated); if (Date.now() - lastUpdated < MIN_IDLE_MS) return false; // Skip sessions with too few user messages if (getUserMessageCount(parsed) < MIN_USER_MESSAGES) return false; return true; } /** * Scans the chats directory for eligible session files, loading metadata from * both JSONL and legacy JSON sessions, deduplicating migrated sessions by * session ID, and sorting by actual lastUpdated. We scan the full directory * here so already-processed recent sessions cannot permanently block older * backlog sessions from surfacing as new candidates. */ async function scanEligibleSessions( chatsDir: string, ): Promise { let allFiles: string[]; try { allFiles = await fs.readdir(chatsDir); } catch { return []; } const candidates: Array<{ filePath: string; mtimeMs: number }> = []; for (const file of allFiles) { if (!isSupportedSessionFile(file)) continue; const filePath = path.join(chatsDir, file); try { const stat = await fs.stat(filePath); if (!stat.isFile()) continue; candidates.push({ filePath, mtimeMs: stat.mtimeMs }); } catch { // Skip files that disappeared between readdir and stat. } } candidates.sort((a, b) => b.mtimeMs - a.mtimeMs); const latestBySessionId = new Map(); for (const { filePath } of candidates) { try { const conversation = await loadConversationRecord(filePath, { metadataOnly: true, }); if (!conversation || !shouldProcessConversation(conversation)) continue; const indexedSession: IndexedSession = { sessionId: conversation.sessionId, lastUpdated: conversation.lastUpdated, filePath, summary: conversation.summary, memoryScratchpad: conversation.memoryScratchpadIsStale === true ? undefined : conversation.memoryScratchpad, userMessageCount: getUserMessageCount(conversation), }; const existing = latestBySessionId.get(indexedSession.sessionId); if (!existing || shouldReplaceIndexedSession(existing, indexedSession)) { latestBySessionId.set(indexedSession.sessionId, indexedSession); } } catch { // Skip unreadable files } } return Array.from(latestBySessionId.values()).sort(compareIndexedSessions); } function formatSessionHeadline(session: IndexedSession): string { const rawWorkflowSummary = session.memoryScratchpad?.workflowSummary; const sanitizedWorkflowSummary = typeof rawWorkflowSummary === 'string' ? sanitizeWorkflowSummaryForScratchpad(rawWorkflowSummary) : undefined; const workflowSummary = sanitizedWorkflowSummary?.trim() ? sanitizedWorkflowSummary : undefined; const summary = session.summary ?? workflowSummary ?? '(no summary)'; if ( session.summary && workflowSummary && workflowSummary !== session.summary ) { return `${summary} | workflow: ${workflowSummary}`; } return summary; } /** * Builds a session index for the extraction agent: a compact listing of all * eligible sessions with their summary, file path, and new/previously-processed status. * The agent can use read_file on paths to inspect sessions that look promising. * * Returns the index text, the list of selected new (unprocessed) session IDs, * and the surfaced candidate sessions for this run. */ export async function buildSessionIndex( chatsDir: string, state: ExtractionState, ): Promise<{ sessionIndex: string; newSessionIds: string[]; candidateSessions: IndexedSession[]; }> { const eligible = await scanEligibleSessions(chatsDir); if (eligible.length === 0) { return { sessionIndex: '', newSessionIds: [], candidateSessions: [] }; } const newSessions: IndexedSession[] = []; const oldSessions: IndexedSession[] = []; for (const session of eligible) { if (isSessionVersionProcessed(state, session)) { oldSessions.push(session); } else { newSessions.push(session); } } newSessions.sort((a, b) => { const attemptDelta = getSessionAttemptCount(state, a) - getSessionAttemptCount(state, b); if (attemptDelta !== 0) { return attemptDelta; } return compareIndexedSessions(a, b); }); const candidateSessions = newSessions.slice(0, MAX_NEW_SESSION_BATCH_SIZE); const remainingSlots = Math.max( 0, MAX_SESSION_INDEX_SIZE - candidateSessions.length, ); const displayedOldSessions = oldSessions.slice(0, remainingSlots); const candidateSessionIds = new Set( candidateSessions.map((session) => getSessionVersionKey(session)), ); const lines = [...candidateSessions, ...displayedOldSessions].map( (session) => { const status = candidateSessionIds.has(getSessionVersionKey(session)) ? '[NEW]' : '[old]'; return `${status} ${formatSessionHeadline(session)} (${session.userMessageCount} user msgs) — ${session.filePath}`; }, ); return { sessionIndex: lines.join('\n'), newSessionIds: candidateSessions.map((session) => session.sessionId), candidateSessions, }; } /** * Builds a summary of all existing skills — both memory-extracted skills * in the skillsDir and globally/workspace-discovered skills from the SkillManager. * This prevents the extraction agent from duplicating already-available skills. */ async function buildExistingSkillsSummary( skillsDir: string, config: Config, ): Promise { const sections: string[] = []; // 1. Memory-extracted skills (from previous runs) const memorySkills: string[] = []; try { const entries = await fs.readdir(skillsDir, { withFileTypes: true }); for (const entry of entries) { if (!entry.isDirectory()) continue; const skillPath = path.join(skillsDir, entry.name, 'SKILL.md'); try { const content = await fs.readFile(skillPath, 'utf-8'); const match = content.match(FRONTMATTER_REGEX); if (match) { const parsed = parseFrontmatter(match[1]); const name = parsed?.name ?? entry.name; const desc = parsed?.description ?? ''; memorySkills.push(`- **${name}**: ${desc}`); } else { memorySkills.push(`- **${entry.name}**`); } } catch { // Skill directory without SKILL.md, skip } } } catch { // Skills directory doesn't exist yet } if (memorySkills.length > 0) { sections.push( `## Previously Extracted Skills (in ${skillsDir})\n${memorySkills.join('\n')}`, ); } // 2. Discovered skills — categorize by source location try { const discoveredSkills = config.getSkillManager().getSkills(); if (discoveredSkills.length > 0) { const userSkillsDir = Storage.getUserSkillsDir(); const globalSkills: string[] = []; const workspaceSkills: string[] = []; const extensionSkills: string[] = []; const builtinSkills: string[] = []; for (const s of discoveredSkills) { const loc = s.location; if (loc.includes('/bundle/') || loc.includes('\\bundle\\')) { builtinSkills.push(`- **${s.name}**: ${s.description}`); } else if (loc.startsWith(userSkillsDir)) { globalSkills.push(`- **${s.name}**: ${s.description} (${loc})`); } else if ( loc.includes('/extensions/') || loc.includes('\\extensions\\') ) { extensionSkills.push(`- **${s.name}**: ${s.description}`); } else { workspaceSkills.push(`- **${s.name}**: ${s.description} (${loc})`); } } if (globalSkills.length > 0) { sections.push( `## Global Skills (~/.gemini/skills — do NOT duplicate)\n${globalSkills.join('\n')}`, ); } if (workspaceSkills.length > 0) { sections.push( `## Workspace Skills (.gemini/skills — do NOT duplicate)\n${workspaceSkills.join('\n')}`, ); } if (extensionSkills.length > 0) { sections.push( `## Extension Skills (from installed extensions — do NOT duplicate)\n${extensionSkills.join('\n')}`, ); } if (builtinSkills.length > 0) { sections.push( `## Builtin Skills (bundled with CLI — do NOT duplicate)\n${builtinSkills.join('\n')}`, ); } } } catch { // SkillManager not available } return sections.join('\n\n'); } /** * Builds an AgentLoopContext from a Config for background agent execution. */ function buildAgentLoopContext(config: Config): AgentLoopContext { // Create a PolicyEngine that auto-approves all tool calls so the // background sub-agent never prompts the user for confirmation. const autoApprovePolicy = new PolicyEngine({ rules: [ { toolName: '*', decision: PolicyDecision.ALLOW, priority: 100, }, ], }); const autoApproveBus = new MessageBus(autoApprovePolicy); return { config, promptId: `skill-extraction-${randomUUID().slice(0, 8)}`, toolRegistry: config.getToolRegistry(), promptRegistry: new PromptRegistry(), resourceRegistry: new ResourceRegistry(), messageBus: autoApproveBus, geminiClient: config.getGeminiClient(), sandboxManager: config.sandboxManager, }; } /** * Validates all .patch files in the skills directory using the `diff` library. * Parses each patch, reads the target file(s), and attempts a dry-run apply. * Removes patches that fail validation. Returns the filenames of valid patches. */ export async function validatePatches( skillsDir: string, config: Config, ): Promise { let entries: string[]; try { entries = await fs.readdir(skillsDir); } catch { return []; } const patchFiles = entries.filter((e) => e.endsWith('.patch')); const validPatches: string[] = []; for (const patchFile of patchFiles) { const patchPath = path.join(skillsDir, patchFile); let valid = true; let reason = ''; try { const patchContent = await fs.readFile(patchPath, 'utf-8'); const parsedPatches = Diff.parsePatch(patchContent); if (!hasParsedPatchHunks(parsedPatches)) { valid = false; reason = 'no hunks found in patch'; } else { const applied = await applyParsedSkillPatches(parsedPatches, config); if (!applied.success) { valid = false; switch (applied.reason) { case 'missingTargetPath': reason = 'missing target file path in patch header'; break; case 'invalidPatchHeaders': reason = 'invalid diff headers'; break; case 'outsideAllowedRoots': reason = `target file is outside skill roots: ${applied.targetPath}`; break; case 'newFileAlreadyExists': reason = `new file target already exists: ${applied.targetPath}`; break; case 'targetNotFound': reason = `target file not found: ${applied.targetPath}`; break; case 'doesNotApply': reason = `patch does not apply cleanly to ${applied.targetPath}`; break; default: reason = 'unknown patch validation failure'; break; } } } } catch (err) { valid = false; reason = `failed to read or parse patch: ${err}`; } if (valid) { validPatches.push(patchFile); debugLogger.log(`[MemoryService] Patch validated: ${patchFile}`); } else { debugLogger.warn( `[MemoryService] Removing invalid patch ${patchFile}: ${reason}`, ); try { await fs.unlink(patchPath); } catch { // Best-effort cleanup } } } return validPatches; } type FileSnapshot = Map; async function snapshotFiles( rootDir: string, shouldIncludeFile: (relativePath: string) => boolean = () => true, shouldDescendDirectory: (relativePath: string) => boolean = () => true, ): Promise { const snapshot: FileSnapshot = new Map(); async function walk(currentDir: string): Promise { let entries: Array>; try { entries = await fs.readdir(currentDir, { withFileTypes: true }); } catch { return; } for (const entry of entries) { const absolutePath = path.join(currentDir, entry.name); const relativePath = path.relative(rootDir, absolutePath); if (!relativePath) { continue; } if (entry.isDirectory()) { if (shouldDescendDirectory(relativePath)) { await walk(absolutePath); } continue; } if (!entry.isFile() || !shouldIncludeFile(relativePath)) { continue; } try { snapshot.set(relativePath, await fs.readFile(absolutePath, 'utf-8')); } catch { // Best-effort snapshot: ignore files that disappear or are unreadable. } } } await walk(rootDir); return snapshot; } async function snapshotInboxCandidates( memoryDir: string, ): Promise { return snapshotFiles(path.join(memoryDir, '.inbox')); } const MEMORY_INBOX_PATCH_KINDS: readonly InboxMemoryPatchKind[] = [ 'private', 'global', ]; async function validateMemoryInboxPatches(config: Config): Promise { for (const kind of MEMORY_INBOX_PATCH_KINDS) { const patchFiles = await listInboxPatchFiles(config, kind); for (const patchFile of patchFiles) { const validation = await validateInboxMemoryPatchFile( config, kind, patchFile, ); if (validation.valid) { continue; } try { await fs.unlink(patchFile); debugLogger.warn( `[MemoryService] Dropped invalid ${kind} memory inbox patch ${patchFile}: ${validation.reason}`, ); } catch (error) { const message = error instanceof Error ? error.message : String(error); debugLogger.warn( `[MemoryService] Failed to drop invalid ${kind} memory inbox patch ${patchFile}: ${validation.reason}; unlink failed: ${message}`, ); } } } } /** * Builds a human-readable summary of the current memory inbox state, grouped * by kind and showing the contents of each `.patch` file. Used as part of the * extraction agent's initial context so the agent can extend existing * canonical patches in-place rather than creating new files each session. * * Returns an empty string if the inbox is empty. */ async function buildPendingInboxSummary(memoryDir: string): Promise { const sections: string[] = []; for (const kind of ['private', 'global'] as const) { const kindRoot = path.join(memoryDir, '.inbox', kind); let entries: Array>; try { entries = await fs.readdir(kindRoot, { withFileTypes: true }); } catch { continue; } const patchFiles = entries .filter((e) => e.isFile() && e.name.endsWith('.patch')) .map((e) => e.name) .sort(); if (patchFiles.length === 0) { continue; } const filesSection: string[] = [`## ${kind} (${patchFiles.length})`]; for (const fileName of patchFiles) { const fullPath = path.join(kindRoot, fileName); let content = ''; try { content = await fs.readFile(fullPath, 'utf-8'); } catch { continue; } // Guard against indirect prompt injection: patch contents originate // from past sessions (which may include user-pasted text), so a // crafted payload could include a closing ``` fence to break out of // the surrounding markdown block. Pick a fence longer than the // longest backtick-run actually present in the content so the close // is guaranteed to terminate the block. const longestBacktickRun = (content.match(/`+/g) ?? []).reduce( (max, run) => Math.max(max, run.length), 2, // never go below the standard 3-backtick fence ); const fence = '`'.repeat(longestBacktickRun + 1); filesSection.push(''); filesSection.push(`### ${fileName}`); filesSection.push(fence); filesSection.push(content.trimEnd()); filesSection.push(fence); } sections.push(filesSection.join('\n')); } return sections.join('\n\n'); } interface FileSnapshotDiff { added: string[]; updated: string[]; deleted: string[]; } function diffFileSnapshots( before: FileSnapshot, after: FileSnapshot, ): FileSnapshotDiff { const added: string[] = []; const updated: string[] = []; const deleted: string[] = []; for (const [relativePath, content] of after) { if (!before.has(relativePath)) { added.push(relativePath); } else if (before.get(relativePath) !== content) { updated.push(relativePath); } } for (const relativePath of before.keys()) { if (!after.has(relativePath)) { deleted.push(relativePath); } } return { added: added.sort(), updated: updated.sort(), deleted: deleted.sort(), }; } function getChangedSnapshotPaths(diff: FileSnapshotDiff): string[] { return [...diff.added, ...diff.updated].sort(); } function prefixRelativePaths( prefix: string, relativePaths: string[], ): string[] { return relativePaths.map((relativePath) => path.join(prefix, relativePath)); } /** * Main entry point for the skill extraction background task. * Designed to be called fire-and-forget on session startup. * * Coordinates across multiple CLI instances via a lock file, * scans past sessions for reusable patterns, and runs a sub-agent * to extract and write SKILL.md files. */ export async function startMemoryService(config: Config): Promise { const memoryDir = config.storage.getProjectMemoryTempDir(); const skillsDir = config.storage.getProjectSkillsMemoryDir(); const lockPath = path.join(memoryDir, LOCK_FILENAME); const statePath = path.join(memoryDir, STATE_FILENAME); const chatsDir = path.join(config.storage.getProjectTempDir(), 'chats'); // Ensure directories exist await fs.mkdir(skillsDir, { recursive: true }); debugLogger.log(`[MemoryService] Starting. Skills dir: ${skillsDir}`); // Try to acquire exclusive lock if (!(await tryAcquireLock(lockPath))) { debugLogger.log('[MemoryService] Skipped: lock held by another instance'); return; } debugLogger.log('[MemoryService] Lock acquired'); // Register with ExecutionLifecycleService for background tracking const abortController = new AbortController(); const handle = ExecutionLifecycleService.createExecution( '', // no initial output () => abortController.abort(), // onKill 'none', undefined, // no format injection 'Skill extraction', 'silent', ); const executionId = handle.pid; const startTime = Date.now(); let completionResult: { error: Error } | undefined; try { // Read extraction state const state = await readExtractionState(statePath); const previousRuns = state.runs.length; const previouslyProcessed = getProcessedSessionIds(state).size; debugLogger.log( `[MemoryService] State loaded: ${previousRuns} previous run(s), ${previouslyProcessed} session(s) already processed`, ); // Throttle: short-circuit if the most recent run finished less than // MIN_EXTRACTION_INTERVAL_MS ago. Avoids re-scanning session history on // every CLI start when the user opens several short sessions in a row. const lastRun = state.runs.at(-1); if (lastRun?.runAt) { const lastRunMs = Date.parse(lastRun.runAt); if ( Number.isFinite(lastRunMs) && Date.now() - lastRunMs < MIN_EXTRACTION_INTERVAL_MS ) { const minutesAgo = Math.round((Date.now() - lastRunMs) / 60000); debugLogger.log( `[MemoryService] Skipped: last run was ${minutesAgo} minute(s) ago (min interval ${MIN_EXTRACTION_INTERVAL_MS / 60000}m)`, ); return; } } // Build session index: all eligible sessions with summaries + file paths. // The agent decides which to read in full via read_file. const { sessionIndex, newSessionIds, candidateSessions } = await buildSessionIndex(chatsDir, state); const totalInIndex = sessionIndex ? sessionIndex.split('\n').length : 0; debugLogger.log( `[MemoryService] Session scan: ${totalInIndex} indexed session(s), ${candidateSessions.length} surfaced as new candidates`, ); if (newSessionIds.length === 0) { debugLogger.log('[MemoryService] Skipped: no new sessions to process'); return; } // Snapshot existing skill directories before extraction const skillsBefore = new Set(); const patchContentsBefore = new Map(); try { const entries = await fs.readdir(skillsDir); for (const e of entries) { if (e.endsWith('.patch')) { try { patchContentsBefore.set( e, await fs.readFile(path.join(skillsDir, e), 'utf-8'), ); } catch { // Ignore unreadable existing patches. } continue; } skillsBefore.add(e); } } catch { // Empty skills dir } debugLogger.log( `[MemoryService] ${skillsBefore.size} existing skill(s) in memory`, ); const inboxCandidatesBefore = await snapshotInboxCandidates(memoryDir); // Read existing skills for context (memory-extracted + global/workspace) const existingSkillsSummary = await buildExistingSkillsSummary( skillsDir, config, ); if (existingSkillsSummary) { debugLogger.log( `[MemoryService] Existing skills context:\n${existingSkillsSummary}`, ); } // Surface the current inbox state to the agent so it can rewrite // existing canonical patches in place instead of accumulating new ones // across sessions. const pendingInboxSummary = await buildPendingInboxSummary(memoryDir); if (pendingInboxSummary) { debugLogger.log( `[MemoryService] Pending inbox surfaced to agent:\n${pendingInboxSummary}`, ); } // Build agent definition and context const agentDefinition = SkillExtractionAgent( skillsDir, sessionIndex, existingSkillsSummary, memoryDir, pendingInboxSummary, ); const context = buildAgentLoopContext(config); // Register the agent's model config since it's not going through AgentRegistry. const modelAlias = getModelConfigAlias(agentDefinition); config.modelConfigService.registerRuntimeModelConfig(modelAlias, { modelConfig: agentDefinition.modelConfig, }); debugLogger.log( `[MemoryService] Starting extraction agent (model: ${agentDefinition.modelConfig.model}, maxTurns: 30, maxTime: 30min)`, ); const candidateSessionsByPath = new Map( candidateSessions.map((session) => [ path.resolve(session.filePath), session, ]), ); const pendingReadFileSessions = new Map(); const processedSessionKeys = new Set(); // Create and run the extraction agent const executor = await LocalAgentExecutor.create( agentDefinition, context, (activity) => { const readFileCallId = getReadFileCallId(activity); if (activity.type === 'TOOL_CALL_START') { const resolvedPath = getResolvedActivityFilePath(config, activity); if (!resolvedPath || !readFileCallId) { return; } const session = candidateSessionsByPath.get(resolvedPath); if (!session) { return; } pendingReadFileSessions.set(readFileCallId, session); return; } if (!readFileCallId) { return; } const session = pendingReadFileSessions.get(readFileCallId); if (!session) { return; } pendingReadFileSessions.delete(readFileCallId); if ( activity.type === 'TOOL_CALL_END' && !isToolActivityError(activity.data['data']) ) { processedSessionKeys.add(getSessionVersionKey(session)); } }, ); const executorResult = await executor.run( { request: 'Extract skills from the provided sessions.' }, abortController.signal, ); const elapsed = ((Date.now() - startTime) / 1000).toFixed(1); // Diff skills directory to find newly created skills const skillsCreated: string[] = []; try { const entriesAfter = await fs.readdir(skillsDir); for (const e of entriesAfter) { if (!skillsBefore.has(e) && !e.endsWith('.patch')) { skillsCreated.push(e); } } } catch { // Skills dir read failed } // Validate any .patch files the agent generated const validPatches = await validatePatches(skillsDir, config); const patchesCreatedThisRun: string[] = []; for (const patchFile of validPatches) { const patchPath = path.join(skillsDir, patchFile); let currentContent: string; try { currentContent = await fs.readFile(patchPath, 'utf-8'); } catch { continue; } if (patchContentsBefore.get(patchFile) !== currentContent) { patchesCreatedThisRun.push(patchFile); } } if (validPatches.length > 0) { debugLogger.log( `[MemoryService] ${validPatches.length} valid patch(es) currently in inbox; ${patchesCreatedThisRun.length} created or updated this run`, ); } await validateMemoryInboxPatches(config); // Anything still in .inbox/ is reviewable; nothing is auto-applied. const memoryFilesUpdated: string[] = []; const memoryCandidatesCreated = prefixRelativePaths( '.inbox', getChangedSnapshotPaths( diffFileSnapshots( inboxCandidatesBefore, await snapshotInboxCandidates(memoryDir), ), ), ); const processedSessions = candidateSessions .filter((session) => processedSessionKeys.has(getSessionVersionKey(session)), ) .map((session) => ({ sessionId: session.sessionId, lastUpdated: session.lastUpdated, })); // Record the run with full metadata const run: ExtractionRun = { runAt: new Date().toISOString(), sessionIds: processedSessions.map((session) => session.sessionId), candidateSessions: candidateSessions.map((session) => ({ sessionId: session.sessionId, lastUpdated: session.lastUpdated, })), processedSessions, memoryCandidatesCreated, memoryFilesUpdated, skillsCreated, turnCount: normalizeOptionalNumber(executorResult?.turn_count), durationMs: normalizeOptionalNumber(executorResult?.duration_ms), terminateReason: normalizeOptionalString( executorResult?.terminate_reason, ), }; const updatedState: ExtractionState = { runs: [...state.runs, run], }; await writeExtractionState(statePath, updatedState); if ( skillsCreated.length > 0 || patchesCreatedThisRun.length > 0 || memoryCandidatesCreated.length > 0 ) { const completionParts: string[] = []; if (memoryCandidatesCreated.length > 0) { completionParts.push( `prepared ${memoryCandidatesCreated.length} memory candidate(s): ${memoryCandidatesCreated.join(', ')}`, ); } if (skillsCreated.length > 0) { completionParts.push( `created ${skillsCreated.length} skill(s): ${skillsCreated.join(', ')}`, ); } if (patchesCreatedThisRun.length > 0) { completionParts.push( `prepared ${patchesCreatedThisRun.length} patch(es): ${patchesCreatedThisRun.join(', ')}`, ); } debugLogger.log( `[MemoryService] Completed in ${elapsed}s. ${completionParts.join('; ')} (read ${processedSessions.length}/${candidateSessions.length} surfaced session(s))`, ); const feedbackParts: string[] = []; if (memoryCandidatesCreated.length > 0) { feedbackParts.push( `${memoryCandidatesCreated.length} memory candidate${memoryCandidatesCreated.length > 1 ? 's' : ''} extracted from past sessions`, ); } if (skillsCreated.length > 0) { feedbackParts.push( `${skillsCreated.length} new skill${skillsCreated.length > 1 ? 's' : ''} extracted from past sessions: ${skillsCreated.join(', ')}`, ); } if (patchesCreatedThisRun.length > 0) { feedbackParts.push( `${patchesCreatedThisRun.length} skill update${patchesCreatedThisRun.length > 1 ? 's' : ''} extracted from past sessions`, ); } coreEvents.emitFeedback( 'info', `${feedbackParts.join('. ')}. Use /memory inbox to review.`, ); } else { debugLogger.log( `[MemoryService] Completed in ${elapsed}s. No new skills or patches created (read ${processedSessions.length}/${candidateSessions.length} surfaced session(s))`, ); } } catch (error) { const elapsed = ((Date.now() - startTime) / 1000).toFixed(1); if (abortController.signal.aborted) { debugLogger.log(`[MemoryService] Cancelled after ${elapsed}s`); } else { debugLogger.log( `[MemoryService] Failed after ${elapsed}s: ${error instanceof Error ? error.message : String(error)}`, ); } completionResult = { error: error instanceof Error ? error : new Error(String(error)), }; return; } finally { await releaseLock(lockPath); debugLogger.log('[MemoryService] Lock released'); if (executionId !== undefined) { ExecutionLifecycleService.completeExecution( executionId, completionResult, ); } } }