Declarative KPI computation and alerting framework with SQL/DataFrame backends, period-over-period comparisons, and built-in alerting
Project description
kpi-engine
A declarative framework for defining, computing, and alerting on KPIs from SQL or DataFrames — with built-in period-over-period comparisons.
Overview
kpi-engine brings structure and repeatability to business metrics. Instead of writing ad-hoc SQL queries and notebook cells to compute KPIs, you define them once in a declarative YAML or Python DSL — then kpi-engine handles computation, historical comparisons, trend analysis, and alerting automatically.
Supported backends: PostgreSQL, MySQL, SQLite, BigQuery, Snowflake (via SQLAlchemy) and pandas DataFrames.
Installation
pip install kpi-engine
With optional extras:
pip install "kpi-engine[alerts]" # Slack, email, PagerDuty
pip install "kpi-engine[server]" # FastAPI REST server
pip install "kpi-engine[scheduler]" # Cron scheduling
Quick Start
From a YAML file
# kpis.yaml
kpis:
- name: monthly_revenue
label: Monthly Revenue
source: sql
query: >
SELECT SUM(amount) FROM orders
WHERE order_date >= '{{ period_start }}'::date
AND order_date < '{{ period_end }}'::date
aggregation: sum
unit: USD
compare: [MoM, YoY]
alerts:
- condition: "< 100000"
severity: critical
message: Revenue dropped below $100K
from sqlalchemy import create_engine
from kpi_engine import KPIEngine
engine = KPIEngine.from_yaml(
"kpis.yaml",
connection=create_engine("postgresql://user:pass@host/db")
)
results = engine.run(period="last_month")
for kpi in results:
print(f"{kpi.label}: {kpi.value:,.2f} {kpi.unit}")
if "MoM" in kpi.comparisons:
print(f" MoM: {kpi.mom_change_pct:+.1f}%")
print(f" Status: {kpi.alert_status}")
From Python directly
from kpi_engine import KPIEngine
from kpi_engine.models import KPIDefinition, Alert
import pandas as pd
df = pd.DataFrame({
"revenue": [1000, 2000, 3000],
"order_date": pd.to_datetime(["2024-11-01", "2024-11-15", "2024-11-28"]),
})
kpis = [
KPIDefinition(
name="revenue",
label="Monthly Revenue",
source="dataframe",
aggregation="sum",
unit="USD",
query="orders.revenue", # "table.column" format
compare=["MoM"],
alerts=[Alert(condition="< 1000", severity="warning")],
)
]
engine = KPIEngine(kpis=kpis, dataframes={"orders": df})
results = engine.run(period="2024-11")
How It Works
KPI Definitions (YAML or Python DSL)
│
▼
┌─────────────────────────────┐
│ KPI Registry │ ← Parses and validates all KPI definitions
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ Period Resolver │ ← Converts "last_month", "2024-Q3", "yesterday"
│ │ into concrete start/end datetime pairs
└──────────────┬──────────────┘
▼
┌─────────────────────────────────────────────────────┐
│ Computation Engine │
│ ┌──────────────┐ ┌──────────────┐ Derived KPI │
│ │ SQL Backend │ │ DataFrame │ (expression) │
│ │ (SQLAlchemy) │ │ Backend │ │
│ └──────────────┘ └──────────────┘ │
└──────────────┬──────────────────────────────────────┘
▼
┌─────────────────────────────┐
│ Period-over-Period Comparator│ ← Computes Δ and Δ%
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ Alert Evaluator │ ← Threshold, change %, anomaly rules
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ Alert Dispatcher │ ← Slack, email, PagerDuty, webhooks
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ KPIResult + Audit Log │ ← Structured result + CSV/SQLite history
└─────────────────────────────┘
Period Strings
| Input | Resolves To |
|---|---|
"yesterday" |
Previous calendar day |
"last_week" |
Mon–Sun of the previous week |
"last_month" |
Full previous calendar month |
"last_quarter" |
Previous Q1/Q2/Q3/Q4 |
"2024-Q3" |
July 1 – September 30, 2024 |
"2024-11" |
All of November 2024 |
KPI Sources
SQL Backend
Queries run via SQLAlchemy. Use Jinja2 template variables {{ period_start }} and {{ period_end }} in your query:
KPIDefinition(
name="signups",
label="New Signups",
source="sql",
aggregation="count",
query="SELECT COUNT(*) FROM users WHERE created_at >= '{{ period_start }}'",
)
DataFrame Backend
Pass a dict of DataFrames. Use "table.column" in the query field:
KPIDefinition(
name="revenue",
source="dataframe",
aggregation="sum",
query="sales.amount", # sales DataFrame, amount column
)
Aggregations: sum, avg, count, last, rate
Derived KPIs
Computed from already-resolved KPI values using a Python expression:
KPIDefinition(
name="arpu",
label="ARPU",
source="derived",
expression="revenue / active_users",
unit="USD",
)
Derived KPIs always run after their dependencies. The engine builds a DAG automatically.
Alerts
Condition syntax
| Condition | Triggers when |
|---|---|
"< 1000" |
value is below 1000 |
"> 0.15" |
value is above 0.15 |
"<= 100" |
value is at most 100 |
">= 500" |
value is at least 500 |
"== 0" |
value equals 0 |
Alert channels
Slack:
from kpi_engine.alerts import SlackChannel
engine = KPIEngine(
kpis=kpis,
alert_channels=[SlackChannel(webhook_url="https://hooks.slack.com/...")]
)
Email:
from kpi_engine.alerts import EmailChannel
EmailChannel(
smtp_host="smtp.gmail.com", smtp_port=587,
from_email="alerts@company.com",
to_emails=["team@company.com"],
username="alerts@company.com", password="..."
)
PagerDuty:
from kpi_engine.alerts import PagerDutyChannel
PagerDutyChannel(integration_key="your-integration-key")
REST API
pip install "kpi-engine[server]"
engine.serve(port=8000)
| Endpoint | Description |
|---|---|
GET /kpis?period=last_month |
Compute all KPIs |
GET /kpis/{name}?period=2024-11 |
Compute a single KPI |
GET /kpis/{name}/history?n=10 |
Last n results |
Scheduling
pip install "kpi-engine[scheduler]"
scheduler = engine.schedule(
cron="0 9 1 * *", # 1st of every month at 9am UTC
period_fn=lambda: "last_month",
callback=lambda results: print(f"Done: {len(results)} KPIs")
)
# runs in a background daemon thread
# scheduler.stop() to cancel
Audit Log
engine = KPIEngine(kpis=kpis, connection=conn, audit_log="audit.csv")
# or
engine = KPIEngine(kpis=kpis, connection=conn, audit_log="audit.db") # SQLite
Every engine.run() call appends results to the audit log automatically.
API Reference
KPIEngine
KPIEngine(
kpis: list[KPIDefinition],
connection=None, # SQLAlchemy engine
dataframes: dict = None, # {"table_name": pd.DataFrame}
alert_channels: list = None,
audit_log: str = None # path to .csv or .db file
)
| Method | Returns | Description |
|---|---|---|
engine.run(period) |
list[KPIResult] |
Compute all KPIs |
engine.run_kpi(name, period) |
KPIResult |
Compute one KPI |
engine.history(name, n) |
list[KPIResult] |
Last n results |
engine.schedule(cron, period_fn) |
KPIScheduler |
Schedule recurring runs |
engine.serve(port) |
— | Start REST API (blocking) |
KPIEngine.from_yaml(path, ...) |
KPIEngine |
Load from YAML config |
KPIResult
result.value # float
result.unit # str
result.alert_status # "ok" | "warning" | "critical"
result.comparisons # dict[str, ComparisonResult]
result.alerts_triggered # list[AlertResult]
result.mom_change_pct # float | None
result.yoy_change_pct # float | None
result.period_start # datetime
result.period_end # datetime
result.query_duration_ms # float
KPIDefinition fields
| Field | Type | Description |
|---|---|---|
name |
str |
Unique identifier |
label |
str |
Human-readable name |
source |
str |
"sql" | "dataframe" | "derived" |
aggregation |
str |
"sum" | "avg" | "count" | "rate" | "last" |
query |
str |
SQL template or "table.column" |
expression |
str |
Python expression for derived KPIs |
compare |
list[str] |
["MoM", "YoY", "QoQ", "WoW", "DoD"] |
polarity |
str |
"higher_is_better" | "lower_is_better" |
alerts |
list[Alert] |
Alert definitions |
unit |
str |
Display unit (e.g. "USD", "%") |
Project Structure
kpi-engine/
├── kpi_engine/
│ ├── engine.py # KPIEngine orchestrator
│ ├── registry.py # KPI registry and validation
│ ├── models.py # KPIDefinition, KPIResult, Alert dataclasses
│ ├── period.py # Period resolution logic
│ ├── backends/
│ │ ├── base.py # BaseBackend abstract class
│ │ ├── sql.py # SQLAlchemy backend
│ │ ├── dataframe.py # Pandas backend
│ │ └── derived.py # Derived KPI expression evaluator
│ ├── comparator.py # Period-over-period comparison
│ ├── alerts/
│ │ ├── evaluator.py # Alert threshold evaluation
│ │ ├── dispatcher.py # Routes alerts to channels
│ │ ├── slack.py # Slack webhook channel
│ │ ├── email.py # SMTP email channel
│ │ └── pagerduty.py # PagerDuty Events API channel
│ ├── scheduler.py # Cron-based scheduling
│ ├── audit.py # Audit log (CSV or SQLite)
│ └── server.py # FastAPI REST server
└── tests/
Changelog
v1.0.1
- Added full PyPI metadata: classifiers, keywords, author, project URLs
- Exposed
__version__from package root
v1.0.0
- Declarative KPI definition (YAML + Python DSL)
- SQL backend (SQLAlchemy + Jinja2 templates)
- DataFrame backend (pandas,
"table.column"query syntax) - Derived KPI expressions with automatic DAG resolution
- Period-over-period comparisons: DoD, WoW, MoM, QoQ, YoY
- Alert conditions: threshold, with Slack, email, and PagerDuty channels
- FastAPI REST server (
engine.serve()) - Cron scheduling via croniter (
engine.schedule()) - Audit log to CSV or SQLite
License
MIT — see LICENSE
Contributing
PRs welcome. Add tests for new KPI types and alert conditions.
pip install -e ".[dev]"
pytest tests/ -v
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kpi_engine-1.0.1.tar.gz.
File metadata
- Download URL: kpi_engine-1.0.1.tar.gz
- Upload date:
- Size: 24.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
62805cf145246c8b9d330933adec517e58b94545bc3ed04a28e198c95cac3792
|
|
| MD5 |
1f4f5cf92864a1d17f174fabef005a2a
|
|
| BLAKE2b-256 |
f733ec44cf8c3e17ce6efbb77ea2101603bfec559bfac4adb82d6b78966544a4
|
File details
Details for the file kpi_engine-1.0.1-py3-none-any.whl.
File metadata
- Download URL: kpi_engine-1.0.1-py3-none-any.whl
- Upload date:
- Size: 21.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7feb8d16f5f630e3842b74e659b663eae4d12e4f11f858ac6b8212481575875f
|
|
| MD5 |
349300b9a9eb9ed9dac763e79563cc1b
|
|
| BLAKE2b-256 |
c21f98c8a4b02a69f8563886b384914d87262bbbc9f1241c17962db77613c3e8
|