[Part 5/6] feat(telemetry): add activity monitor with event-driven snapshots (#8124)

Co-authored-by: Jacob Richman <jacob314@gmail.com>
This commit is contained in:
Adrian Arribas
2025-10-20 19:29:53 +02:00
committed by GitHub
parent 518a9ca314
commit 71ecc401c3
4 changed files with 632 additions and 1 deletions

View File

@@ -20,7 +20,7 @@ describe('Interactive file system', () => {
it('should perform a read-then-write sequence', async () => {
const fileName = 'version.txt';
rig.setup('interactive-read-then-write');
await rig.setup('interactive-read-then-write');
rig.createFile(fileName, '1.0.0');
const run = await rig.runInteractive();
@@ -44,5 +44,8 @@ describe('Interactive file system', () => {
30000,
(args) => args.includes('1.0.1') && args.includes(fileName),
);
// Wait for telemetry to flush and file system to sync, especially in sandboxed environments
await rig.waitForTelemetryReady();
});
});

View File

@@ -0,0 +1,329 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import {
ActivityMonitor,
DEFAULT_ACTIVITY_CONFIG,
initializeActivityMonitor,
getActivityMonitor,
recordGlobalActivity,
startGlobalActivityMonitoring,
stopGlobalActivityMonitoring,
} from './activity-monitor.js';
import { ActivityType } from './activity-types.js';
import type { ActivityEvent } from './activity-monitor.js';
import type { Config } from '../config/config.js';
// Mock the dependencies
vi.mock('./metrics.js', () => ({
isPerformanceMonitoringActive: vi.fn(() => true),
}));
vi.mock('./memory-monitor.js', () => ({
getMemoryMonitor: vi.fn(() => ({
takeSnapshot: vi.fn(() => ({
timestamp: Date.now(),
heapUsed: 1000000,
heapTotal: 2000000,
external: 500000,
rss: 3000000,
arrayBuffers: 100000,
heapSizeLimit: 4000000,
})),
})),
}));
describe('ActivityMonitor', () => {
let activityMonitor: ActivityMonitor;
let mockConfig: Config;
beforeEach(() => {
vi.clearAllMocks();
mockConfig = {
getSessionId: () => 'test-session-123',
} as Config;
activityMonitor = new ActivityMonitor();
});
afterEach(() => {
activityMonitor.stop();
});
describe('constructor', () => {
it('should initialize with default config', () => {
const monitor = new ActivityMonitor();
expect(monitor).toBeDefined();
expect(monitor.isMonitoringActive()).toBe(false);
});
it('should initialize with custom config', () => {
const customConfig = {
...DEFAULT_ACTIVITY_CONFIG,
snapshotThrottleMs: 2000,
};
const monitor = new ActivityMonitor(customConfig);
expect(monitor).toBeDefined();
});
});
describe('start and stop', () => {
it('should start and stop monitoring', () => {
expect(activityMonitor.isMonitoringActive()).toBe(false);
activityMonitor.start(mockConfig);
expect(activityMonitor.isMonitoringActive()).toBe(true);
activityMonitor.stop();
expect(activityMonitor.isMonitoringActive()).toBe(false);
});
it('should not start monitoring when already active', () => {
activityMonitor.start(mockConfig);
expect(activityMonitor.isMonitoringActive()).toBe(true);
// Should not affect already active monitor
activityMonitor.start(mockConfig);
expect(activityMonitor.isMonitoringActive()).toBe(true);
});
});
describe('recordActivity', () => {
beforeEach(() => {
activityMonitor.start(mockConfig);
});
it('should record activity events', () => {
activityMonitor.recordActivity(
ActivityType.USER_INPUT_START,
'test-context',
);
const stats = activityMonitor.getActivityStats();
expect(stats.totalEvents).toBe(2); // includes the start event
expect(stats.eventTypes[ActivityType.USER_INPUT_START]).toBe(1);
});
it('should include metadata in activity events', () => {
const metadata = { key: 'value', count: 42 };
activityMonitor.recordActivity(
ActivityType.MESSAGE_ADDED,
'test-context',
metadata,
);
const recentActivity = activityMonitor.getRecentActivity(1);
expect(recentActivity[0].metadata).toEqual(metadata);
});
it('should not record activity when monitoring is disabled', () => {
activityMonitor.updateConfig({ enabled: false });
activityMonitor.recordActivity(ActivityType.USER_INPUT_START);
const stats = activityMonitor.getActivityStats();
expect(stats.totalEvents).toBe(1); // only the start event
});
it('should limit event buffer size', () => {
activityMonitor.updateConfig({ maxEventBuffer: 3 });
// Record more events than buffer size
for (let i = 0; i < 5; i++) {
activityMonitor.recordActivity(
ActivityType.USER_INPUT_START,
`event-${i}`,
);
}
const stats = activityMonitor.getActivityStats();
expect(stats.totalEvents).toBe(3); // buffer limit
});
});
describe('listeners', () => {
let listenerCallCount: number;
let lastEvent: ActivityEvent | null;
beforeEach(() => {
listenerCallCount = 0;
lastEvent = null;
activityMonitor.start(mockConfig);
});
it('should notify listeners of activity events', () => {
const listener = (event: ActivityEvent) => {
listenerCallCount++;
lastEvent = event;
};
activityMonitor.addListener(listener);
activityMonitor.recordActivity(ActivityType.MESSAGE_ADDED, 'test');
expect(listenerCallCount).toBe(1);
expect(lastEvent?.type).toBe(ActivityType.MESSAGE_ADDED);
expect(lastEvent?.context).toBe('test');
});
it('should remove listeners correctly', () => {
const listener = () => {
listenerCallCount++;
};
activityMonitor.addListener(listener);
activityMonitor.recordActivity(ActivityType.USER_INPUT_START);
expect(listenerCallCount).toBe(1);
activityMonitor.removeListener(listener);
activityMonitor.recordActivity(ActivityType.USER_INPUT_START);
expect(listenerCallCount).toBe(1); // Should not increase
});
it('should handle listener errors gracefully', () => {
const faultyListener = () => {
throw new Error('Listener error');
};
const goodListener = () => {
listenerCallCount++;
};
// Spy on console.debug to check error handling
const debugSpy = vi.spyOn(console, 'debug').mockImplementation(() => {});
activityMonitor.addListener(faultyListener);
activityMonitor.addListener(goodListener);
activityMonitor.recordActivity(ActivityType.USER_INPUT_START);
expect(listenerCallCount).toBe(1); // Good listener should still work
expect(debugSpy).toHaveBeenCalled();
debugSpy.mockRestore();
});
});
describe('getActivityStats', () => {
beforeEach(() => {
activityMonitor.start(mockConfig);
});
it('should return correct activity statistics', () => {
activityMonitor.recordActivity(ActivityType.USER_INPUT_START);
activityMonitor.recordActivity(ActivityType.MESSAGE_ADDED);
activityMonitor.recordActivity(ActivityType.USER_INPUT_START);
const stats = activityMonitor.getActivityStats();
expect(stats.totalEvents).toBe(4); // includes start event
expect(stats.eventTypes[ActivityType.USER_INPUT_START]).toBe(2);
expect(stats.eventTypes[ActivityType.MESSAGE_ADDED]).toBe(1);
expect(stats.timeRange).toBeDefined();
});
it('should return null time range for empty buffer', () => {
const emptyMonitor = new ActivityMonitor();
const stats = emptyMonitor.getActivityStats();
expect(stats.totalEvents).toBe(0);
expect(stats.timeRange).toBeNull();
});
});
describe('updateConfig', () => {
it('should update configuration correctly', () => {
const newConfig = { snapshotThrottleMs: 2000 };
activityMonitor.updateConfig(newConfig);
// Config should be updated (tested indirectly through behavior)
expect(activityMonitor).toBeDefined();
});
});
});
describe('Global activity monitoring functions', () => {
let mockConfig: Config;
beforeEach(() => {
mockConfig = {
getSessionId: () => 'test-session-456',
} as Config;
vi.clearAllMocks();
});
afterEach(() => {
stopGlobalActivityMonitoring();
});
describe('initializeActivityMonitor', () => {
it('should create global monitor instance', () => {
const monitor = initializeActivityMonitor();
expect(monitor).toBeDefined();
expect(getActivityMonitor()).toBe(monitor);
});
it('should return same instance on subsequent calls', () => {
const monitor1 = initializeActivityMonitor();
const monitor2 = initializeActivityMonitor();
expect(monitor1).toBe(monitor2);
});
});
describe('recordGlobalActivity', () => {
it('should record activity through global monitor', () => {
startGlobalActivityMonitoring(mockConfig);
recordGlobalActivity(ActivityType.TOOL_CALL_SCHEDULED, 'global-test');
const monitor = getActivityMonitor();
const stats = monitor?.getActivityStats();
expect(stats?.totalEvents).toBeGreaterThan(0);
});
it('should handle missing global monitor gracefully', () => {
stopGlobalActivityMonitoring();
// Should not throw error
expect(() => {
recordGlobalActivity(ActivityType.USER_INPUT_START);
}).not.toThrow();
});
});
describe('startGlobalActivityMonitoring', () => {
it('should start global monitoring with default config', () => {
startGlobalActivityMonitoring(mockConfig);
const monitor = getActivityMonitor();
expect(monitor?.isMonitoringActive()).toBe(true);
});
it('should start global monitoring with custom config', () => {
const customConfig = {
...DEFAULT_ACTIVITY_CONFIG,
snapshotThrottleMs: 3000,
};
startGlobalActivityMonitoring(mockConfig, customConfig);
const monitor = getActivityMonitor();
expect(monitor?.isMonitoringActive()).toBe(true);
});
});
describe('stopGlobalActivityMonitoring', () => {
it('should stop global monitoring', () => {
startGlobalActivityMonitoring(mockConfig);
expect(getActivityMonitor()?.isMonitoringActive()).toBe(true);
stopGlobalActivityMonitoring();
expect(getActivityMonitor()?.isMonitoringActive()).toBe(false);
});
it('should handle missing global monitor gracefully', () => {
expect(() => {
stopGlobalActivityMonitoring();
}).not.toThrow();
});
});
});

View File

@@ -0,0 +1,292 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type { Config } from '../config/config.js';
import { isPerformanceMonitoringActive } from './metrics.js';
import { getMemoryMonitor } from './memory-monitor.js';
import { ActivityType } from './activity-types.js';
/**
* Activity event data structure
*/
export interface ActivityEvent {
type: ActivityType;
timestamp: number;
context?: string;
metadata?: Record<string, unknown>;
}
/**
* Configuration for activity monitoring
*/
export interface ActivityMonitorConfig {
/** Enable/disable activity monitoring */
enabled: boolean;
/** Minimum interval between memory snapshots (ms) */
snapshotThrottleMs: number;
/** Maximum number of events to buffer */
maxEventBuffer: number;
/** Activity types that should trigger immediate memory snapshots */
triggerActivities: ActivityType[];
}
/**
* Activity listener callback function
*/
export type ActivityListener = (event: ActivityEvent) => void;
/**
* Default configuration for activity monitoring
*/
export const DEFAULT_ACTIVITY_CONFIG: ActivityMonitorConfig = {
enabled: true,
snapshotThrottleMs: 1000, // 1 second minimum between snapshots
maxEventBuffer: 100,
triggerActivities: [
ActivityType.USER_INPUT_START,
ActivityType.MESSAGE_ADDED,
ActivityType.TOOL_CALL_SCHEDULED,
ActivityType.STREAM_START,
],
};
/**
* Activity monitor class that tracks user activity and triggers memory monitoring
*/
export class ActivityMonitor {
private listeners = new Set<ActivityListener>();
private eventBuffer: ActivityEvent[] = [];
private lastSnapshotTime = 0;
private config: ActivityMonitorConfig;
private isActive = false;
private memoryMonitoringListener: ActivityListener | null = null;
constructor(config: ActivityMonitorConfig = DEFAULT_ACTIVITY_CONFIG) {
this.config = { ...config };
}
/**
* Start activity monitoring
*/
start(coreConfig: Config): void {
if (!isPerformanceMonitoringActive() || this.isActive) {
return;
}
this.isActive = true;
// Register default memory monitoring listener
this.memoryMonitoringListener = (event) => {
this.handleMemoryMonitoringActivity(event, coreConfig);
};
this.addListener(this.memoryMonitoringListener);
// Record activity monitoring start
this.recordActivity(
ActivityType.MANUAL_TRIGGER,
'activity_monitoring_start',
);
}
/**
* Stop activity monitoring
*/
stop(): void {
if (!this.isActive) {
return;
}
this.isActive = false;
if (this.memoryMonitoringListener) {
this.removeListener(this.memoryMonitoringListener);
this.memoryMonitoringListener = null;
}
this.eventBuffer = [];
}
/**
* Add an activity listener
*/
addListener(listener: ActivityListener): void {
this.listeners.add(listener);
}
/**
* Remove an activity listener
*/
removeListener(listener: ActivityListener): void {
this.listeners.delete(listener);
}
/**
* Record a user activity event
*/
recordActivity(
type: ActivityType,
context?: string,
metadata?: Record<string, unknown>,
): void {
if (!this.isActive || !this.config.enabled) {
return;
}
const event: ActivityEvent = {
type,
timestamp: Date.now(),
context,
metadata,
};
// Add to buffer
this.eventBuffer.push(event);
if (this.eventBuffer.length > this.config.maxEventBuffer) {
this.eventBuffer.shift(); // Remove oldest event
}
// Notify listeners
this.listeners.forEach((listener) => {
try {
listener(event);
} catch (error) {
// Silently catch listener errors to avoid disrupting the application
console.debug('ActivityMonitor listener error:', error);
}
});
}
/**
* Get recent activity events
*/
getRecentActivity(limit?: number): ActivityEvent[] {
const events = [...this.eventBuffer];
return limit ? events.slice(-limit) : events;
}
/**
* Get activity statistics
*/
getActivityStats(): {
totalEvents: number;
eventTypes: Record<ActivityType, number>;
timeRange: { start: number; end: number } | null;
} {
const eventTypes = {} as Record<ActivityType, number>;
let start = Number.MAX_SAFE_INTEGER;
let end = 0;
for (const event of this.eventBuffer) {
eventTypes[event.type] = (eventTypes[event.type] || 0) + 1;
start = Math.min(start, event.timestamp);
end = Math.max(end, event.timestamp);
}
return {
totalEvents: this.eventBuffer.length,
eventTypes,
timeRange: this.eventBuffer.length > 0 ? { start, end } : null,
};
}
/**
* Update configuration
*/
updateConfig(newConfig: Partial<ActivityMonitorConfig>): void {
this.config = { ...this.config, ...newConfig };
}
/**
* Handle memory monitoring for activity events
*/
private handleMemoryMonitoringActivity(
event: ActivityEvent,
config: Config,
): void {
// Check if this activity type should trigger memory monitoring
if (!this.config.triggerActivities.includes(event.type)) {
return;
}
// Throttle memory snapshots
const now = Date.now();
if (now - this.lastSnapshotTime < this.config.snapshotThrottleMs) {
return;
}
this.lastSnapshotTime = now;
// Take memory snapshot
const memoryMonitor = getMemoryMonitor();
if (memoryMonitor) {
const context = event.context
? `activity_${event.type}_${event.context}`
: `activity_${event.type}`;
memoryMonitor.takeSnapshot(context, config);
}
}
/**
* Check if monitoring is active
*/
isMonitoringActive(): boolean {
return this.isActive && this.config.enabled;
}
}
// Singleton instance for global activity monitoring
let globalActivityMonitor: ActivityMonitor | null = null;
/**
* Initialize global activity monitor
*/
export function initializeActivityMonitor(
config?: ActivityMonitorConfig,
): ActivityMonitor {
if (!globalActivityMonitor) {
globalActivityMonitor = new ActivityMonitor(config);
}
return globalActivityMonitor;
}
/**
* Get global activity monitor instance
*/
export function getActivityMonitor(): ActivityMonitor | null {
return globalActivityMonitor;
}
/**
* Record a user activity on the global monitor (convenience function)
*/
export function recordGlobalActivity(
type: ActivityType,
context?: string,
metadata?: Record<string, unknown>,
): void {
if (globalActivityMonitor) {
globalActivityMonitor.recordActivity(type, context, metadata);
}
}
/**
* Start global activity monitoring
*/
export function startGlobalActivityMonitoring(
coreConfig: Config,
activityConfig?: ActivityMonitorConfig,
): void {
const monitor = initializeActivityMonitor(activityConfig);
monitor.start(coreConfig);
}
/**
* Stop global activity monitoring
*/
export function stopGlobalActivityMonitoring(): void {
if (globalActivityMonitor) {
globalActivityMonitor.stop();
}
}

View File

@@ -87,6 +87,13 @@ export {
recordUserActivity,
isUserActive,
} from './activity-detector.js';
export {
ActivityMonitor,
initializeActivityMonitor,
getActivityMonitor,
startGlobalActivityMonitoring,
stopGlobalActivityMonitoring,
} from './activity-monitor.js';
export {
// Core metrics functions
recordToolCallMetrics,