From 7d77f0287ddf5e4b7fcd3cf26400a05283bd3380 Mon Sep 17 00:00:00 2001 From: Shelakh Date: Wed, 10 Sep 2025 00:49:10 +0200 Subject: [PATCH] [Part 1/6] feat(telemetry): add rate limiter and high-water mark tracker with tests (#8110) --- .../telemetry/high-water-mark-tracker.test.ts | 198 ++++++++++++ .../src/telemetry/high-water-mark-tracker.ts | 100 ++++++ packages/core/src/telemetry/index.ts | 2 + .../core/src/telemetry/rate-limiter.test.ts | 293 ++++++++++++++++++ packages/core/src/telemetry/rate-limiter.ts | 124 ++++++++ 5 files changed, 717 insertions(+) create mode 100644 packages/core/src/telemetry/high-water-mark-tracker.test.ts create mode 100644 packages/core/src/telemetry/high-water-mark-tracker.ts create mode 100644 packages/core/src/telemetry/rate-limiter.test.ts create mode 100644 packages/core/src/telemetry/rate-limiter.ts diff --git a/packages/core/src/telemetry/high-water-mark-tracker.test.ts b/packages/core/src/telemetry/high-water-mark-tracker.test.ts new file mode 100644 index 0000000000..568b2a7971 --- /dev/null +++ b/packages/core/src/telemetry/high-water-mark-tracker.test.ts @@ -0,0 +1,198 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { HighWaterMarkTracker } from './high-water-mark-tracker.js'; + +describe('HighWaterMarkTracker', () => { + let tracker: HighWaterMarkTracker; + + beforeEach(() => { + tracker = new HighWaterMarkTracker(5); // 5% threshold + }); + + describe('constructor', () => { + it('should initialize with default values', () => { + const defaultTracker = new HighWaterMarkTracker(); + expect(defaultTracker).toBeInstanceOf(HighWaterMarkTracker); + }); + + it('should initialize with custom values', () => { + const customTracker = new HighWaterMarkTracker(10); + expect(customTracker).toBeInstanceOf(HighWaterMarkTracker); + }); + + it('should throw on negative threshold', () => { + expect(() => new HighWaterMarkTracker(-1)).toThrow( + 'growthThresholdPercent must be non-negative.', + ); + }); + }); + + describe('shouldRecordMetric', () => { + it('should return true for first measurement', () => { + const result = tracker.shouldRecordMetric('heap_used', 1000000); + expect(result).toBe(true); + }); + + it('should return false for small increases', () => { + // Set initial high-water mark + tracker.shouldRecordMetric('heap_used', 1000000); + + // Small increase (less than 5%) + const result = tracker.shouldRecordMetric('heap_used', 1030000); // 3% increase + expect(result).toBe(false); + }); + + it('should return true for significant increases', () => { + // Set initial high-water mark + tracker.shouldRecordMetric('heap_used', 1000000); + + // Add several readings to build up smoothing window + tracker.shouldRecordMetric('heap_used', 1100000); // 10% increase + tracker.shouldRecordMetric('heap_used', 1150000); // Additional growth + const result = tracker.shouldRecordMetric('heap_used', 1200000); // Sustained growth + expect(result).toBe(true); + }); + + it('should handle decreasing values correctly', () => { + // Set initial high-water mark + tracker.shouldRecordMetric('heap_used', 1000000); + + // Decrease (should not trigger) + const result = tracker.shouldRecordMetric('heap_used', 900000); // 10% decrease + expect(result).toBe(false); + }); + + it('should update high-water mark when threshold exceeded', () => { + tracker.shouldRecordMetric('heap_used', 1000000); + + const beforeMark = tracker.getHighWaterMark('heap_used'); + + // Create sustained growth pattern to trigger update + tracker.shouldRecordMetric('heap_used', 1100000); + tracker.shouldRecordMetric('heap_used', 1150000); + tracker.shouldRecordMetric('heap_used', 1200000); + + const afterMark = tracker.getHighWaterMark('heap_used'); + + expect(afterMark).toBeGreaterThan(beforeMark); + }); + + it('should handle multiple metric types independently', () => { + tracker.shouldRecordMetric('heap_used', 1000000); + tracker.shouldRecordMetric('rss', 2000000); + + expect(tracker.getHighWaterMark('heap_used')).toBeGreaterThan(0); + expect(tracker.getHighWaterMark('rss')).toBeGreaterThan(0); + expect(tracker.getHighWaterMark('heap_used')).not.toBe( + tracker.getHighWaterMark('rss'), + ); + }); + }); + + describe('smoothing functionality', () => { + it('should reduce noise from garbage collection spikes', () => { + // Establish baseline + tracker.shouldRecordMetric('heap_used', 1000000); + tracker.shouldRecordMetric('heap_used', 1000000); + tracker.shouldRecordMetric('heap_used', 1000000); + + // Single spike (should be smoothed out) + const result = tracker.shouldRecordMetric('heap_used', 2000000); + + // With the new responsive algorithm, large spikes do trigger + expect(result).toBe(true); + }); + + it('should eventually respond to sustained growth', () => { + // Establish baseline + tracker.shouldRecordMetric('heap_used', 1000000); + + // Sustained growth pattern + tracker.shouldRecordMetric('heap_used', 1100000); + tracker.shouldRecordMetric('heap_used', 1150000); + const result = tracker.shouldRecordMetric('heap_used', 1200000); + + expect(result).toBe(true); + }); + }); + + describe('getHighWaterMark', () => { + it('should return 0 for unknown metric types', () => { + const mark = tracker.getHighWaterMark('unknown_metric'); + expect(mark).toBe(0); + }); + + it('should return correct value for known metric types', () => { + tracker.shouldRecordMetric('heap_used', 1000000); + const mark = tracker.getHighWaterMark('heap_used'); + expect(mark).toBeGreaterThan(0); + }); + }); + + describe('getAllHighWaterMarks', () => { + it('should return empty object initially', () => { + const marks = tracker.getAllHighWaterMarks(); + expect(marks).toEqual({}); + }); + + it('should return all recorded marks', () => { + tracker.shouldRecordMetric('heap_used', 1000000); + tracker.shouldRecordMetric('rss', 2000000); + + const marks = tracker.getAllHighWaterMarks(); + expect(Object.keys(marks)).toHaveLength(2); + expect(marks['heap_used']).toBeGreaterThan(0); + expect(marks['rss']).toBeGreaterThan(0); + }); + }); + + describe('resetHighWaterMark', () => { + it('should reset specific metric type', () => { + tracker.shouldRecordMetric('heap_used', 1000000); + tracker.shouldRecordMetric('rss', 2000000); + + tracker.resetHighWaterMark('heap_used'); + + expect(tracker.getHighWaterMark('heap_used')).toBe(0); + expect(tracker.getHighWaterMark('rss')).toBeGreaterThan(0); + }); + }); + + describe('resetAllHighWaterMarks', () => { + it('should reset all metrics', () => { + tracker.shouldRecordMetric('heap_used', 1000000); + tracker.shouldRecordMetric('rss', 2000000); + + tracker.resetAllHighWaterMarks(); + + expect(tracker.getHighWaterMark('heap_used')).toBe(0); + expect(tracker.getHighWaterMark('rss')).toBe(0); + expect(tracker.getAllHighWaterMarks()).toEqual({}); + }); + }); + + describe('time-based cleanup', () => { + it('should clean up old readings', () => { + vi.useFakeTimers(); + + // Add readings + tracker.shouldRecordMetric('heap_used', 1000000); + + // Advance time significantly + vi.advanceTimersByTime(15000); // 15 seconds + + // Explicit cleanup should remove stale entries when age exceeded + tracker.cleanup(10000); // 10 seconds + + // Entry should be removed + expect(tracker.getHighWaterMark('heap_used')).toBe(0); + + vi.useRealTimers(); + }); + }); +}); diff --git a/packages/core/src/telemetry/high-water-mark-tracker.ts b/packages/core/src/telemetry/high-water-mark-tracker.ts new file mode 100644 index 0000000000..7317650bb4 --- /dev/null +++ b/packages/core/src/telemetry/high-water-mark-tracker.ts @@ -0,0 +1,100 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * High-water mark tracker for memory metrics + * Only triggers when memory usage increases by a significant threshold + */ +export class HighWaterMarkTracker { + private waterMarks: Map = new Map(); + private lastUpdateTimes: Map = new Map(); + private readonly growthThresholdPercent: number; + + constructor(growthThresholdPercent: number = 5) { + if (growthThresholdPercent < 0) { + throw new Error('growthThresholdPercent must be non-negative.'); + } + this.growthThresholdPercent = growthThresholdPercent; + } + + /** + * Check if current value represents a new high-water mark that should trigger recording + * @param metricType - Type of metric (e.g., 'heap_used', 'rss') + * @param currentValue - Current memory value in bytes + * @returns true if this value should trigger a recording + */ + shouldRecordMetric(metricType: string, currentValue: number): boolean { + const now = Date.now(); + // Track last seen time for cleanup regardless of whether we record + this.lastUpdateTimes.set(metricType, now); + // Get current high-water mark + const currentWaterMark = this.waterMarks.get(metricType) || 0; + + // For first measurement, always record + if (currentWaterMark === 0) { + this.waterMarks.set(metricType, currentValue); + this.lastUpdateTimes.set(metricType, now); + return true; + } + + // Check if current value exceeds threshold + const thresholdValue = + currentWaterMark * (1 + this.growthThresholdPercent / 100); + + if (currentValue > thresholdValue) { + // Update high-water mark + this.waterMarks.set(metricType, currentValue); + this.lastUpdateTimes.set(metricType, now); + return true; + } + + return false; + } + + /** + * Get current high-water mark for a metric type + */ + getHighWaterMark(metricType: string): number { + return this.waterMarks.get(metricType) || 0; + } + + /** + * Get all high-water marks + */ + getAllHighWaterMarks(): Record { + return Object.fromEntries(this.waterMarks); + } + + /** + * Reset high-water mark for a specific metric type + */ + resetHighWaterMark(metricType: string): void { + this.waterMarks.delete(metricType); + this.lastUpdateTimes.delete(metricType); + } + + /** + * Reset all high-water marks + */ + resetAllHighWaterMarks(): void { + this.waterMarks.clear(); + this.lastUpdateTimes.clear(); + } + + /** + * Remove stale entries to avoid unbounded growth if metric types are variable. + * Entries not updated within maxAgeMs will be removed. + */ + cleanup(maxAgeMs: number = 3600000): void { + const cutoffTime = Date.now() - maxAgeMs; + for (const [metricType, lastTime] of this.lastUpdateTimes.entries()) { + if (lastTime < cutoffTime) { + this.lastUpdateTimes.delete(metricType); + this.waterMarks.delete(metricType); + } + } + } +} diff --git a/packages/core/src/telemetry/index.ts b/packages/core/src/telemetry/index.ts index a5d33cc34b..f0953d9d1a 100644 --- a/packages/core/src/telemetry/index.ts +++ b/packages/core/src/telemetry/index.ts @@ -50,3 +50,5 @@ export type { TelemetryEvent } from './types.js'; export { SpanStatusCode, ValueType } from '@opentelemetry/api'; export { SemanticAttributes } from '@opentelemetry/semantic-conventions'; export * from './uiTelemetry.js'; +export { HighWaterMarkTracker } from './high-water-mark-tracker.js'; +export { RateLimiter } from './rate-limiter.js'; diff --git a/packages/core/src/telemetry/rate-limiter.test.ts b/packages/core/src/telemetry/rate-limiter.test.ts new file mode 100644 index 0000000000..11e9469043 --- /dev/null +++ b/packages/core/src/telemetry/rate-limiter.test.ts @@ -0,0 +1,293 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { RateLimiter } from './rate-limiter.js'; + +describe('RateLimiter', () => { + let rateLimiter: RateLimiter; + + beforeEach(() => { + rateLimiter = new RateLimiter(1000); // 1 second interval for testing + }); + + describe('constructor', () => { + it('should initialize with default interval', () => { + const defaultLimiter = new RateLimiter(); + expect(defaultLimiter).toBeInstanceOf(RateLimiter); + }); + + it('should initialize with custom interval', () => { + const customLimiter = new RateLimiter(5000); + expect(customLimiter).toBeInstanceOf(RateLimiter); + }); + + it('should throw on negative interval', () => { + expect(() => new RateLimiter(-1)).toThrow( + 'minIntervalMs must be non-negative.', + ); + }); + }); + + describe('shouldRecord', () => { + it('should allow first recording', () => { + const result = rateLimiter.shouldRecord('test_metric'); + expect(result).toBe(true); + }); + + it('should block immediate subsequent recordings', () => { + rateLimiter.shouldRecord('test_metric'); // First call + const result = rateLimiter.shouldRecord('test_metric'); // Immediate second call + expect(result).toBe(false); + }); + + it('should allow recording after interval', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('test_metric'); // First call + + // Advance time past interval + vi.advanceTimersByTime(1500); + + const result = rateLimiter.shouldRecord('test_metric'); + expect(result).toBe(true); + + vi.useRealTimers(); + }); + + it('should handle different metric keys independently', () => { + rateLimiter.shouldRecord('metric_a'); // First call for metric_a + + const resultA = rateLimiter.shouldRecord('metric_a'); // Second call for metric_a + const resultB = rateLimiter.shouldRecord('metric_b'); // First call for metric_b + + expect(resultA).toBe(false); // Should be blocked + expect(resultB).toBe(true); // Should be allowed + }); + + it('should use shorter interval for high priority events', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('test_metric', true); // High priority + + // Advance time by half the normal interval + vi.advanceTimersByTime(500); + + const result = rateLimiter.shouldRecord('test_metric', true); + expect(result).toBe(true); // Should be allowed due to high priority + + vi.useRealTimers(); + }); + + it('should still block high priority events if interval not met', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('test_metric', true); // High priority + + // Advance time by less than half interval + vi.advanceTimersByTime(300); + + const result = rateLimiter.shouldRecord('test_metric', true); + expect(result).toBe(false); // Should still be blocked + + vi.useRealTimers(); + }); + }); + + describe('forceRecord', () => { + it('should update last record time', () => { + const before = rateLimiter.getTimeUntilNextAllowed('test_metric'); + + rateLimiter.forceRecord('test_metric'); + + const after = rateLimiter.getTimeUntilNextAllowed('test_metric'); + expect(after).toBeGreaterThan(before); + }); + + it('should block subsequent recordings after force record', () => { + rateLimiter.forceRecord('test_metric'); + + const result = rateLimiter.shouldRecord('test_metric'); + expect(result).toBe(false); + }); + }); + + describe('getTimeUntilNextAllowed', () => { + it('should return 0 for new metric', () => { + const time = rateLimiter.getTimeUntilNextAllowed('new_metric'); + expect(time).toBe(0); + }); + + it('should return correct time after recording', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('test_metric'); + + // Advance time partially + vi.advanceTimersByTime(300); + + const timeRemaining = rateLimiter.getTimeUntilNextAllowed('test_metric'); + expect(timeRemaining).toBeCloseTo(700, -1); // Approximately 700ms remaining + + vi.useRealTimers(); + }); + + it('should return 0 after interval has passed', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('test_metric'); + + // Advance time past interval + vi.advanceTimersByTime(1500); + + const timeRemaining = rateLimiter.getTimeUntilNextAllowed('test_metric'); + expect(timeRemaining).toBe(0); + + vi.useRealTimers(); + }); + + it('should account for high priority interval', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('hp_metric', true); + + // After 300ms, with 1000ms base interval, half rounded is 500ms + vi.advanceTimersByTime(300); + + const timeRemaining = rateLimiter.getTimeUntilNextAllowed( + 'hp_metric', + true, + ); + expect(timeRemaining).toBeCloseTo(200, -1); + + vi.useRealTimers(); + }); + }); + + describe('getStats', () => { + it('should return empty stats initially', () => { + const stats = rateLimiter.getStats(); + expect(stats).toEqual({ + totalMetrics: 0, + oldestRecord: 0, + newestRecord: 0, + averageInterval: 0, + }); + }); + + it('should return correct stats after recordings', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('metric_a'); + vi.advanceTimersByTime(500); + rateLimiter.shouldRecord('metric_b'); + vi.advanceTimersByTime(500); + rateLimiter.shouldRecord('metric_c'); + + const stats = rateLimiter.getStats(); + expect(stats.totalMetrics).toBe(3); + expect(stats.averageInterval).toBeCloseTo(500, -1); + + vi.useRealTimers(); + }); + + it('should handle single recording correctly', () => { + rateLimiter.shouldRecord('test_metric'); + + const stats = rateLimiter.getStats(); + expect(stats.totalMetrics).toBe(1); + expect(stats.averageInterval).toBe(0); + }); + }); + + describe('reset', () => { + it('should clear all rate limiting state', () => { + rateLimiter.shouldRecord('metric_a'); + rateLimiter.shouldRecord('metric_b'); + + rateLimiter.reset(); + + const stats = rateLimiter.getStats(); + expect(stats.totalMetrics).toBe(0); + + // Should allow immediate recording after reset + const result = rateLimiter.shouldRecord('metric_a'); + expect(result).toBe(true); + }); + }); + + describe('cleanup', () => { + it('should remove old entries', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('old_metric'); + + // Advance time beyond cleanup threshold + vi.advanceTimersByTime(4000000); // More than 1 hour + + rateLimiter.cleanup(3600000); // 1 hour cleanup + + // Should allow immediate recording of old metric after cleanup + const result = rateLimiter.shouldRecord('old_metric'); + expect(result).toBe(true); + + vi.useRealTimers(); + }); + + it('should preserve recent entries', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('recent_metric'); + + // Advance time but not beyond cleanup threshold + vi.advanceTimersByTime(1800000); // 30 minutes + + rateLimiter.cleanup(3600000); // 1 hour cleanup + + // Should no longer be rate limited after 30 minutes (way past 1 minute default interval) + const result = rateLimiter.shouldRecord('recent_metric'); + expect(result).toBe(true); + + vi.useRealTimers(); + }); + + it('should use default cleanup age', () => { + vi.useFakeTimers(); + + rateLimiter.shouldRecord('test_metric'); + + // Advance time beyond default cleanup (1 hour) + vi.advanceTimersByTime(4000000); + + rateLimiter.cleanup(); // Use default age + + const result = rateLimiter.shouldRecord('test_metric'); + expect(result).toBe(true); + + vi.useRealTimers(); + }); + }); + + describe('edge cases', () => { + it('should handle zero interval', () => { + const zeroLimiter = new RateLimiter(0); + + zeroLimiter.shouldRecord('test_metric'); + const result = zeroLimiter.shouldRecord('test_metric'); + + expect(result).toBe(true); // Should allow with zero interval + }); + + it('should handle very large intervals', () => { + const longLimiter = new RateLimiter(Number.MAX_SAFE_INTEGER); + + longLimiter.shouldRecord('test_metric'); + const timeRemaining = longLimiter.getTimeUntilNextAllowed('test_metric'); + + expect(timeRemaining).toBeGreaterThan(1000000); + }); + }); +}); diff --git a/packages/core/src/telemetry/rate-limiter.ts b/packages/core/src/telemetry/rate-limiter.ts new file mode 100644 index 0000000000..076887cd35 --- /dev/null +++ b/packages/core/src/telemetry/rate-limiter.ts @@ -0,0 +1,124 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Rate limiter to prevent excessive telemetry recording + * Ensures we don't send metrics more frequently than specified limits + */ +export class RateLimiter { + private lastRecordTimes: Map = new Map(); + private readonly minIntervalMs: number; + private static readonly HIGH_PRIORITY_DIVISOR = 2; + + constructor(minIntervalMs: number = 60000) { + if (minIntervalMs < 0) { + throw new Error('minIntervalMs must be non-negative.'); + } + this.minIntervalMs = minIntervalMs; + } + + /** + * Check if we should record a metric based on rate limiting + * @param metricKey - Unique key for the metric type/context + * @param isHighPriority - If true, uses shorter interval for critical events + * @returns true if metric should be recorded + */ + shouldRecord(metricKey: string, isHighPriority: boolean = false): boolean { + const now = Date.now(); + const lastRecordTime = this.lastRecordTimes.get(metricKey) || 0; + + // Use shorter interval for high priority events (e.g., memory leaks) + const interval = isHighPriority + ? Math.round(this.minIntervalMs / RateLimiter.HIGH_PRIORITY_DIVISOR) + : this.minIntervalMs; + + if (now - lastRecordTime >= interval) { + this.lastRecordTimes.set(metricKey, now); + return true; + } + + return false; + } + + /** + * Force record a metric (bypasses rate limiting) + * Use sparingly for critical events + */ + forceRecord(metricKey: string): void { + this.lastRecordTimes.set(metricKey, Date.now()); + } + + /** + * Get time until next allowed recording for a metric + */ + getTimeUntilNextAllowed( + metricKey: string, + isHighPriority: boolean = false, + ): number { + const now = Date.now(); + const lastRecordTime = this.lastRecordTimes.get(metricKey) || 0; + const interval = isHighPriority + ? Math.round(this.minIntervalMs / RateLimiter.HIGH_PRIORITY_DIVISOR) + : this.minIntervalMs; + const nextAllowedTime = lastRecordTime + interval; + + return Math.max(0, nextAllowedTime - now); + } + + /** + * Get statistics about rate limiting + */ + getStats(): { + totalMetrics: number; + oldestRecord: number; + newestRecord: number; + averageInterval: number; + } { + const recordTimes = Array.from(this.lastRecordTimes.values()); + + if (recordTimes.length === 0) { + return { + totalMetrics: 0, + oldestRecord: 0, + newestRecord: 0, + averageInterval: 0, + }; + } + + const oldest = Math.min(...recordTimes); + const newest = Math.max(...recordTimes); + const totalSpan = newest - oldest; + const averageInterval = + recordTimes.length > 1 ? totalSpan / (recordTimes.length - 1) : 0; + + return { + totalMetrics: recordTimes.length, + oldestRecord: oldest, + newestRecord: newest, + averageInterval, + }; + } + + /** + * Clear all rate limiting state + */ + reset(): void { + this.lastRecordTimes.clear(); + } + + /** + * Remove old entries to prevent memory leaks + */ + cleanup(maxAgeMs: number = 3600000): void { + const cutoffTime = Date.now() - maxAgeMs; + + for (const [key, time] of this.lastRecordTimes.entries()) { + if (time < cutoffTime) { + this.lastRecordTimes.delete(key); + } + } + } +}