frequency-anomaly-reporter
Sliding-window event-frequency monitors that detect anomalous rates — both excessive and insufficient — via configurable thresholds and callbacks. Zero runtime dependencies.
Table of Contents
- Key Features
- Components Overview
- Installation
- ExcessiveFrequencyReporter
- InsufficientFrequencyReporter
- Use Cases
- Design Notes
- Development
- License
Key Features
- Sliding-window anomaly detection: track how many events fall inside a moving time window you configure. When that count violates your threshold, your callback runs: too many events in a short span (ExcessiveFrequencyReporter), or too few after an initial grace period (InsufficientFrequencyReporter).
- Both excessive and insufficient frequency monitors in a single package.
- Supports both sync and async callbacks.
- Start / stop lifecycle: every reporter can be explicitly stopped and re-armed with start(), giving callers full control over the monitoring lifecycle.
- Per-component reporting-mode enums: ExcessiveFrequencyReportingMode and InsufficientFrequencyReportingMode let you choose between continuous reporting and report-once-then-stop semantics.
- Zero runtime dependencies. Only the Python standard library (time, collections, asyncio, enum).
- Fully type-annotated with a py.typed marker (PEP 561). Works out of the box with mypy, pyright, and other type checkers.
- Tested on Python 3.10 through 3.14.
Components Overview
| Component | Detects | Callback fires when |
|---|---|---|
| ExcessiveFrequencyReporter | Too many events in a sliding window | events_in_window > max_events_in_window |
| InsufficientFrequencyReporter | Too few events in a sliding window | After grace, events_in_window < min_events_in_window |
Installation
pip install frequency-anomaly-reporter
Or with uv:
uv add frequency-anomaly-reporter
ExcessiveFrequencyReporter
Monitors a stream of events and invokes a callback whenever the number of events within a sliding window exceeds a configured threshold. Useful for detecting error floods and any other scenario where "too often" signals a systemic problem.
Quick Start
import asyncio
import logging

from frequency_anomaly_reporter import (
    ExcessiveFrequencyReporter,
    ExcessiveFrequencyReporterConfig,
    ExcessiveFrequencyReportingMode,
)


class KafkaConsumerOrchestrator:
    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger
        config = ExcessiveFrequencyReporterConfig(
            window_duration_ms=60_000,
            max_events_in_window=3,
            on_excessive_frequency=self._on_too_many_disconnects,
            reporting_mode=ExcessiveFrequencyReportingMode.REPORT_ONCE_THEN_STOP,
        )
        self._reporter = ExcessiveFrequencyReporter(config)

    def start(self) -> None:
        self._reporter.start()

    def stop(self) -> None:
        self._reporter.stop()

    def _on_too_many_disconnects(self, event_count: int) -> None:
        self._logger.critical(
            "%d kafka disconnects in the last 60 s, triggering escalation",
            event_count,
        )

    async def handle_kafka_disconnect(self) -> None:
        # ... reconnection logic ...
        await self._reporter.record_event()
Realistic Usage
The reporter is typically owned by the component that monitors events, rather than constructed and passed around as a standalone object. The component exposes its own start() and stop() methods that delegate to the reporter, keeping the lifecycle fully encapsulated.
class PrivilegeEscalationMonitor:
    def __init__(self, siem_client: SiemClient) -> None:
        self._siem_client = siem_client
        config = ExcessiveFrequencyReporterConfig(
            # window_duration_ms=...,
            # max_events_in_window=...,
            on_excessive_frequency=self._on_escalation_flood,
            # reporting_mode=...,
        )
        self._reporter = ExcessiveFrequencyReporter(config)

    def start(self) -> None:
        self._reporter.start()

    def stop(self) -> None:
        self._reporter.stop()

    async def _on_escalation_flood(self, event_count: int) -> None:
        await self._siem_client.alert(
            severity="CRITICAL",
            detail=f"{event_count} unauthorised sudo attempts in window",
        )

    async def process_audit_entry(self, entry: AuditEntry) -> None:
        if entry.event_type == AuditEventType.SUDO_FAILURE:
            await self._reporter.record_event()
API
ExcessiveFrequencyReporterConfig
All fields are required — no defaults are provided, so every configuration decision is made explicitly.
| Field | Type | Description |
|---|---|---|
| window_duration_ms | int | Length of the sliding window in milliseconds. Must be positive. |
| max_events_in_window | int | Maximum number of events allowed within the window before the callback is invoked. The callback fires when the count exceeds this value. Must be positive. |
| on_excessive_frequency | Callable[[int], Any] | Callback invoked when the threshold is exceeded. Receives the current number of events in the window. May be sync or async. |
| reporting_mode | ExcessiveFrequencyReportingMode | Controls callback invocation behaviour after the first breach. See Reporting Modes. |
Validation runs in __post_init__ and raises ValueError or TypeError on invalid inputs, so errors surface at config-creation time.
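The validation pattern described above can be sketched with a plain dataclass. This is a minimal illustration, not the package's source: the class name WindowConfigSketch, the helper try_build, the error messages, the use of frozen=True, and the explicit rejection of bool are all assumptions.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class WindowConfigSketch:
    """Hypothetical stand-in for a reporter config (not the package's class)."""

    window_duration_ms: int
    max_events_in_window: int
    on_excessive_frequency: Callable[[int], Any]

    def __post_init__(self) -> None:
        # Runs right after the generated __init__, so bad values fail at creation.
        for name in ("window_duration_ms", "max_events_in_window"):
            value = getattr(self, name)
            # Assumption: bool is rejected even though it subclasses int.
            if isinstance(value, bool) or not isinstance(value, int):
                raise TypeError(f"{name} must be an int, got {type(value).__name__}")
            if value <= 0:
                raise ValueError(f"{name} must be positive, got {value}")
        if not callable(self.on_excessive_frequency):
            raise TypeError("on_excessive_frequency must be callable")


def try_build(**kwargs: Any) -> str:
    """Return 'ok' or the name of the exception raised while building the config."""
    try:
        WindowConfigSketch(**kwargs)
    except (TypeError, ValueError) as exc:
        return type(exc).__name__
    return "ok"
```

With this sketch, a zero-length window is rejected with ValueError and a string where an int is expected is rejected with TypeError, both before any reporter exists.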
ExcessiveFrequencyReporter
| Member | Kind | Description |
|---|---|---|
| ExcessiveFrequencyReporter(config) | constructor | Create a reporter from an ExcessiveFrequencyReporterConfig. The reporter starts inactive; call start() to begin monitoring. |
| await record_event() | async method | Record an event. Prunes stale timestamps, appends the new one, and invokes the callback if the threshold is exceeded. Silent no-op when the reporter is inactive. |
| start() | method | Activate the reporter without touching window contents. To start with a clean slate, call clear() first. Idempotent: no-op if already active. |
| stop() | method | Deactivate the reporter without touching window contents. To discard accumulated history, call clear() explicitly. Idempotent: no-op if already stopped. |
| clear() | method | Remove all recorded timestamps from the sliding window. Independent of active/inactive state, so it can be called at any time. Use when past event history is no longer relevant (e.g. after a successful recovery). |
| is_active | property | True if the reporter is currently active. |
| current_window_event_count | property | Number of events currently within the sliding window. Evicts stale timestamps before counting, so the value always reflects the true trailing window. Useful for live metrics. |
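The prune, append, compare sequence that record_event() is described as performing can be illustrated with a small standalone counter. This is a sketch under assumptions rather than the package's implementation: SlidingWindowCounter and its methods are hypothetical names, time is passed in explicitly to keep the example deterministic, and treating a timestamp exactly one window old as stale is a guess about boundary handling.

```python
from collections import deque

NS_PER_MS = 1_000_000


class SlidingWindowCounter:
    """Hypothetical helper: counts events inside a trailing time window."""

    def __init__(self, window_duration_ms: int) -> None:
        self._window_ns = window_duration_ms * NS_PER_MS
        self._timestamps: deque[int] = deque()

    def _evict_stale(self, now_ns: int) -> None:
        # Timestamps arrive in order, so stale entries sit at the left end.
        cutoff = now_ns - self._window_ns
        while self._timestamps and self._timestamps[0] <= cutoff:
            self._timestamps.popleft()

    def record(self, now_ns: int) -> int:
        """Prune, append the new timestamp, and return the live count."""
        self._evict_stale(now_ns)
        self._timestamps.append(now_ns)
        return len(self._timestamps)

    def count(self, now_ns: int) -> int:
        """Prune, then return the live count without recording anything."""
        self._evict_stale(now_ns)
        return len(self._timestamps)


counter = SlidingWindowCounter(window_duration_ms=1_000)
# Four events within 300 ms, with a hypothetical max_events_in_window of 3.
counts = [counter.record(t * NS_PER_MS) for t in (0, 100, 200, 300)]
exceeded = counts[-1] > 3
# 1150 ms in, the events at 0 ms and 100 ms have aged out of the 1 s window.
count_later = counter.count(1_150 * NS_PER_MS)
```

Because eviction happens on every read as well as every write, the count never includes events older than the window, which matches the behaviour the table attributes to current_window_event_count.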
Reporting Modes
ExcessiveFrequencyReportingMode controls what happens after the frequency threshold is first exceeded:
| Value | Behavior |
|---|---|
| REPORT_EVERY | Invoke the callback on every record_event() call that finds the window over the threshold. Best for metrics and counters. |
| REPORT_ONCE_THEN_STOP | Invoke the callback once on the first breach, then automatically stop the reporter. Further record_event() calls become silent no-ops until start() is called to re-arm. Best for fatal escalation where intervention is expected. |
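The difference between the two modes, together with the sync-or-async callback support, can be sketched in isolation. Everything here is hypothetical (ModeSketch, BreachNotifierSketch, demo), and whether the real reporter deactivates before or after running the callback is an assumption; the sketch deactivates first.

```python
import asyncio
import enum
import inspect
from typing import Any, Callable


class ModeSketch(enum.Enum):
    """Hypothetical stand-in for the package's reporting-mode enums."""

    REPORT_EVERY = enum.auto()
    REPORT_ONCE_THEN_STOP = enum.auto()


class BreachNotifierSketch:
    """Hypothetical helper: dispatches a breach to a sync or async callback."""

    def __init__(self, callback: Callable[[int], Any], mode: ModeSketch) -> None:
        self._callback = callback
        self._mode = mode
        self.is_active = True

    async def notify(self, event_count: int) -> None:
        if not self.is_active:
            return  # stopped: silent no-op until re-armed
        if self._mode is ModeSketch.REPORT_ONCE_THEN_STOP:
            self.is_active = False  # assumption: stop before running the callback
        result = self._callback(event_count)
        if inspect.isawaitable(result):
            await result  # async callback: wait for it to finish


async def demo(mode: ModeSketch, use_async_callback: bool) -> list[int]:
    seen: list[int] = []

    async def async_callback(event_count: int) -> None:
        seen.append(event_count)

    callback = async_callback if use_async_callback else seen.append
    notifier = BreachNotifierSketch(callback, mode)
    for count in (4, 5, 6):  # three consecutive over-threshold observations
        await notifier.notify(count)
    return seen


every = asyncio.run(demo(ModeSketch.REPORT_EVERY, use_async_callback=False))
once = asyncio.run(demo(ModeSketch.REPORT_ONCE_THEN_STOP, use_async_callback=True))
```

In this sketch REPORT_EVERY sees all three breaches, while REPORT_ONCE_THEN_STOP sees only the first and ignores the rest until the flag is set again.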
InsufficientFrequencyReporter
Monitors a stream of events and invokes a callback when the sliding window contains too few events — for example missed heartbeats. A background checker task wakes every check_interval_ms and compares the live event count to min_events_in_window. record_event() is synchronous and only appends a timestamp; it does not invoke the callback (unlike ExcessiveFrequencyReporter, where await record_event() may invoke the callback immediately).
Realistic Usage
The reporter is usually owned by the component that receives heartbeats or ticks, which forwards record_event() and exposes start() / stop() on its own lifecycle.
from frequency_anomaly_reporter import (
    InsufficientFrequencyReporter,
    InsufficientFrequencyReporterConfig,
    InsufficientFrequencyReportingMode,
)


class HeartbeatMonitor:
    """Expect a heartbeat at least once per 30 s window; alert when the window is empty."""

    def __init__(self, alerting_client: AlertingClient) -> None:
        self._alerting_client = alerting_client
        config = InsufficientFrequencyReporterConfig(
            window_duration_ms=30_000,
            min_events_in_window=1,
            check_interval_ms=5_000,
            on_insufficient_frequency=self._on_heartbeat_silence,
            reporting_mode=InsufficientFrequencyReportingMode.REPORT_EVERY,
        )
        self._reporter = InsufficientFrequencyReporter(config)

    def start(self) -> None:
        self._reporter.start()

    def stop(self) -> None:
        self._reporter.stop()

    async def _on_heartbeat_silence(self, event_count: int) -> None:
        await self._alerting_client.alert(
            severity="WARNING",
            detail=f"Only {event_count} heartbeats in window",
        )

    def on_heartbeat_received(self) -> None:
        self._reporter.record_event()
With min_events_in_window=1, the callback runs when the count is below 1 (so event_count is 0) after the grace period. A 30 s window lines up with “nothing in the last 30 seconds”; check_interval_ms=5_000 controls how often the checker runs (see Liveness and Heartbeat Monitoring).
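To make the interplay of window, grace period, and check interval concrete, the checker's decisions can be simulated with plain arithmetic. This is only a sketch: checker_ticks is a hypothetical function, and it assumes the first check happens one check_interval_ms after start, that grace ends once a full window has elapsed, and that an event exactly one window old no longer counts. The real checker's timing may differ in these details.

```python
def checker_ticks(
    window_ms: int,
    check_interval_ms: int,
    min_events: int,
    event_times_ms: list[int],
    run_for_ms: int,
) -> list[tuple[int, int]]:
    """Return (tick_ms, count) for every simulated check that would report."""
    reports: list[tuple[int, int]] = []
    tick = check_interval_ms
    while tick <= run_for_ms:
        # Events newer than (tick - window_ms) are inside the trailing window.
        in_window = sum(1 for t in event_times_ms if tick - window_ms < t <= tick)
        grace_over = tick >= window_ms  # assumption: grace lasts one full window
        if grace_over and in_window < min_events:
            reports.append((tick, in_window))
        tick += check_interval_ms
    return reports


# Heartbeats arrive every 5 s and then stop after the one at 20 s.
heartbeats_ms = [5_000, 10_000, 15_000, 20_000]
reports = checker_ticks(
    window_ms=30_000,
    check_interval_ms=5_000,
    min_events=1,
    event_times_ms=heartbeats_ms,
    run_for_ms=60_000,
)
# With no heartbeats at all, nothing is reported before the grace period ends.
silent_reports = checker_ticks(30_000, 5_000, 1, [], 35_000)
```

Under these assumptions the last heartbeat at 20 s keeps the window non-empty until the 50 s check, so the first report comes 30 s after the final heartbeat and then repeats on every 5 s check, which is what REPORT_EVERY is described as doing.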
API
InsufficientFrequencyReporterConfig
All five fields in the table below are required — no defaults, so every choice is explicit. Optional _monotonic_ns and _sleep exist for tests and dependency injection; they are not part of the supported public contract for application code.
| Field | Type | Description |
|---|---|---|
| window_duration_ms | int | Sliding window length in milliseconds (must be positive). After each successful start(), the reporter also waits this long (grace) before it may call the callback, so the window is not judged “too quiet” immediately. |
| min_events_in_window | int | Minimum number of events expected inside the trailing window. The callback runs when the count is strictly less than this value (after grace). Must be positive. |
| check_interval_ms | int | How often the background checker evaluates the window. Must be positive. |
| on_insufficient_frequency | Callable[[int], Any] | Invoked when the window is under-filled. Receives the current count in the window. May be sync or async. Must not raise: an unhandled exception would end the checker task. |
| reporting_mode | InsufficientFrequencyReportingMode | Controls repeat behaviour. See Reporting Modes. |
Validation runs in __post_init__ and raises ValueError or TypeError on invalid inputs.
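Since on_insufficient_frequency must not raise, one defensive option is to wrap the real callback before handing it to the config. The wrapper below is a sketch on the caller's side, not something the package provides: never_raising, flaky_callback, and the log message are hypothetical names chosen for illustration.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable, Coroutine

logger = logging.getLogger(__name__)


def never_raising(
    callback: Callable[[int], Any],
) -> Callable[[int], Coroutine[Any, Any, None]]:
    """Wrap a sync or async callback so exceptions are logged, not propagated."""

    async def wrapper(event_count: int) -> None:
        try:
            result = callback(event_count)
            if inspect.isawaitable(result):
                await result
        except Exception:
            # Swallow the error so the caller (e.g. a checker task) keeps running.
            logger.exception("on_insufficient_frequency callback failed")

    return wrapper


calls: list[int] = []


def flaky_callback(event_count: int) -> None:
    calls.append(event_count)
    raise RuntimeError("alerting backend unreachable")


safe_callback = never_raising(flaky_callback)
asyncio.run(safe_callback(0))  # logs the error instead of raising
```

The wrapper only catches Exception, so task cancellation still propagates and a stopped checker can shut down normally.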
InsufficientFrequencyReporter
| Member | Kind | Description |
|---|---|---|
| InsufficientFrequencyReporter(config) | constructor | Creates an inactive reporter; call start() from async code to begin monitoring. |
| start() | method | Records grace start time and spawns the periodic checker task. Requires a running event loop. Idempotent: no-op if already active. |
| stop() | method | Deactivates the reporter and cancels the checker. Does not clear timestamps. Idempotent: no-op if already stopped. |
| clear() | method | Removes all recorded timestamps. Independent of active/inactive state. |
| record_event() | sync method | When active, appends one monotonic timestamp. Does not call the callback. No-op when inactive. |
| is_active | property | True while the checker task is running. |
| current_window_event_count | property | Count of events in the trailing window after evicting stale timestamps. |
Reporting Modes
InsufficientFrequencyReportingMode controls what happens when the checker finds the window below min_events_in_window:
| Value | Behaviour |
|---|---|
| REPORT_EVERY | Call on_insufficient_frequency on every checker cycle that still sees a shortfall. |
| REPORT_ONCE_THEN_STOP | Call the callback once on the first shortfall, then stop the reporter. Further checker cycles do nothing until start() is called to re-arm. |
Use Cases
Distinguishing Error from Fatal
A Kafka consumer that disconnects once is an error — something to log and recover from. A consumer that disconnects three times in a minute has a systemic problem that may warrant crashing the application or escalating to an engineer. The ExcessiveFrequencyReporter turns this judgment call into a configurable, reusable component: set a threshold, supply a callback, and let the reporter decide when an error pattern becomes fatal.
Liveness and Heartbeat Monitoring
An upstream service that sends heartbeats every 5 seconds is healthy. If no heartbeat arrives for 30 seconds, something is wrong. The InsufficientFrequencyReporter detects this silence and triggers an alert — without requiring the heartbeat consumer to embed that logic itself. A concrete pattern is in InsufficientFrequencyReporter — Realistic Usage.
Observability Integration
Both reporters integrate naturally with observability stacks. The callback can increment a Prometheus counter, emit a StatsD metric, send a structured log event, or publish to a message bus — without coupling the monitored component to any specific observability backend.
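One way to keep the monitored component independent of a specific backend is to build the callback from a generic emit function. The sketch below uses an in-memory Counter as a stand-in for a real metrics client; make_metric_callback, in_memory_emit, and the metric name are hypothetical, and a real deployment would pass its own emit function instead.

```python
from collections import Counter
from typing import Callable


def make_metric_callback(
    emit: Callable[[str, int], None], metric_name: str
) -> Callable[[int], None]:
    """Build a reporter callback that forwards each anomaly to `emit`."""

    def on_anomaly(event_count: int) -> None:
        emit(metric_name, event_count)

    return on_anomaly


# In-memory stand-in for a metrics backend (assumption, for illustration only).
breach_counts: Counter[str] = Counter()
last_seen: dict[str, int] = {}


def in_memory_emit(name: str, event_count: int) -> None:
    breach_counts[name] += 1          # how many times the anomaly fired
    last_seen[name] = event_count     # window count at the latest firing


callback = make_metric_callback(in_memory_emit, "kafka_disconnect_floods")
callback(4)
callback(5)
```

The resulting callback has the Callable[[int], Any] shape that both config classes document, so swapping backends only means swapping the emit function.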
Design Notes
Why time.monotonic_ns()
Timestamps are captured via time.monotonic_ns(), which returns an int in nanoseconds. This choice gives two benefits:
- Monotonic clock: immune to system clock adjustments (NTP corrections, manual clock changes). A wall-clock source like time.time() could cause the sliding window to behave incorrectly if the clock is set backwards.
- Integer storage: plain int timestamps avoid float rounding and datetime object overhead, and the internal deque[int] keeps memory usage low.
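The two points above can be illustrated with a few lines of standard-library code. The helper window_cutoff_ns and the constant NS_PER_MS are hypothetical names; the sketch only shows how a millisecond window maps onto nanosecond integer timestamps and why floats are a poor fit at that resolution.

```python
import time

NS_PER_MS = 1_000_000


def window_cutoff_ns(now_ns: int, window_duration_ms: int) -> int:
    """Oldest timestamp boundary for a trailing window, in exact integer math."""
    return now_ns - window_duration_ms * NS_PER_MS


# Two consecutive readings never go backwards, whatever the wall clock does.
first_ns = time.monotonic_ns()
second_ns = time.monotonic_ns()

# A 60 s window evaluated 90 s into the clock's timeline starts at 30 s.
cutoff = window_cutoff_ns(now_ns=90_000_000_000, window_duration_ms=60_000)

# Floats lose integer precision above 2**53, so distinct nanosecond
# readings could collapse into the same value if stored as float.
collapsed = float(2**53 + 1) == float(2**53)
```

The absolute value returned by time.monotonic_ns() has no meaning on its own; only differences between readings are used, which is all a sliding window needs.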
Development
git clone https://github.com/ori88c-python-packages/frequency-anomaly-reporter.git
cd frequency-anomaly-reporter
uv sync
# Run tests
uv run pytest
# Lint and format
uv run ruff check .
uv run ruff format .
# Type check
uv run mypy src
License