Skip to main content

Declarative KPI computation and alerting framework with SQL/DataFrame backends, period-over-period comparisons, and built-in alerting

Project description

kpi-engine

A declarative framework for defining, computing, and alerting on KPIs from SQL or DataFrames — with built-in period-over-period comparisons.

PyPI version Python License: MIT


Overview

kpi-engine brings structure and repeatability to business metrics. Instead of writing ad-hoc SQL queries and notebook cells to compute KPIs, you define them once in a declarative YAML or Python DSL — then kpi-engine handles computation, historical comparisons, trend analysis, and alerting automatically.

Supported backends: PostgreSQL, MySQL, SQLite, BigQuery, Snowflake (via SQLAlchemy) and pandas DataFrames.


Installation

pip install kpi-engine

With optional extras:

pip install "kpi-engine[alerts]"     # Slack, email, PagerDuty
pip install "kpi-engine[server]"     # FastAPI REST server
pip install "kpi-engine[scheduler]"  # Cron scheduling

Quick Start

From a YAML file

# kpis.yaml
kpis:
  - name: monthly_revenue
    label: Monthly Revenue
    source: sql
    query: >
      SELECT SUM(amount) FROM orders
      WHERE order_date >= '{{ period_start }}'::date
        AND order_date <  '{{ period_end }}'::date
    aggregation: sum
    unit: USD
    compare: [MoM, YoY]
    alerts:
      - condition: "< 100000"
        severity: critical
        message: Revenue dropped below $100K
from sqlalchemy import create_engine
from kpi_engine import KPIEngine

engine = KPIEngine.from_yaml(
    "kpis.yaml",
    connection=create_engine("postgresql://user:pass@host/db")
)

results = engine.run(period="last_month")

for kpi in results:
    print(f"{kpi.label}: {kpi.value:,.2f} {kpi.unit}")
    if "MoM" in kpi.comparisons:
        print(f"  MoM: {kpi.mom_change_pct:+.1f}%")
    print(f"  Status: {kpi.alert_status}")

From Python directly

from kpi_engine import KPIEngine
from kpi_engine.models import KPIDefinition, Alert
import pandas as pd

df = pd.DataFrame({
    "revenue": [1000, 2000, 3000],
    "order_date": pd.to_datetime(["2024-11-01", "2024-11-15", "2024-11-28"]),
})

kpis = [
    KPIDefinition(
        name="revenue",
        label="Monthly Revenue",
        source="dataframe",
        aggregation="sum",
        unit="USD",
        query="orders.revenue",   # "table.column" format
        compare=["MoM"],
        alerts=[Alert(condition="< 1000", severity="warning")],
    )
]

engine = KPIEngine(kpis=kpis, dataframes={"orders": df})
results = engine.run(period="2024-11")

How It Works

KPI Definitions (YAML or Python DSL)
        │
        ▼
┌─────────────────────────────┐
│      KPI Registry            │  ← Parses and validates all KPI definitions
└──────────────┬──────────────┘
               ▼
┌─────────────────────────────┐
│     Period Resolver          │  ← Converts "last_month", "2024-Q3", "yesterday"
│                              │     into concrete start/end datetime pairs
└──────────────┬──────────────┘
               ▼
┌─────────────────────────────────────────────────────┐
│                   Computation Engine                 │
│  ┌──────────────┐   ┌──────────────┐  Derived KPI  │
│  │  SQL Backend  │   │  DataFrame   │  (expression) │
│  │  (SQLAlchemy) │   │  Backend     │               │
│  └──────────────┘   └──────────────┘               │
└──────────────┬──────────────────────────────────────┘
               ▼
┌─────────────────────────────┐
│  Period-over-Period Comparator│  ← Computes Δ and Δ%
└──────────────┬──────────────┘
               ▼
┌─────────────────────────────┐
│     Alert Evaluator          │  ← Threshold, change %, anomaly rules
└──────────────┬──────────────┘
               ▼
┌─────────────────────────────┐
│     Alert Dispatcher         │  ← Slack, email, PagerDuty, webhooks
└──────────────┬──────────────┘
               ▼
┌─────────────────────────────┐
│     KPIResult + Audit Log    │  ← Structured result + CSV/SQLite history
└─────────────────────────────┘

Period Strings

Input Resolves To
"yesterday" Previous calendar day
"last_week" Mon–Sun of the previous week
"last_month" Full previous calendar month
"last_quarter" Previous Q1/Q2/Q3/Q4
"2024-Q3" July 1 – September 30, 2024
"2024-11" All of November 2024

KPI Sources

SQL Backend

Queries run via SQLAlchemy. Use Jinja2 template variables {{ period_start }} and {{ period_end }} in your query:

KPIDefinition(
    name="signups",
    label="New Signups",
    source="sql",
    aggregation="count",
    query="SELECT COUNT(*) FROM users WHERE created_at >= '{{ period_start }}'",
)

DataFrame Backend

Pass a dict of DataFrames. Use "table.column" in the query field:

KPIDefinition(
    name="revenue",
    source="dataframe",
    aggregation="sum",
    query="sales.amount",   # sales DataFrame, amount column
)

Aggregations: sum, avg, count, last, rate

Derived KPIs

Computed from already-resolved KPI values using a Python expression:

KPIDefinition(
    name="arpu",
    label="ARPU",
    source="derived",
    expression="revenue / active_users",
    unit="USD",
)

Derived KPIs always run after their dependencies. The engine builds a DAG automatically.


Alerts

Condition syntax

Condition Triggers when
"< 1000" value is below 1000
"> 0.15" value is above 0.15
"<= 100" value is at most 100
">= 500" value is at least 500
"== 0" value equals 0

Alert channels

Slack:

from kpi_engine.alerts import SlackChannel

engine = KPIEngine(
    kpis=kpis,
    alert_channels=[SlackChannel(webhook_url="https://hooks.slack.com/...")]
)

Email:

from kpi_engine.alerts import EmailChannel

EmailChannel(
    smtp_host="smtp.gmail.com", smtp_port=587,
    from_email="alerts@company.com",
    to_emails=["team@company.com"],
    username="alerts@company.com", password="..."
)

PagerDuty:

from kpi_engine.alerts import PagerDutyChannel

PagerDutyChannel(integration_key="your-integration-key")

REST API

pip install "kpi-engine[server]"
engine.serve(port=8000)
Endpoint Description
GET /kpis?period=last_month Compute all KPIs
GET /kpis/{name}?period=2024-11 Compute a single KPI
GET /kpis/{name}/history?n=10 Last n results

Scheduling

pip install "kpi-engine[scheduler]"
scheduler = engine.schedule(
    cron="0 9 1 * *",          # 1st of every month at 9am UTC
    period_fn=lambda: "last_month",
    callback=lambda results: print(f"Done: {len(results)} KPIs")
)
# runs in a background daemon thread
# scheduler.stop() to cancel

Audit Log

engine = KPIEngine(kpis=kpis, connection=conn, audit_log="audit.csv")
# or
engine = KPIEngine(kpis=kpis, connection=conn, audit_log="audit.db")  # SQLite

Every engine.run() call appends results to the audit log automatically.


API Reference

KPIEngine

KPIEngine(
    kpis: list[KPIDefinition],
    connection=None,           # SQLAlchemy engine
    dataframes: dict = None,   # {"table_name": pd.DataFrame}
    alert_channels: list = None,
    audit_log: str = None      # path to .csv or .db file
)
Method Returns Description
engine.run(period) list[KPIResult] Compute all KPIs
engine.run_kpi(name, period) KPIResult Compute one KPI
engine.history(name, n) list[KPIResult] Last n results
engine.schedule(cron, period_fn) KPIScheduler Schedule recurring runs
engine.serve(port) Start REST API (blocking)
KPIEngine.from_yaml(path, ...) KPIEngine Load from YAML config

KPIResult

result.value                # float
result.unit                 # str
result.alert_status         # "ok" | "warning" | "critical"
result.comparisons          # dict[str, ComparisonResult]
result.alerts_triggered     # list[AlertResult]
result.mom_change_pct       # float | None
result.yoy_change_pct       # float | None
result.period_start         # datetime
result.period_end           # datetime
result.query_duration_ms    # float

KPIDefinition fields

Field Type Description
name str Unique identifier
label str Human-readable name
source str "sql" | "dataframe" | "derived"
aggregation str "sum" | "avg" | "count" | "rate" | "last"
query str SQL template or "table.column"
expression str Python expression for derived KPIs
compare list[str] ["MoM", "YoY", "QoQ", "WoW", "DoD"]
polarity str "higher_is_better" | "lower_is_better"
alerts list[Alert] Alert definitions
unit str Display unit (e.g. "USD", "%")

Project Structure

kpi-engine/
├── kpi_engine/
│   ├── engine.py               # KPIEngine orchestrator
│   ├── registry.py             # KPI registry and validation
│   ├── models.py               # KPIDefinition, KPIResult, Alert dataclasses
│   ├── period.py               # Period resolution logic
│   ├── backends/
│   │   ├── base.py             # BaseBackend abstract class
│   │   ├── sql.py              # SQLAlchemy backend
│   │   ├── dataframe.py        # Pandas backend
│   │   └── derived.py          # Derived KPI expression evaluator
│   ├── comparator.py           # Period-over-period comparison
│   ├── alerts/
│   │   ├── evaluator.py        # Alert threshold evaluation
│   │   ├── dispatcher.py       # Routes alerts to channels
│   │   ├── slack.py            # Slack webhook channel
│   │   ├── email.py            # SMTP email channel
│   │   └── pagerduty.py        # PagerDuty Events API channel
│   ├── scheduler.py            # Cron-based scheduling
│   ├── audit.py                # Audit log (CSV or SQLite)
│   └── server.py               # FastAPI REST server
└── tests/

Changelog

v1.0.1

  • Added full PyPI metadata: classifiers, keywords, author, project URLs
  • Exposed __version__ from package root

v1.0.0

  • Declarative KPI definition (YAML + Python DSL)
  • SQL backend (SQLAlchemy + Jinja2 templates)
  • DataFrame backend (pandas, "table.column" query syntax)
  • Derived KPI expressions with automatic DAG resolution
  • Period-over-period comparisons: DoD, WoW, MoM, QoQ, YoY
  • Alert conditions: threshold, with Slack, email, and PagerDuty channels
  • FastAPI REST server (engine.serve())
  • Cron scheduling via croniter (engine.schedule())
  • Audit log to CSV or SQLite

License

MIT — see LICENSE


Contributing

PRs welcome. Add tests for new KPI types and alert conditions.

pip install -e ".[dev]"
pytest tests/ -v

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kpi_engine-1.0.1.tar.gz (24.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kpi_engine-1.0.1-py3-none-any.whl (21.6 kB view details)

Uploaded Python 3

File details

Details for the file kpi_engine-1.0.1.tar.gz.

File metadata

  • Download URL: kpi_engine-1.0.1.tar.gz
  • Upload date:
  • Size: 24.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for kpi_engine-1.0.1.tar.gz
Algorithm Hash digest
SHA256 62805cf145246c8b9d330933adec517e58b94545bc3ed04a28e198c95cac3792
MD5 1f4f5cf92864a1d17f174fabef005a2a
BLAKE2b-256 f733ec44cf8c3e17ce6efbb77ea2101603bfec559bfac4adb82d6b78966544a4

See more details on using hashes here.

File details

Details for the file kpi_engine-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: kpi_engine-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 21.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for kpi_engine-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 7feb8d16f5f630e3842b74e659b663eae4d12e4f11f858ac6b8212481575875f
MD5 349300b9a9eb9ed9dac763e79563cc1b
BLAKE2b-256 c21f98c8a4b02a69f8563886b384914d87262bbbc9f1241c17962db77613c3e8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page