feat(hooks): Hook Session Lifecycle & Compression Integration (#14151)

This commit is contained in:
Edilmo Palencia
2025-12-03 09:04:13 -08:00
committed by GitHub
parent 7a6d3067c6
commit 1c12da1fad
27 changed files with 1026 additions and 302 deletions
@@ -0,0 +1,96 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { MessageBus } from '../confirmation-bus/message-bus.js';
import {
MessageBusType,
type HookExecutionRequest,
type HookExecutionResponse,
} from '../confirmation-bus/types.js';
import type {
SessionStartSource,
SessionEndReason,
PreCompressTrigger,
} from '../hooks/types.js';
import { debugLogger } from '../utils/debugLogger.js';
/**
* Fires the SessionStart hook.
*
* @param messageBus The message bus to use for hook communication
* @param source The source/trigger of the session start
*/
export async function fireSessionStartHook(
messageBus: MessageBus,
source: SessionStartSource,
): Promise<void> {
try {
await messageBus.request<HookExecutionRequest, HookExecutionResponse>(
{
type: MessageBusType.HOOK_EXECUTION_REQUEST,
eventName: 'SessionStart',
input: {
source,
},
},
MessageBusType.HOOK_EXECUTION_RESPONSE,
);
} catch (error) {
debugLogger.warn(`SessionStart hook failed:`, error);
}
}
/**
* Fires the SessionEnd hook.
*
* @param messageBus The message bus to use for hook communication
* @param reason The reason for the session end
*/
export async function fireSessionEndHook(
messageBus: MessageBus,
reason: SessionEndReason,
): Promise<void> {
try {
await messageBus.request<HookExecutionRequest, HookExecutionResponse>(
{
type: MessageBusType.HOOK_EXECUTION_REQUEST,
eventName: 'SessionEnd',
input: {
reason,
},
},
MessageBusType.HOOK_EXECUTION_RESPONSE,
);
} catch (error) {
debugLogger.warn(`SessionEnd hook failed:`, error);
}
}
/**
* Fires the PreCompress hook.
*
* @param messageBus The message bus to use for hook communication
* @param trigger The trigger type (manual or auto)
*/
export async function firePreCompressHook(
messageBus: MessageBus,
trigger: PreCompressTrigger,
): Promise<void> {
try {
await messageBus.request<HookExecutionRequest, HookExecutionResponse>(
{
type: MessageBusType.HOOK_EXECUTION_REQUEST,
eventName: 'PreCompress',
input: {
trigger,
},
},
MessageBusType.HOOK_EXECUTION_RESPONSE,
);
} catch (error) {
debugLogger.warn(`PreCompress hook failed:`, error);
}
}
@@ -220,6 +220,57 @@ function validateNotificationInput(input: Record<string, unknown>): {
};
}
/**
* Validates SessionStart input fields
*/
function validateSessionStartInput(input: Record<string, unknown>): {
source: SessionStartSource;
} {
const source = input['source'];
if (typeof source !== 'string') {
throw new Error(
'Invalid input for SessionStart hook event: source must be a string',
);
}
return {
source: source as SessionStartSource,
};
}
/**
* Validates SessionEnd input fields
*/
function validateSessionEndInput(input: Record<string, unknown>): {
reason: SessionEndReason;
} {
const reason = input['reason'];
if (typeof reason !== 'string') {
throw new Error(
'Invalid input for SessionEnd hook event: reason must be a string',
);
}
return {
reason: reason as SessionEndReason,
};
}
/**
* Validates PreCompress input fields
*/
function validatePreCompressInput(input: Record<string, unknown>): {
trigger: PreCompressTrigger;
} {
const trigger = input['trigger'];
if (typeof trigger !== 'string') {
throw new Error(
'Invalid input for PreCompress hook event: trigger must be a string',
);
}
return {
trigger: trigger as PreCompressTrigger,
};
}
/**
* Hook event bus that coordinates hook execution across the system
*/
@@ -704,6 +755,21 @@ export class HookEventHandler {
);
break;
}
case HookEventName.SessionStart: {
const { source } = validateSessionStartInput(enrichedInput);
result = await this.fireSessionStartEvent(source);
break;
}
case HookEventName.SessionEnd: {
const { reason } = validateSessionEndInput(enrichedInput);
result = await this.fireSessionEndEvent(reason);
break;
}
case HookEventName.PreCompress: {
const { trigger } = validatePreCompressInput(enrichedInput);
result = await this.firePreCompressEvent(trigger);
break;
}
default:
throw new Error(`Unsupported hook event: ${request.eventName}`);
}
+12 -2
View File
@@ -238,8 +238,18 @@ export class HookRunner {
debugLogger.warn(`Hook stdin error: ${err}`);
}
});
child.stdin.write(JSON.stringify(input));
child.stdin.end();
// Wrap write operations in try-catch to handle synchronous EPIPE errors
// that occur when the child process exits before we finish writing
try {
child.stdin.write(JSON.stringify(input));
child.stdin.end();
} catch (err) {
// Ignore EPIPE errors which happen when the child process closes stdin early
if (err instanceof Error && 'code' in err && err.code !== 'EPIPE') {
debugLogger.warn(`Hook stdin write error: ${err}`);
}
}
}
// Collect stdout
+7
View File
@@ -19,3 +19,10 @@ export { HookEventHandler } from './hookEventHandler.js';
export type { HookRegistryEntry, ConfigSource } from './hookRegistry.js';
export type { AggregatedHookResult } from './hookAggregator.js';
export type { HookEventContext } from './hookPlanner.js';
// Export hook trigger functions
export {
fireSessionStartHook,
fireSessionEndHook,
firePreCompressHook,
} from '../core/sessionHookTriggers.js';
-1
View File
@@ -463,7 +463,6 @@ export enum SessionStartSource {
Startup = 'startup',
Resume = 'resume',
Clear = 'clear',
Compress = 'compress',
}
/**
@@ -157,6 +157,8 @@ describe('ChatCompressionService', () => {
getContentGenerator: vi.fn().mockReturnValue({
countTokens: vi.fn().mockResolvedValue({ totalTokens: 100 }),
}),
getEnableHooks: vi.fn().mockReturnValue(false),
getMessageBus: vi.fn().mockReturnValue(undefined),
} as unknown as Config;
vi.mocked(tokenLimit).mockReturnValue(1000);
@@ -21,6 +21,8 @@ import {
DEFAULT_GEMINI_MODEL,
PREVIEW_GEMINI_MODEL,
} from '../config/models.js';
import { firePreCompressHook } from '../core/sessionHookTriggers.js';
import { PreCompressTrigger } from '../hooks/types.js';
/**
* Default threshold for compression token count as a fraction of the model's
@@ -123,6 +125,17 @@ export class ChatCompressionService {
};
}
// Fire PreCompress hook before compression (only if hooks are enabled)
// This fires for both manual and auto compression attempts
const hooksEnabled = config.getEnableHooks();
const messageBus = config.getMessageBus();
if (hooksEnabled && messageBus) {
const trigger = force
? PreCompressTrigger.Manual
: PreCompressTrigger.Auto;
await firePreCompressHook(messageBus, trigger);
}
const originalTokenCount = chat.getLastPromptTokenCount();
// Don't compress if not forced and we are under the limit.
+1
View File
@@ -16,6 +16,7 @@ export { DEFAULT_TELEMETRY_TARGET, DEFAULT_OTLP_ENDPOINT };
export {
initializeTelemetry,
shutdownTelemetry,
flushTelemetry,
isTelemetrySdkInitialized,
} from './sdk.js';
export {
+33 -5
View File
@@ -80,6 +80,8 @@ class DiagLoggerAdapter {
diag.setLogger(new DiagLoggerAdapter(), DiagLogLevel.INFO);
let sdk: NodeSDK | undefined;
let spanProcessor: BatchSpanProcessor | undefined;
let logRecordProcessor: BatchLogRecordProcessor | undefined;
let telemetryInitialized = false;
let callbackRegistered = false;
let authListener: ((newCredentials: JWTInput) => Promise<void>) | undefined =
@@ -273,10 +275,14 @@ export async function initializeTelemetry(
});
}
// Store processor references for manual flushing
spanProcessor = new BatchSpanProcessor(spanExporter);
logRecordProcessor = new BatchLogRecordProcessor(logExporter);
sdk = new NodeSDK({
resource,
spanProcessors: [new BatchSpanProcessor(spanExporter)],
logRecordProcessors: [new BatchLogRecordProcessor(logExporter)],
spanProcessors: [spanProcessor],
logRecordProcessors: [logRecordProcessor],
metricReader,
instrumentations: [new HttpInstrumentation()],
});
@@ -293,15 +299,37 @@ export async function initializeTelemetry(
console.error('Error starting OpenTelemetry SDK:', error);
}
// Note: We don't use process.on('exit') here because that callback is synchronous
// and won't wait for the async shutdownTelemetry() to complete.
// Instead, telemetry shutdown is handled in runExitCleanup() in cleanup.ts
process.on('SIGTERM', () => {
shutdownTelemetry(config);
});
process.on('SIGINT', () => {
shutdownTelemetry(config);
});
process.on('exit', () => {
shutdownTelemetry(config);
});
}
/**
* Force flush all pending telemetry data to disk.
* This is useful for ensuring telemetry is written before critical operations like /clear.
*/
export async function flushTelemetry(config: Config): Promise<void> {
if (!telemetryInitialized || !spanProcessor || !logRecordProcessor) {
return;
}
try {
// Force flush all pending telemetry to disk
await Promise.all([
spanProcessor.forceFlush(),
logRecordProcessor.forceFlush(),
]);
if (config.getDebugMode()) {
debugLogger.log('OpenTelemetry SDK flushed successfully.');
}
} catch (error) {
console.error('Error flushing SDK:', error);
}
}
export async function shutdownTelemetry(