Define KPIs once in YAML or Python — kpi-engine handles SQL/DataFrame computation, MoM/YoY comparisons, Slack/email/PagerDuty alerts, cron scheduling, audit logging, and a REST API. Saves 100+ engineering hours per project.
Project description
kpi-engine
A declarative framework for defining, computing, and alerting on KPIs from SQL or DataFrames — with built-in period-over-period comparisons.
Overview
kpi-engine brings structure and repeatability to business metrics. Instead of writing ad-hoc SQL queries and notebook cells to compute KPIs, you define them once in a declarative YAML or Python DSL — then kpi-engine handles computation, historical comparisons, trend analysis, and alerting automatically.
Supported backends: PostgreSQL, MySQL, SQLite, BigQuery, Snowflake (via SQLAlchemy) and pandas DataFrames.
Impact & ROI — By the Numbers
These estimates are grounded in the actual architecture of kpi-engine and reflect real patterns observed across data teams in industry.
Engineering Time Saved
Building a production-grade KPI system manually — with SQL templating, period comparison logic, alert routing, audit logging, a REST API, and cron scheduling — is a multi-week project. Here is how kpi-engine collapses that effort:
| Task | Manual (per KPI) | With kpi-engine |
|---|---|---|
| Define KPI + write SQL | ~30 min | ~5 min (YAML or DSL) |
| Period-over-period comparison (MoM, YoY, etc.) | ~3–4 hrs | Zero — built-in |
| Alert threshold logic | ~2 hrs | Zero — condition strings |
| Slack / email / PagerDuty wiring | ~1–2 days (each) | Zero — one-line channel config |
| Cron scheduling | ~4–8 hrs | Zero — engine.schedule(cron=...) |
| Audit log (CSV + SQLite) | ~1 day | Zero — audit_log="audit.db" |
| REST API for KPI consumption | ~3–5 days | Zero — engine.serve(port=8000) |
| Unit tests per KPI | ~1 hr | Covered by the library's 41 tests |
For a team managing 20 KPIs:
- Manual setup: ~120–160 engineering hours (3–4 engineer-weeks)
- With kpi-engine: ~2–3 hours of YAML definitions
- Time saved: 98–99% reduction in initial build time
Annualised maintenance (adding KPIs, updating queries, adjusting alert thresholds):
- Manual: ~6–10 hrs/month per data engineer
- With kpi-engine: ~30–45 min/month
- Annual saving per engineer: ~65–110 hours (~$6,500–$11,000 at a $100/hr blended rate)
Compute Cost Savings
Most data teams run the same KPI queries multiple times — in notebooks, dashboards, ad-hoc checks — without centralised scheduling. kpi-engine runs each query exactly once per scheduled period, eliminating duplicate scans.
Cloud data warehouse cost example (Google BigQuery):
| Scenario | Queries/day | Avg scan/query | Daily scan | Annual cost* |
|---|---|---|---|---|
| Ad-hoc (no kpi-engine) | 20 KPIs × 5 manual runs | 10 GB | 1 TB/day | ~$1,825/yr |
| kpi-engine scheduled | 20 KPIs × 1 scheduled run | 10 GB | 200 GB/day | ~$365/yr |
| Savings | 800 GB/day | ~$1,460/yr |
*BigQuery on-demand pricing: $5/TB scanned. Scales proportionally for Snowflake, Redshift, and Azure Synapse.
For larger teams (50 KPIs, 10 analysts running ad-hoc queries):
- Estimated annual warehouse spend without discipline: $8,000–$15,000
- With kpi-engine as the single computation layer: $1,500–$3,000
- Potential annual savings: $6,000–$12,000 in compute alone
Boilerplate Eliminated
kpi-engine ships ~750 lines of production library code. Replacing what it provides manually would require writing and maintaining:
| Component | Estimated Manual LoC |
|---|---|
| Period resolver (6 formats, timezone-safe) | ~120 lines |
| SQL backend with Jinja2 templating | ~90 lines |
| DataFrame backend (5 aggregations, date filtering) | ~110 lines |
| Derived KPI DAG + topological sort | ~80 lines |
| Period-over-period comparator (5 comparison types) | ~90 lines |
| Alert evaluator (5 operators, severity ranking) | ~60 lines |
| Alert dispatcher + Slack + Email + PagerDuty | ~250 lines |
| Cron scheduler (daemon threads, error handling) | ~80 lines |
| Audit log (CSV + SQLite dual format) | ~110 lines |
| FastAPI REST server (3 endpoints, Pydantic schemas) | ~90 lines |
| Total | ~1,080 lines of boilerplate |
A single kpis.yaml with 20 KPI definitions is typically 80–120 lines — replacing thousands of lines of glue code across notebooks, scripts, and dashboards.
Incident Response & Revenue Protection
Manual monitoring (checking dashboards, running notebooks) is reactive and slow. kpi-engine's built-in alert system makes it proactive:
| Monitoring Method | Detection Lag | Alert Routing |
|---|---|---|
| Manual dashboard review | 4–24 hours | None — human must act |
| kpi-engine (scheduled daily) | < 5 min after run | Slack + email + PagerDuty |
| kpi-engine (scheduled hourly) | < 5 min after run | Slack + email + PagerDuty |
Revenue impact of faster detection:
A company generating $1M/month in revenue loses roughly $1,400/hr if a transaction processing issue goes undetected. Cutting detection lag from 8 hours to under 1 hour saves ~$9,800 per incident in protected revenue. For teams experiencing even 2–3 such incidents per year, that is $20,000–$30,000 in annual risk reduction.
Data Science & Analytics Impact
kpi-engine changes the data science workflow at a structural level:
1. Single Source of Truth
Without a centralised definition layer, different analysts query "revenue" differently — some include refunds, some do not; some filter by region, some do not. kpi-engine enforces one canonical definition per KPI, stored in version-controlled YAML. Metric disagreements that burn hours in review meetings disappear.
2. Reproducible Period Computation
Every period string ("last_month", "2024-Q3", "yesterday") resolves to an exact (start, end) datetime pair using timezone-safe arithmetic with python-dateutil. Two analysts running the same period will always get the same window — no more "which month did you use?" confusion.
3. Automatic DAG Execution for Derived KPIs
Derived KPIs like ARPU (revenue / active_users) are computed after their dependencies using topological ordering. This means derived metrics are always consistent with the base metrics from the same run — not from different queries executed at different times, which is a common source of subtle metric inconsistency in notebook-driven workflows.
4. Audit Trail for Compliance and Debugging
Every engine.run() call writes all KPI values, alert statuses, and query durations to a CSV or SQLite audit log. This gives data teams:
- A searchable history of every metric computation
- Evidence for compliance audits (SOC 2, GDPR data processing records)
- Query duration tracking to identify performance regressions
5. Backend Flexibility Without Rewriting Logic
The same KPI definition runs against PostgreSQL, BigQuery, Snowflake, or a pandas DataFrame by changing one config parameter. Teams migrating from a local pandas prototype to a production SQL warehouse do not need to rewrite any KPI logic — only the source and connection parameters change.
6. Democratises Production-Grade Monitoring
Before kpi-engine, building a KPI pipeline with alerting and scheduling required a data engineer familiar with FastAPI, SQLAlchemy, threading, SMTP, Slack APIs, and cron. With kpi-engine, a data analyst who knows basic Python and SQL can deploy a fully monitored, scheduled KPI system in an afternoon.
Workflow Comparison
Before kpi-engine — typical data team setup:
Jupyter Notebook A → Revenue query (hardcoded dates, no comparison)
Jupyter Notebook B → Customer count query (different date format)
dbt model C → Derived ARPU (may not match A ÷ B due to timing)
Slack bot script → Manual threshold check, last updated 6 months ago
Cron job in crontab → Undocumented, runs at 3am, no error handling
Google Sheet → "Audit log" manually updated by one analyst
Custom Flask app → REST endpoint, 400 lines, only one person understands it
After kpi-engine:
# kpis.yaml — the entire system in one file
kpis:
- name: monthly_revenue
source: sql
query: "SELECT SUM(amount) FROM orders WHERE order_date >= '{{ period_start }}'"
aggregation: sum
unit: USD
compare: [MoM, YoY]
alerts:
- condition: "< 100000"
severity: critical
message: "Revenue dropped below $100K"
- name: active_customers
source: sql
query: "SELECT COUNT(DISTINCT user_id) FROM orders WHERE order_date >= '{{ period_start }}'"
aggregation: count
compare: [MoM]
- name: arpu
source: derived
expression: "monthly_revenue / active_customers"
unit: USD
compare: [MoM, YoY]
engine = KPIEngine.from_yaml("kpis.yaml",
connection=create_engine("postgresql://..."),
alert_channels=[SlackChannel(webhook_url="...")],
audit_log="audit.db"
)
engine.schedule(cron="0 9 1 * *", period_fn=lambda: "last_month")
engine.serve(port=8000)
That is the entire production system: SQL computation, DAG ordering, period comparisons, Slack alerts, SQLite audit log, cron scheduling, and a REST API.
At a Glance
| Metric | Impact |
|---|---|
| Initial KPI system build time | ~98% reduction (weeks → hours) |
| Lines of boilerplate eliminated | ~1,080 lines per project |
| Annual cloud warehouse cost savings | $1,500–$12,000 depending on team size |
| Annual engineering hours saved | 65–110 hrs per engineer |
| Alert detection lag | < 5 minutes vs. 4–24 hrs manually |
| Metric consistency issues | Eliminated via single YAML definition |
| Backend portability | Zero rewrite when switching SQL dialects |
| Compliance audit readiness | Built-in CSV/SQLite audit trail |
Installation
pip install kpi-engine
With optional extras:
pip install "kpi-engine[alerts]" # Slack, email, PagerDuty
pip install "kpi-engine[server]" # FastAPI REST server
pip install "kpi-engine[scheduler]" # Cron scheduling
Quick Start
From a YAML file
# kpis.yaml
kpis:
- name: monthly_revenue
label: Monthly Revenue
source: sql
query: >
SELECT SUM(amount) FROM orders
WHERE order_date >= '{{ period_start }}'::date
AND order_date < '{{ period_end }}'::date
aggregation: sum
unit: USD
compare: [MoM, YoY]
alerts:
- condition: "< 100000"
severity: critical
message: Revenue dropped below $100K
from sqlalchemy import create_engine
from kpi_engine import KPIEngine
engine = KPIEngine.from_yaml(
"kpis.yaml",
connection=create_engine("postgresql://user:pass@host/db")
)
results = engine.run(period="last_month")
for kpi in results:
print(f"{kpi.label}: {kpi.value:,.2f} {kpi.unit}")
if "MoM" in kpi.comparisons:
print(f" MoM: {kpi.mom_change_pct:+.1f}%")
print(f" Status: {kpi.alert_status}")
From Python directly
from kpi_engine import KPIEngine
from kpi_engine.models import KPIDefinition, Alert
import pandas as pd
df = pd.DataFrame({
"revenue": [1000, 2000, 3000],
"order_date": pd.to_datetime(["2024-11-01", "2024-11-15", "2024-11-28"]),
})
kpis = [
KPIDefinition(
name="revenue",
label="Monthly Revenue",
source="dataframe",
aggregation="sum",
unit="USD",
query="orders.revenue", # "table.column" format
compare=["MoM"],
alerts=[Alert(condition="< 1000", severity="warning")],
)
]
engine = KPIEngine(kpis=kpis, dataframes={"orders": df})
results = engine.run(period="2024-11")
How It Works
KPI Definitions (YAML or Python DSL)
│
▼
┌─────────────────────────────┐
│ KPI Registry │ ← Parses and validates all KPI definitions
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ Period Resolver │ ← Converts "last_month", "2024-Q3", "yesterday"
│ │ into concrete start/end datetime pairs
└──────────────┬──────────────┘
▼
┌─────────────────────────────────────────────────────┐
│ Computation Engine │
│ ┌──────────────┐ ┌──────────────┐ Derived KPI │
│ │ SQL Backend │ │ DataFrame │ (expression) │
│ │ (SQLAlchemy) │ │ Backend │ │
│ └──────────────┘ └──────────────┘ │
└──────────────┬──────────────────────────────────────┘
▼
┌─────────────────────────────┐
│ Period-over-Period Comparator│ ← Computes Δ and Δ%
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ Alert Evaluator │ ← Threshold, change %, anomaly rules
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ Alert Dispatcher │ ← Slack, email, PagerDuty, webhooks
└──────────────┬──────────────┘
▼
┌─────────────────────────────┐
│ KPIResult + Audit Log │ ← Structured result + CSV/SQLite history
└─────────────────────────────┘
Period Strings
| Input | Resolves To |
|---|---|
"yesterday" |
Previous calendar day |
"last_week" |
Mon–Sun of the previous week |
"last_month" |
Full previous calendar month |
"last_quarter" |
Previous Q1/Q2/Q3/Q4 |
"2024-Q3" |
July 1 – September 30, 2024 |
"2024-11" |
All of November 2024 |
KPI Sources
SQL Backend
Queries run via SQLAlchemy. Use Jinja2 template variables {{ period_start }} and {{ period_end }} in your query:
KPIDefinition(
name="signups",
label="New Signups",
source="sql",
aggregation="count",
query="SELECT COUNT(*) FROM users WHERE created_at >= '{{ period_start }}'",
)
DataFrame Backend
Pass a dict of DataFrames. Use "table.column" in the query field:
KPIDefinition(
name="revenue",
source="dataframe",
aggregation="sum",
query="sales.amount", # sales DataFrame, amount column
)
Aggregations: sum, avg, count, last, rate
Derived KPIs
Computed from already-resolved KPI values using a Python expression:
KPIDefinition(
name="arpu",
label="ARPU",
source="derived",
expression="revenue / active_users",
unit="USD",
)
Derived KPIs always run after their dependencies. The engine builds a DAG automatically.
Alerts
Condition syntax
| Condition | Triggers when |
|---|---|
"< 1000" |
value is below 1000 |
"> 0.15" |
value is above 0.15 |
"<= 100" |
value is at most 100 |
">= 500" |
value is at least 500 |
"== 0" |
value equals 0 |
Alert channels
Slack:
from kpi_engine.alerts import SlackChannel
engine = KPIEngine(
kpis=kpis,
alert_channels=[SlackChannel(webhook_url="https://hooks.slack.com/...")]
)
Email:
from kpi_engine.alerts import EmailChannel
EmailChannel(
smtp_host="smtp.gmail.com", smtp_port=587,
from_email="alerts@company.com",
to_emails=["team@company.com"],
username="alerts@company.com", password="..."
)
PagerDuty:
from kpi_engine.alerts import PagerDutyChannel
PagerDutyChannel(integration_key="your-integration-key")
REST API
pip install "kpi-engine[server]"
engine.serve(port=8000)
| Endpoint | Description |
|---|---|
GET /kpis?period=last_month |
Compute all KPIs |
GET /kpis/{name}?period=2024-11 |
Compute a single KPI |
GET /kpis/{name}/history?n=10 |
Last n results |
Scheduling
pip install "kpi-engine[scheduler]"
scheduler = engine.schedule(
cron="0 9 1 * *", # 1st of every month at 9am UTC
period_fn=lambda: "last_month",
callback=lambda results: print(f"Done: {len(results)} KPIs")
)
# runs in a background daemon thread
# scheduler.stop() to cancel
Audit Log
engine = KPIEngine(kpis=kpis, connection=conn, audit_log="audit.csv")
# or
engine = KPIEngine(kpis=kpis, connection=conn, audit_log="audit.db") # SQLite
Every engine.run() call appends results to the audit log automatically.
API Reference
KPIEngine
KPIEngine(
kpis: list[KPIDefinition],
connection=None, # SQLAlchemy engine
dataframes: dict = None, # {"table_name": pd.DataFrame}
alert_channels: list = None,
audit_log: str = None # path to .csv or .db file
)
| Method | Returns | Description |
|---|---|---|
engine.run(period) |
list[KPIResult] |
Compute all KPIs |
engine.run_kpi(name, period) |
KPIResult |
Compute one KPI |
engine.history(name, n) |
list[KPIResult] |
Last n results |
engine.schedule(cron, period_fn) |
KPIScheduler |
Schedule recurring runs |
engine.serve(port) |
— | Start REST API (blocking) |
KPIEngine.from_yaml(path, ...) |
KPIEngine |
Load from YAML config |
KPIResult
result.value # float
result.unit # str
result.alert_status # "ok" | "warning" | "critical"
result.comparisons # dict[str, ComparisonResult]
result.alerts_triggered # list[AlertResult]
result.mom_change_pct # float | None
result.yoy_change_pct # float | None
result.period_start # datetime
result.period_end # datetime
result.query_duration_ms # float
KPIDefinition fields
| Field | Type | Description |
|---|---|---|
name |
str |
Unique identifier |
label |
str |
Human-readable name |
source |
str |
"sql" | "dataframe" | "derived" |
aggregation |
str |
"sum" | "avg" | "count" | "rate" | "last" |
query |
str |
SQL template or "table.column" |
expression |
str |
Python expression for derived KPIs |
compare |
list[str] |
["MoM", "YoY", "QoQ", "WoW", "DoD"] |
polarity |
str |
"higher_is_better" | "lower_is_better" |
alerts |
list[Alert] |
Alert definitions |
unit |
str |
Display unit (e.g. "USD", "%") |
Project Structure
kpi-engine/
├── kpi_engine/
│ ├── engine.py # KPIEngine orchestrator
│ ├── registry.py # KPI registry and validation
│ ├── models.py # KPIDefinition, KPIResult, Alert dataclasses
│ ├── period.py # Period resolution logic
│ ├── backends/
│ │ ├── base.py # BaseBackend abstract class
│ │ ├── sql.py # SQLAlchemy backend
│ │ ├── dataframe.py # Pandas backend
│ │ └── derived.py # Derived KPI expression evaluator
│ ├── comparator.py # Period-over-period comparison
│ ├── alerts/
│ │ ├── evaluator.py # Alert threshold evaluation
│ │ ├── dispatcher.py # Routes alerts to channels
│ │ ├── slack.py # Slack webhook channel
│ │ ├── email.py # SMTP email channel
│ │ └── pagerduty.py # PagerDuty Events API channel
│ ├── scheduler.py # Cron-based scheduling
│ ├── audit.py # Audit log (CSV or SQLite)
│ └── server.py # FastAPI REST server
└── tests/
Changelog
v1.0.2
- Added deep Impact & ROI section: engineering hours saved, compute cost savings, boilerplate eliminated, incident response improvements, and data science workflow analysis
- Expanded PyPI keywords for discoverability
- Updated package description to reflect full feature scope
v1.0.1
- Added full PyPI metadata: classifiers, keywords, author, project URLs
- Exposed
__version__from package root
v1.0.0
- Declarative KPI definition (YAML + Python DSL)
- SQL backend (SQLAlchemy + Jinja2 templates)
- DataFrame backend (pandas,
"table.column"query syntax) - Derived KPI expressions with automatic DAG resolution
- Period-over-period comparisons: DoD, WoW, MoM, QoQ, YoY
- Alert conditions: threshold, with Slack, email, and PagerDuty channels
- FastAPI REST server (
engine.serve()) - Cron scheduling via croniter (
engine.schedule()) - Audit log to CSV or SQLite
License
MIT — see LICENSE
Contributing
PRs welcome. Add tests for new KPI types and alert conditions.
pip install -e ".[dev]"
pytest tests/ -v
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kpi_engine-1.0.2.tar.gz.
File metadata
- Download URL: kpi_engine-1.0.2.tar.gz
- Upload date:
- Size: 33.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
478696fc4ff649e12a699a9c9c0f9aba08d15ba3fb61f5370a7ac2742b954bc3
|
|
| MD5 |
9f24737408599927864bfed095c12a47
|
|
| BLAKE2b-256 |
29eef64ea026032292f1977564ed02c4f0bd35c49e02e4f548d0269efc30354b
|
File details
Details for the file kpi_engine-1.0.2-py3-none-any.whl.
File metadata
- Download URL: kpi_engine-1.0.2-py3-none-any.whl
- Upload date:
- Size: 25.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0f33ac93698e94cfa223b50076a8da78123b23d2fe7ee2e8e2838acae7c3f155
|
|
| MD5 |
19b14658ac0e4c56e8c2598344469e79
|
|
| BLAKE2b-256 |
e1c72f29619e9eb388c87f443197e77bcd653876cc4048c6631819237b304702
|