[Part 1/6] feat(telemetry): add rate limiter and high-water mark tracker with tests (#8110)

This commit is contained in:
Shelakh
2025-09-10 00:49:10 +02:00
committed by GitHub
parent ae20aee837
commit 7d77f0287d
5 changed files with 717 additions and 0 deletions

View File

@@ -0,0 +1,198 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { HighWaterMarkTracker } from './high-water-mark-tracker.js';
// Test suite for HighWaterMarkTracker. Every test starts from a fresh tracker
// configured with a 5% growth threshold (see beforeEach).
describe('HighWaterMarkTracker', () => {
  let tracker: HighWaterMarkTracker;

  beforeEach(() => {
    tracker = new HighWaterMarkTracker(5); // 5% threshold
  });

  describe('constructor', () => {
    it('should initialize with default values', () => {
      const defaultTracker = new HighWaterMarkTracker();
      expect(defaultTracker).toBeInstanceOf(HighWaterMarkTracker);
    });

    it('should initialize with custom values', () => {
      const customTracker = new HighWaterMarkTracker(10);
      expect(customTracker).toBeInstanceOf(HighWaterMarkTracker);
    });

    it('should throw on negative threshold', () => {
      expect(() => new HighWaterMarkTracker(-1)).toThrow(
        'growthThresholdPercent must be non-negative.',
      );
    });
  });

  describe('shouldRecordMetric', () => {
    it('should return true for first measurement', () => {
      const result = tracker.shouldRecordMetric('heap_used', 1000000);
      expect(result).toBe(true);
    });

    it('should return false for small increases', () => {
      // Set initial high-water mark
      tracker.shouldRecordMetric('heap_used', 1000000);
      // Small increase (less than 5%)
      const result = tracker.shouldRecordMetric('heap_used', 1030000); // 3% increase
      expect(result).toBe(false);
    });

    it('should return true for significant increases', () => {
      // Set initial high-water mark
      tracker.shouldRecordMetric('heap_used', 1000000);
      // Intermediate readings; each one is compared against the mark stored
      // so far, so the final reading is judged against the latest mark.
      tracker.shouldRecordMetric('heap_used', 1100000); // 10% increase
      tracker.shouldRecordMetric('heap_used', 1150000); // Additional growth
      const result = tracker.shouldRecordMetric('heap_used', 1200000); // Sustained growth
      expect(result).toBe(true);
    });

    it('should handle decreasing values correctly', () => {
      // Set initial high-water mark
      tracker.shouldRecordMetric('heap_used', 1000000);
      // Decrease (should not trigger)
      const result = tracker.shouldRecordMetric('heap_used', 900000); // 10% decrease
      expect(result).toBe(false);
    });

    it('should update high-water mark when threshold exceeded', () => {
      tracker.shouldRecordMetric('heap_used', 1000000);
      const beforeMark = tracker.getHighWaterMark('heap_used');
      expect(beforeMark).toBe(1000000);

      // Growth pattern whose last reading exceeds the stored mark by > 5%
      tracker.shouldRecordMetric('heap_used', 1100000);
      tracker.shouldRecordMetric('heap_used', 1150000);
      tracker.shouldRecordMetric('heap_used', 1200000);

      const afterMark = tracker.getHighWaterMark('heap_used');
      expect(afterMark).toBeGreaterThan(beforeMark);
      // The mark is the last value that crossed the threshold
      expect(afterMark).toBe(1200000);
    });

    it('should handle multiple metric types independently', () => {
      tracker.shouldRecordMetric('heap_used', 1000000);
      tracker.shouldRecordMetric('rss', 2000000);

      expect(tracker.getHighWaterMark('heap_used')).toBeGreaterThan(0);
      expect(tracker.getHighWaterMark('rss')).toBeGreaterThan(0);
      expect(tracker.getHighWaterMark('heap_used')).not.toBe(
        tracker.getHighWaterMark('rss'),
      );
    });
  });

  describe('smoothing functionality', () => {
    it('should reduce noise from garbage collection spikes', () => {
      // Establish baseline
      tracker.shouldRecordMetric('heap_used', 1000000);
      tracker.shouldRecordMetric('heap_used', 1000000);
      tracker.shouldRecordMetric('heap_used', 1000000);
      // Single large spike (100% above the baseline)
      const result = tracker.shouldRecordMetric('heap_used', 2000000);
      // Each reading is compared directly against the stored mark, so a
      // spike this far above the threshold triggers a recording.
      expect(result).toBe(true);
    });

    it('should eventually respond to sustained growth', () => {
      // Establish baseline
      tracker.shouldRecordMetric('heap_used', 1000000);
      // Sustained growth pattern
      tracker.shouldRecordMetric('heap_used', 1100000);
      tracker.shouldRecordMetric('heap_used', 1150000);
      const result = tracker.shouldRecordMetric('heap_used', 1200000);
      expect(result).toBe(true);
    });
  });

  describe('getHighWaterMark', () => {
    it('should return 0 for unknown metric types', () => {
      const mark = tracker.getHighWaterMark('unknown_metric');
      expect(mark).toBe(0);
    });

    it('should return correct value for known metric types', () => {
      tracker.shouldRecordMetric('heap_used', 1000000);
      const mark = tracker.getHighWaterMark('heap_used');
      expect(mark).toBeGreaterThan(0);
    });
  });

  describe('getAllHighWaterMarks', () => {
    it('should return empty object initially', () => {
      const marks = tracker.getAllHighWaterMarks();
      expect(marks).toEqual({});
    });

    it('should return all recorded marks', () => {
      tracker.shouldRecordMetric('heap_used', 1000000);
      tracker.shouldRecordMetric('rss', 2000000);

      const marks = tracker.getAllHighWaterMarks();
      expect(Object.keys(marks)).toHaveLength(2);
      expect(marks['heap_used']).toBeGreaterThan(0);
      expect(marks['rss']).toBeGreaterThan(0);
    });
  });

  describe('resetHighWaterMark', () => {
    it('should reset specific metric type', () => {
      tracker.shouldRecordMetric('heap_used', 1000000);
      tracker.shouldRecordMetric('rss', 2000000);

      tracker.resetHighWaterMark('heap_used');

      expect(tracker.getHighWaterMark('heap_used')).toBe(0);
      expect(tracker.getHighWaterMark('rss')).toBeGreaterThan(0);
    });
  });

  describe('resetAllHighWaterMarks', () => {
    it('should reset all metrics', () => {
      tracker.shouldRecordMetric('heap_used', 1000000);
      tracker.shouldRecordMetric('rss', 2000000);

      tracker.resetAllHighWaterMarks();

      expect(tracker.getHighWaterMark('heap_used')).toBe(0);
      expect(tracker.getHighWaterMark('rss')).toBe(0);
      expect(tracker.getAllHighWaterMarks()).toEqual({});
    });
  });

  describe('time-based cleanup', () => {
    it('should clean up old readings', () => {
      vi.useFakeTimers();
      try {
        // Add readings
        tracker.shouldRecordMetric('heap_used', 1000000);
        // Advance time significantly
        vi.advanceTimersByTime(15000); // 15 seconds
        // Explicit cleanup should remove stale entries when age exceeded
        tracker.cleanup(10000); // 10 seconds
        // Entry should be removed
        expect(tracker.getHighWaterMark('heap_used')).toBe(0);
      } finally {
        // Restore real timers even when an assertion above throws, so a
        // failure here cannot leak fake timers into later tests.
        vi.useRealTimers();
      }
    });
  });
});

View File

@@ -0,0 +1,100 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
 * High-water mark tracker for memory metrics.
 *
 * Keeps, per metric type, the largest value that has been accepted so far and
 * only reports a new recording when a reading exceeds that stored mark by more
 * than the configured growth threshold.
 */
export class HighWaterMarkTracker {
  /** Accepted high-water mark per metric type. */
  private readonly waterMarks = new Map<string, number>();
  /** Timestamp (Date.now()) of the last valid reading per metric type. */
  private readonly lastUpdateTimes = new Map<string, number>();
  private readonly growthThresholdPercent: number;

  /**
   * @param growthThresholdPercent - Percentage by which a reading must exceed
   *   the stored mark before it is recorded. Defaults to 5.
   * @throws Error if `growthThresholdPercent` is negative.
   */
  constructor(growthThresholdPercent: number = 5) {
    if (growthThresholdPercent < 0) {
      throw new Error('growthThresholdPercent must be non-negative.');
    }
    this.growthThresholdPercent = growthThresholdPercent;
  }

  /**
   * Check if current value represents a new high-water mark that should trigger recording
   * @param metricType - Type of metric (e.g., 'heap_used', 'rss')
   * @param currentValue - Current memory value in bytes
   * @returns true if this value should trigger a recording
   */
  shouldRecordMetric(metricType: string, currentValue: number): boolean {
    // NaN/Infinity can never be compared meaningfully against a threshold and
    // would poison the stored mark, so such readings are ignored entirely.
    if (!Number.isFinite(currentValue)) {
      return false;
    }

    // Track last seen time for cleanup regardless of whether we record
    this.lastUpdateTimes.set(metricType, Date.now());

    const currentWaterMark = this.waterMarks.get(metricType);

    // First measurement for this metric type: always record. "Unseen" is
    // detected via `undefined` rather than a 0 sentinel so that a legitimate
    // mark of 0 is not mistaken for "never measured" on every later call.
    if (currentWaterMark === undefined) {
      this.waterMarks.set(metricType, currentValue);
      return true;
    }

    // Record only when the reading exceeds the mark by more than the threshold
    const thresholdValue =
      currentWaterMark * (1 + this.growthThresholdPercent / 100);
    if (currentValue > thresholdValue) {
      this.waterMarks.set(metricType, currentValue);
      return true;
    }

    return false;
  }

  /**
   * Get current high-water mark for a metric type
   * @returns the stored mark, or 0 when the metric type is unknown
   */
  getHighWaterMark(metricType: string): number {
    return this.waterMarks.get(metricType) ?? 0;
  }

  /**
   * Get all high-water marks
   * @returns a fresh plain object mapping metric type to its stored mark
   */
  getAllHighWaterMarks(): Record<string, number> {
    return Object.fromEntries(this.waterMarks);
  }

  /**
   * Reset high-water mark for a specific metric type
   */
  resetHighWaterMark(metricType: string): void {
    this.waterMarks.delete(metricType);
    this.lastUpdateTimes.delete(metricType);
  }

  /**
   * Reset all high-water marks
   */
  resetAllHighWaterMarks(): void {
    this.waterMarks.clear();
    this.lastUpdateTimes.clear();
  }

  /**
   * Remove stale entries to avoid unbounded growth if metric types are variable.
   * Entries not updated within maxAgeMs will be removed.
   * @param maxAgeMs - Maximum age in milliseconds; defaults to one hour.
   */
  cleanup(maxAgeMs: number = 3600000): void {
    const cutoffTime = Date.now() - maxAgeMs;
    // Deleting the entry currently being visited is safe for Map iteration.
    for (const [metricType, lastTime] of this.lastUpdateTimes.entries()) {
      if (lastTime < cutoffTime) {
        this.lastUpdateTimes.delete(metricType);
        this.waterMarks.delete(metricType);
      }
    }
  }
}

View File

@@ -50,3 +50,5 @@ export type { TelemetryEvent } from './types.js';
export { SpanStatusCode, ValueType } from '@opentelemetry/api';
export { SemanticAttributes } from '@opentelemetry/semantic-conventions';
export * from './uiTelemetry.js';
export { HighWaterMarkTracker } from './high-water-mark-tracker.js';
export { RateLimiter } from './rate-limiter.js';

View File

@@ -0,0 +1,293 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { RateLimiter } from './rate-limiter.js';
// Test suite for RateLimiter. Every test starts from a fresh limiter with a
// 1000 ms minimum interval (see beforeEach).
describe('RateLimiter', () => {
  let rateLimiter: RateLimiter;

  /**
   * Runs `body` with vitest fake timers installed and always restores real
   * timers afterwards, even when an assertion inside `body` throws, so a
   * failing test cannot leak fake timers into later tests.
   */
  const withFakeTimers = (body: () => void): void => {
    vi.useFakeTimers();
    try {
      body();
    } finally {
      vi.useRealTimers();
    }
  };

  beforeEach(() => {
    rateLimiter = new RateLimiter(1000); // 1 second interval for testing
  });

  describe('constructor', () => {
    it('should initialize with default interval', () => {
      const defaultLimiter = new RateLimiter();
      expect(defaultLimiter).toBeInstanceOf(RateLimiter);
    });

    it('should initialize with custom interval', () => {
      const customLimiter = new RateLimiter(5000);
      expect(customLimiter).toBeInstanceOf(RateLimiter);
    });

    it('should throw on negative interval', () => {
      expect(() => new RateLimiter(-1)).toThrow(
        'minIntervalMs must be non-negative.',
      );
    });
  });

  describe('shouldRecord', () => {
    it('should allow first recording', () => {
      const result = rateLimiter.shouldRecord('test_metric');
      expect(result).toBe(true);
    });

    it('should block immediate subsequent recordings', () => {
      rateLimiter.shouldRecord('test_metric'); // First call
      const result = rateLimiter.shouldRecord('test_metric'); // Immediate second call
      expect(result).toBe(false);
    });

    it('should allow recording after interval', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('test_metric'); // First call
        // Advance time past interval
        vi.advanceTimersByTime(1500);
        const result = rateLimiter.shouldRecord('test_metric');
        expect(result).toBe(true);
      });
    });

    it('should handle different metric keys independently', () => {
      rateLimiter.shouldRecord('metric_a'); // First call for metric_a
      const resultA = rateLimiter.shouldRecord('metric_a'); // Second call for metric_a
      const resultB = rateLimiter.shouldRecord('metric_b'); // First call for metric_b
      expect(resultA).toBe(false); // Should be blocked
      expect(resultB).toBe(true); // Should be allowed
    });

    it('should use shorter interval for high priority events', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('test_metric', true); // High priority
        // Advance time by half the normal interval
        vi.advanceTimersByTime(500);
        const result = rateLimiter.shouldRecord('test_metric', true);
        expect(result).toBe(true); // Should be allowed due to high priority
      });
    });

    it('should still block high priority events if interval not met', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('test_metric', true); // High priority
        // Advance time by less than half interval
        vi.advanceTimersByTime(300);
        const result = rateLimiter.shouldRecord('test_metric', true);
        expect(result).toBe(false); // Should still be blocked
      });
    });
  });

  describe('forceRecord', () => {
    it('should update last record time', () => {
      const before = rateLimiter.getTimeUntilNextAllowed('test_metric');
      rateLimiter.forceRecord('test_metric');
      const after = rateLimiter.getTimeUntilNextAllowed('test_metric');
      expect(after).toBeGreaterThan(before);
    });

    it('should block subsequent recordings after force record', () => {
      rateLimiter.forceRecord('test_metric');
      const result = rateLimiter.shouldRecord('test_metric');
      expect(result).toBe(false);
    });
  });

  describe('getTimeUntilNextAllowed', () => {
    it('should return 0 for new metric', () => {
      const time = rateLimiter.getTimeUntilNextAllowed('new_metric');
      expect(time).toBe(0);
    });

    it('should return correct time after recording', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('test_metric');
        // Advance time partially
        vi.advanceTimersByTime(300);
        const timeRemaining =
          rateLimiter.getTimeUntilNextAllowed('test_metric');
        expect(timeRemaining).toBeCloseTo(700, -1); // Approximately 700ms remaining
      });
    });

    it('should return 0 after interval has passed', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('test_metric');
        // Advance time past interval
        vi.advanceTimersByTime(1500);
        const timeRemaining =
          rateLimiter.getTimeUntilNextAllowed('test_metric');
        expect(timeRemaining).toBe(0);
      });
    });

    it('should account for high priority interval', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('hp_metric', true);
        // After 300ms, with 1000ms base interval, half rounded is 500ms
        vi.advanceTimersByTime(300);
        const timeRemaining = rateLimiter.getTimeUntilNextAllowed(
          'hp_metric',
          true,
        );
        expect(timeRemaining).toBeCloseTo(200, -1);
      });
    });
  });

  describe('getStats', () => {
    it('should return empty stats initially', () => {
      const stats = rateLimiter.getStats();
      expect(stats).toEqual({
        totalMetrics: 0,
        oldestRecord: 0,
        newestRecord: 0,
        averageInterval: 0,
      });
    });

    it('should return correct stats after recordings', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('metric_a');
        vi.advanceTimersByTime(500);
        rateLimiter.shouldRecord('metric_b');
        vi.advanceTimersByTime(500);
        rateLimiter.shouldRecord('metric_c');

        const stats = rateLimiter.getStats();
        expect(stats.totalMetrics).toBe(3);
        expect(stats.averageInterval).toBeCloseTo(500, -1);
      });
    });

    it('should handle single recording correctly', () => {
      rateLimiter.shouldRecord('test_metric');
      const stats = rateLimiter.getStats();
      expect(stats.totalMetrics).toBe(1);
      expect(stats.averageInterval).toBe(0);
    });
  });

  describe('reset', () => {
    it('should clear all rate limiting state', () => {
      rateLimiter.shouldRecord('metric_a');
      rateLimiter.shouldRecord('metric_b');

      rateLimiter.reset();

      const stats = rateLimiter.getStats();
      expect(stats.totalMetrics).toBe(0);
      // Should allow immediate recording after reset
      const result = rateLimiter.shouldRecord('metric_a');
      expect(result).toBe(true);
    });
  });

  describe('cleanup', () => {
    it('should remove old entries', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('old_metric');
        // Advance time beyond cleanup threshold
        vi.advanceTimersByTime(4000000); // More than 1 hour
        rateLimiter.cleanup(3600000); // 1 hour cleanup
        // The stale entry must actually be gone from the limiter's state
        expect(rateLimiter.getStats().totalMetrics).toBe(0);
        // Should allow immediate recording of old metric after cleanup
        const result = rateLimiter.shouldRecord('old_metric');
        expect(result).toBe(true);
      });
    });

    it('should preserve recent entries', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('recent_metric');
        // Advance time but not beyond cleanup threshold
        vi.advanceTimersByTime(1800000); // 30 minutes
        rateLimiter.cleanup(3600000); // 1 hour cleanup
        // The entry is younger than the cleanup age, so it must still be
        // tracked; this is the check that distinguishes "preserved" from
        // "removed" (shouldRecord alone returns true in both cases).
        expect(rateLimiter.getStats().totalMetrics).toBe(1);
        // 30 minutes is far past the 1 second interval configured in
        // beforeEach, so the metric is no longer rate limited.
        const result = rateLimiter.shouldRecord('recent_metric');
        expect(result).toBe(true);
      });
    });

    it('should use default cleanup age', () => {
      withFakeTimers(() => {
        rateLimiter.shouldRecord('test_metric');
        // Advance time beyond default cleanup (1 hour)
        vi.advanceTimersByTime(4000000);
        rateLimiter.cleanup(); // Use default age
        const result = rateLimiter.shouldRecord('test_metric');
        expect(result).toBe(true);
      });
    });
  });

  describe('edge cases', () => {
    it('should handle zero interval', () => {
      const zeroLimiter = new RateLimiter(0);
      zeroLimiter.shouldRecord('test_metric');
      const result = zeroLimiter.shouldRecord('test_metric');
      expect(result).toBe(true); // Should allow with zero interval
    });

    it('should handle very large intervals', () => {
      const longLimiter = new RateLimiter(Number.MAX_SAFE_INTEGER);
      longLimiter.shouldRecord('test_metric');
      const timeRemaining = longLimiter.getTimeUntilNextAllowed('test_metric');
      expect(timeRemaining).toBeGreaterThan(1000000);
    });
  });
});

View File

@@ -0,0 +1,124 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
/**
 * Rate limiter to prevent excessive telemetry recording.
 *
 * Remembers, per metric key, the time of the last accepted recording and
 * refuses further recordings for that key until the minimum interval has
 * elapsed. High priority callers use a shortened interval.
 */
export class RateLimiter {
  /** Timestamp (Date.now()) of the last accepted recording per metric key. */
  private readonly lastRecordTimes = new Map<string, number>();
  private readonly minIntervalMs: number;
  /** High priority events use minIntervalMs divided by this value. */
  private static readonly HIGH_PRIORITY_DIVISOR = 2;

  /**
   * @param minIntervalMs - Minimum time between recordings of the same key,
   *   in milliseconds. Defaults to 60000.
   * @throws Error if `minIntervalMs` is negative.
   */
  constructor(minIntervalMs: number = 60000) {
    if (minIntervalMs < 0) {
      throw new Error('minIntervalMs must be non-negative.');
    }
    this.minIntervalMs = minIntervalMs;
  }

  /**
   * Check if we should record a metric based on rate limiting
   * @param metricKey - Unique key for the metric type/context
   * @param isHighPriority - If true, uses shorter interval for critical events
   * @returns true if metric should be recorded
   */
  shouldRecord(metricKey: string, isHighPriority: boolean = false): boolean {
    const now = Date.now();
    const lastRecordTime = this.lastRecordTimes.get(metricKey);

    // A key that was never recorded is always allowed. Treating "unseen" as
    // "recorded at time 0" would block the first recording forever whenever
    // the interval is larger than the current epoch timestamp.
    if (
      lastRecordTime !== undefined &&
      now - lastRecordTime < this.intervalFor(isHighPriority)
    ) {
      return false;
    }

    this.lastRecordTimes.set(metricKey, now);
    return true;
  }

  /**
   * Force record a metric (bypasses rate limiting)
   * Use sparingly for critical events
   */
  forceRecord(metricKey: string): void {
    this.lastRecordTimes.set(metricKey, Date.now());
  }

  /**
   * Get time until next allowed recording for a metric
   * @param metricKey - Unique key for the metric type/context
   * @param isHighPriority - If true, measures against the shortened interval
   * @returns remaining wait in milliseconds; 0 when recording is allowed now
   *   or the key has never been recorded
   */
  getTimeUntilNextAllowed(
    metricKey: string,
    isHighPriority: boolean = false,
  ): number {
    const lastRecordTime = this.lastRecordTimes.get(metricKey);
    if (lastRecordTime === undefined) {
      return 0;
    }
    // Subtract the elapsed time from the interval instead of adding the
    // interval to a timestamp, which could exceed the safe integer range
    // for very large intervals.
    const elapsed = Date.now() - lastRecordTime;
    return Math.max(0, this.intervalFor(isHighPriority) - elapsed);
  }

  /**
   * Get statistics about rate limiting
   * @returns the number of tracked keys, the oldest and newest last-record
   *   timestamps, and the span between them divided by (count - 1); all
   *   zeros when nothing is tracked
   */
  getStats(): {
    totalMetrics: number;
    oldestRecord: number;
    newestRecord: number;
    averageInterval: number;
  } {
    const totalMetrics = this.lastRecordTimes.size;
    if (totalMetrics === 0) {
      return {
        totalMetrics: 0,
        oldestRecord: 0,
        newestRecord: 0,
        averageInterval: 0,
      };
    }

    // Single pass instead of Math.min(...values)/Math.max(...values): spreading
    // a very large array into call arguments can exceed the engine's limit.
    let oldest = Number.POSITIVE_INFINITY;
    let newest = Number.NEGATIVE_INFINITY;
    for (const time of this.lastRecordTimes.values()) {
      if (time < oldest) oldest = time;
      if (time > newest) newest = time;
    }

    const averageInterval =
      totalMetrics > 1 ? (newest - oldest) / (totalMetrics - 1) : 0;

    return {
      totalMetrics,
      oldestRecord: oldest,
      newestRecord: newest,
      averageInterval,
    };
  }

  /**
   * Clear all rate limiting state
   */
  reset(): void {
    this.lastRecordTimes.clear();
  }

  /**
   * Remove old entries to prevent memory leaks
   * @param maxAgeMs - Entries last recorded more than this many milliseconds
   *   ago are removed; defaults to one hour.
   */
  cleanup(maxAgeMs: number = 3600000): void {
    const cutoffTime = Date.now() - maxAgeMs;
    // Deleting the entry currently being visited is safe for Map iteration.
    for (const [key, time] of this.lastRecordTimes.entries()) {
      if (time < cutoffTime) {
        this.lastRecordTimes.delete(key);
      }
    }
  }

  /** Effective minimum interval in milliseconds for the given priority. */
  private intervalFor(isHighPriority: boolean): number {
    // Use shorter interval for high priority events (e.g., memory leaks)
    return isHighPriority
      ? Math.round(this.minIntervalMs / RateLimiter.HIGH_PRIORITY_DIVISOR)
      : this.minIntervalMs;
  }
}