Skip to main content

Sliding-window event-frequency monitors that detect anomalous rates — both excessive and insufficient — via configurable thresholds and callbacks. Zero dependencies.

Project description

frequency-anomaly-reporter

CI PyPI version Python versions License

Sliding-window event-frequency monitors that detect anomalous rates — both excessive and insufficient — via configurable thresholds and callbacks. Zero runtime dependencies.

Table of Contents

Key Features

  • Sliding-window anomaly detection — Track how many events fall inside a moving time window you configure. When that count violates your threshold, your callback runs: too many events in a short span (ExcessiveFrequencyReporter), or too few after an initial grace period (InsufficientFrequencyReporter).
  • Both excessive and insufficient frequency monitors in a single package.
  • Supports both sync and async callbacks.
  • Start / stop lifecycle — every reporter can be explicitly stopped and re-armed with start(), giving callers full control over the monitoring lifecycle.
  • Per-component reporting-mode enumsExcessiveFrequencyReportingMode and InsufficientFrequencyReportingMode let you choose between continuous reporting and report-once-then-stop semantics.
  • Zero runtime dependencies. Only the Python standard library (time, collections, asyncio, enum).
  • Fully type-annotated with a py.typed marker (PEP 561). Works out of the box with mypy, pyright, and other type checkers.
  • Tested on Python 3.10 through 3.14.

Components Overview

Component Detects Callback fires when
ExcessiveFrequencyReporter Too many events in a sliding window events_in_window > max_events_in_window
InsufficientFrequencyReporter Too few events in a sliding window After grace, events_in_window < min_events_in_window

Installation

pip install frequency-anomaly-reporter

Or with uv:

uv add frequency-anomaly-reporter

ExcessiveFrequencyReporter

Monitors a stream of events and invokes a callback whenever the number of events within a sliding window exceeds a configured threshold. Useful for detecting error floods, and any scenario where "too often" signals a systemic problem.

Quick Start

import asyncio
import logging

from frequency_anomaly_reporter import (
    ExcessiveFrequencyReporter,
    ExcessiveFrequencyReporterConfig,
    ExcessiveFrequencyReportingMode,
)


class KafkaConsumerOrchestrator:
    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger
        config = ExcessiveFrequencyReporterConfig(
            window_duration_ms=60_000,
            max_events_in_window=3,
            on_excessive_frequency=self._on_too_many_disconnects,
            reporting_mode=ExcessiveFrequencyReportingMode.REPORT_ONCE_THEN_STOP,
        )
        self._reporter = ExcessiveFrequencyReporter(config)

    def start(self) -> None:
        self._reporter.start()

    def stop(self) -> None:
        self._reporter.stop()

    def _on_too_many_disconnects(self, event_count: int) -> None:
        self._logger.critical(
            "%d kafka disconnects in the last 60 s — triggering escalation",
            event_count,
        )

    async def handle_kafka_disconnect(self) -> None:
        # ... reconnection logic ...
        await self._reporter.record_event()

Realistic Usage

The reporter is typically owned by the component that monitors events, rather than constructed and passed around as a standalone object. The component exposes its own start() and stop() methods that delegate to the reporter, keeping the lifecycle fully encapsulated.

class PrivilegeEscalationMonitor:
    def __init__(self, siem_client: SiemClient) -> None:
        self._siem_client = siem_client
        config = ExcessiveFrequencyReporterConfig(
            # window_duration_ms=...,
            # max_events_in_window=...,
            on_excessive_frequency=self._on_escalation_flood,
            # reporting_mode=...,
        )
        self._reporter = ExcessiveFrequencyReporter(config)

    def start(self) -> None:
        self._reporter.start()

    def stop(self) -> None:
        self._reporter.stop()

    async def _on_escalation_flood(self, event_count: int) -> None:
        await self._siem_client.alert(
            severity="CRITICAL",
            detail=f"{event_count} unauthorised sudo attempts in window",
        )

    async def process_audit_entry(self, entry: AuditEntry) -> None:
        if entry.event_type == AuditEventType.SUDO_FAILURE:
            await self._reporter.record_event()

API

ExcessiveFrequencyReporterConfig

All fields are required — no defaults are provided, so every configuration decision is made explicitly.

Field Type Description
window_duration_ms int Length of the sliding window in milliseconds. Must be positive.
max_events_in_window int Maximum number of events allowed within the window before the callback is invoked. The callback fires when the count exceeds this value. Must be positive.
on_excessive_frequency Callable[[int], Any] Callback invoked when the threshold is exceeded. Receives the current number of events in the window. May be sync or async.
reporting_mode ExcessiveFrequencyReportingMode Controls callback invocation behaviour after the first breach. See Reporting Modes.

Validation runs in __post_init__ and raises ValueError or TypeError on invalid inputs, so errors surface at config-creation time.

ExcessiveFrequencyReporter

Member Kind Description
ExcessiveFrequencyReporter(config) constructor Create a reporter from an ExcessiveFrequencyReporterConfig. The reporter starts inactive; call start() to begin monitoring.
await record_event() async method Record an event. Prunes stale timestamps, appends the new one, and invokes the callback if the threshold is exceeded. Silent no-op when the reporter is inactive.
start() method Activate the reporter without touching window contents. To start with a clean slate, call clear() first. Idempotent — no-op if already active.
stop() method Deactivate the reporter without touching window contents. To discard accumulated history, call clear() explicitly. Idempotent — no-op if already stopped.
clear() method Remove all recorded timestamps from the sliding window. Independent of active/inactive state — can be called at any time. Use when past event history is no longer relevant (e.g. after a successful recovery).
is_active property True if the reporter is currently active.
current_window_event_count property Number of events currently within the sliding window. Evicts stale timestamps before counting, so the value always reflects the true trailing window. Useful for live metrics.

Reporting Modes

ExcessiveFrequencyReportingMode controls what happens after the frequency threshold is first exceeded:

Value Behavior
REPORT_EVERY Invoke the callback on every record_event() call that finds the window over the threshold. Best for metrics and counters.
REPORT_ONCE_THEN_STOP Invoke the callback once on the first breach, then automatically stop the reporter. Further record_event() calls become silent no-ops until start() is called to re-arm. Best for fatal escalation where intervention is expected.

InsufficientFrequencyReporter

Monitors a stream of events and invokes a callback when the sliding window contains too few events — for example missed heartbeats. A background checker task wakes every check_interval_ms and compares the live event count to min_events_in_window. record_event() is synchronous and only appends a timestamp; it does not invoke the callback (unlike ExcessiveFrequencyReporter, where await record_event() may invoke the callback immediately).

Realistic Usage

The reporter is usually owned by the component that receives heartbeats or ticks, which forwards record_event() and exposes start() / stop() on its own lifecycle.

from frequency_anomaly_reporter import (
    InsufficientFrequencyReporter,
    InsufficientFrequencyReporterConfig,
    InsufficientFrequencyReportingMode,
)


class HeartbeatMonitor:
    """Expect a heartbeat at least once per 30 s window; alert when the window is empty."""

    def __init__(self, alerting_client: AlertingClient) -> None:
        self._alerting_client = alerting_client
        config = InsufficientFrequencyReporterConfig(
            window_duration_ms=30_000,
            min_events_in_window=1,
            check_interval_ms=5_000,
            on_insufficient_frequency=self._on_heartbeat_silence,
            reporting_mode=InsufficientFrequencyReportingMode.REPORT_EVERY,
        )
        self._reporter = InsufficientFrequencyReporter(config)

    def start(self) -> None:
        self._reporter.start()

    def stop(self) -> None:
        self._reporter.stop()

    async def _on_heartbeat_silence(self, event_count: int) -> None:
        await self._alerting_client.alert(
            severity="WARNING",
            detail=f"Only {event_count} heartbeats in window",
        )

    def on_heartbeat_received(self) -> None:
        self._reporter.record_event()

With min_events_in_window=1, the callback runs when the count is below 1 (so event_count is 0) after the grace period. A 30 s window lines up with “nothing in the last 30 seconds”; check_interval_ms=5_000 controls how often the checker runs (see Liveness and Heartbeat Monitoring).

API

InsufficientFrequencyReporterConfig

All five fields in the table below are required — no defaults, so every choice is explicit. Optional _monotonic_ns and _sleep exist for tests and dependency injection; they are not part of the supported public contract for application code.

Field Type Description
window_duration_ms int Sliding window length in milliseconds (must be positive). After each successful start(), the reporter also waits this long (grace) before it may call the callback, so the window is not judged “too quiet” immediately.
min_events_in_window int Minimum number of events expected inside the trailing window. The callback runs when the count is strictly less than this value (after grace). Must be positive.
check_interval_ms int How often the background checker evaluates the window. Must be positive.
on_insufficient_frequency Callable[[int], Any] Invoked when the window is under-filled. Receives the current count in the window. May be sync or async. Must not raise — an unhandled exception would end the checker task.
reporting_mode InsufficientFrequencyReportingMode Controls repeat behaviour. See Reporting Modes.

Validation runs in __post_init__ and raises ValueError or TypeError on invalid inputs.

InsufficientFrequencyReporter

Member Kind Description
InsufficientFrequencyReporter(config) constructor Creates an inactive reporter; call start() from async code to begin monitoring.
start() method Records grace start time and spawns the periodic checker task. Requires a running event loop. Idempotent — no-op if already active.
stop() method Deactivates the reporter and cancels the checker. Does not clear timestamps. Idempotent — no-op if already stopped.
clear() method Removes all recorded timestamps. Independent of active/inactive state.
record_event() sync method When active, appends one monotonic timestamp. Does not call the callback. No-op when inactive.
is_active property True while the checker task is running.
current_window_event_count property Count of events in the trailing window after evicting stale timestamps.

Reporting Modes

InsufficientFrequencyReportingMode controls what happens when the checker finds the window below min_events_in_window:

Value Behaviour
REPORT_EVERY Call on_insufficient_frequency on every checker cycle that still sees a shortfall.
REPORT_ONCE_THEN_STOP Call the callback once on the first shortfall, then stop the reporter. Further checker cycles do nothing until start() is called to re-arm.

Use Cases

Distinguishing Error from Fatal

A Kafka consumer that disconnects once is an error — something to log and recover from. A consumer that disconnects three times in a minute has a systemic problem that may warrant crashing the application or escalating to an engineer. The ExcessiveFrequencyReporter turns this judgment call into a configurable, reusable component: set a threshold, supply a callback, and let the reporter decide when an error pattern becomes fatal.

Liveness and Heartbeat Monitoring

An upstream service that sends heartbeats every 5 seconds is healthy. If no heartbeat arrives for 30 seconds, something is wrong. The InsufficientFrequencyReporter detects this silence and triggers an alert — without requiring the heartbeat consumer to embed that logic itself. A concrete pattern is in InsufficientFrequencyReporter — Realistic Usage.

Observability Integration

Both reporters integrate naturally with observability stacks. The callback can increment a Prometheus counter, emit a StatsD metric, send a structured log event, or publish to a message bus — without coupling the monitored component to any specific observability backend.

Design Notes

Why time.monotonic_ns()

Timestamps are captured via time.monotonic_ns(), which returns an int in nanoseconds. This choice gives two benefits:

  1. Monotonic clock — immune to system clock adjustments (NTP jumps, daylight saving changes). A wall-clock source like time.time() could cause the sliding window to behave incorrectly if the clock is set backwards.
  2. Integer storageint is the slimmest representation for a timestamp in Python. No float rounding, no datetime object overhead. The internal deque[int] keeps memory usage minimal.

Development

git clone https://github.com/ori88c-python-packages/frequency-anomaly-reporter.git
cd frequency-anomaly-reporter
uv sync

# Run tests
uv run pytest

# Lint and format
uv run ruff check .
uv run ruff format .

# Type check
uv run mypy src

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

frequency_anomaly_reporter-1.0.2.tar.gz (44.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

frequency_anomaly_reporter-1.0.2-py3-none-any.whl (18.5 kB view details)

Uploaded Python 3

File details

Details for the file frequency_anomaly_reporter-1.0.2.tar.gz.

File metadata

  • Download URL: frequency_anomaly_reporter-1.0.2.tar.gz
  • Upload date:
  • Size: 44.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for frequency_anomaly_reporter-1.0.2.tar.gz
Algorithm Hash digest
SHA256 89f7eefa88d516a819e7c998cb2ead0c5b82eaac8d64d63575b1424178ef3769
MD5 bee7b0caf4556d1f2a85cd2a8623d753
BLAKE2b-256 4f03d7e698d810fb93589801cf35793097ca42e2cb0f1e8bc22a9fb3eb652fb7

See more details on using hashes here.

File details

Details for the file frequency_anomaly_reporter-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: frequency_anomaly_reporter-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 18.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for frequency_anomaly_reporter-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 13a5a6c9ad6cc82679465e3feef089791697846c26dfeb775054c6bd25468965
MD5 5f496473170249d85660f2c5399a3065
BLAKE2b-256 425173bd5ae97575d148f2fd3768e9ed0a9f4df1e69c524a2e6c93bf1215cdc8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page