Granyt SDK for Apache Airflow - Automatic lineage tracking and error monitoring
Granyt
Modern, open source Airflow monitoring with error tracking, metrics, and alerts that actually understand your pipelines. Self-hosted. 5 minutes to deploy.
Features
- DAG Monitoring: Know when pipelines break before downstream teams do. Track run history, duration trends, and success rates.
- Smart Alerts: Get notified when your BigQuery job returns 0 rows instead of finding out from your CEO on Monday. Schema changes, row count drops, failures: all covered.
- Sentry-like Error Tracking: Stack traces with DAG context, not just Python tracebacks. Errors grouped by fingerprint so you see patterns, not noise.
- Automatic Metrics: Snowflake query stats, BigQuery bytes scanned, dbt test results - captured automatically from your existing operators.
- Multi-Environment: Compare errors across dev, staging, and prod without switching tabs or grep-ing through logs.
- Self-Hosted: Your data never leaves your infrastructure. No $2k/month observability bill. MIT licensed.
Why Granyt?
The Airflow UI shows you what happened. Granyt tells you what went wrong.
If you've ever:
- Discovered a DAG failed because someone asked "why is the dashboard empty?"
- Spent 20 minutes clicking through task logs to find the actual error
- Wished Sentry understood that dag_id and task_id matter
- Set up Grafana dashboards that nobody looks at
...then you know why generic monitoring tools don't cut it for data pipelines.
Granyt is built specifically for Airflow. It speaks DAG, not just HTTP status codes.
Architecture
Granyt consists of three parts:
- Frontend: Next.js 15 web application
- SDK: Airflow pip package for capturing metrics and errors
- PostgreSQL Database
Quick Start
Get from zero to monitoring in under 5 minutes. No credit card, no sales call, no "contact us for pricing."
1. Deploy the Granyt App
The easiest way to run Granyt is using Docker:
# Clone the repository
git clone https://github.com/jhkessler/getgranyt.git
cd getgranyt
# Create a .env file with required variables
cat > .env << EOF
POSTGRES_PASSWORD=$(openssl rand -hex 24)
BETTER_AUTH_SECRET=$(openssl rand -hex 32)
BETTER_AUTH_URL=http://localhost:3000
EOF
# Build and start with Docker Compose
docker compose -f docker/docker-compose.yml --env-file .env up --build -d
Open http://localhost:3000 and create your account.
For production deployment with prebuilt images, Kubernetes, and more options, see the Deployment Guide.
2. Install the SDK in Airflow
Requirements: Apache Airflow 2.5 – 2.10 · Python 3.9 – 3.12
The Granyt SDK is a lightweight Python listener that runs alongside your Airflow workers and scheduler. It automatically captures DAG and task execution events and sends them to your Granyt dashboard.
Install the SDK in your Airflow environment's Python (e.g., add to your requirements.txt or install directly in your Airflow container/virtualenv):
pip install granyt-sdk
3. Configure the SDK
Set environment variables in your Airflow environment:
export GRANYT_ENDPOINT="https://granyt.yourdomain.com"
export GRANYT_API_KEY="your-api-key" # Get this from the Granyt dashboard
That's it! The SDK automatically captures task events and errors from your DAGs.
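If events do not show up in the dashboard, a quick sanity check of the two variables can rule out configuration mistakes. The sketch below is not part of the SDK; `check_granyt_env` is a hypothetical helper that only inspects the environment and makes no network calls.

```python
import os
from urllib.parse import urlparse

# The two variables named in the configuration step above.
REQUIRED_VARS = ("GRANYT_ENDPOINT", "GRANYT_API_KEY")


def check_granyt_env(env=None):
    """Hypothetical helper: return a list of problems; an empty list means the config looks sane."""
    env = os.environ if env is None else env
    problems = []

    # Both variables must be present and non-empty.
    for name in REQUIRED_VARS:
        if not env.get(name, "").strip():
            problems.append(f"{name} is not set")

    # The endpoint should be a full http(s) URL, not just a hostname.
    endpoint = env.get("GRANYT_ENDPOINT", "").strip()
    if endpoint:
        parsed = urlparse(endpoint)
        if parsed.scheme not in ("http", "https") or not parsed.netloc:
            problems.append("GRANYT_ENDPOINT is not an http(s) URL")

    return problems


if __name__ == "__main__":
    for problem in check_granyt_env():
        print(problem)
```

Run it with the same environment your Airflow scheduler and workers use, since variables exported in an interactive shell may not be visible to them.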
Capture Metrics from Popular Operators
Granyt works with the Airflow lifecycle to automatically capture metrics from popular operators. Need support for a custom operator? You can easily build and register your own adapters to extract any metadata you need. Learn more in our docs.
Supported Operators include:
| Category | Operators |
|---|---|
| SQL & Warehouses | Snowflake, BigQuery, Redshift, Postgres |
| Cloud Storage | AWS S3, Google Cloud Storage, Azure Blob |
| Transformation | dbt Cloud, dbt Core, Spark, Bash |
Custom Metrics in Python Tasks
You can emit custom metrics from your Python tasks by including a granyt key in the task's return value.
import pandas as pd
from airflow.decorators import task

@task
def transform_data():
    # Load raw data; `conn` is assumed to be a database connection created elsewhere
    df_raw = pd.read_sql("SELECT * FROM raw_events", conn)
    return {
        "granyt": {
            # pass the number of rows to the special "row_count" key to get anomaly warnings
            "row_count": len(df_raw),
            # add custom metrics you want to track (cast to int so the value is a plain Python number)
            "high_value_orders": int((df_raw["amount"] > 1000).sum()),
        }
    }
For deep data insights, use compute_df_metrics. It automatically calculates row counts, null counts, and column types from your Pandas or Polars DataFrames. Pass the result to granyt["df_metrics"] to get schema change detection and rich metrics:
import pandas as pd
from airflow.decorators import task
from granyt_sdk import compute_df_metrics

@task
def transform_data():
    df = pd.read_parquet("data.parquet")
    return {
        "granyt": {
            # automatically captures schema and df metadata
            "df_metrics": compute_df_metrics(df),
            "data_quality_passed": True,
        }
    }
Creating Custom Alerts
Note: The create_alert feature requires granyt-sdk and granyt-app version 0.2.0 or above.
You can programmatically create alerts from your DAG by including a create_alert key under the granyt key of your task's return value. This is useful for custom data validation, business rule violations, or any condition you want to surface as an alert:
import pandas as pd
from airflow.decorators import task

@task
def validate_data():
    df = pd.read_parquet("data.parquet")
    # cast to int so the count is a plain Python number
    invalid_count = int((df["status"] == "invalid").sum())
    alert = {
        "title": f"High invalid record count: {invalid_count}",
        "description": "Found more than 100 invalid records in the data pipeline",
        "send_notification": True,  # Optional, defaults to False
    }
    return {
        "granyt": {
            # only create the alert when the threshold is exceeded
            "create_alert": alert if invalid_count > 100 else None
        }
    }
Alert Fields:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| title | string | Yes | - | Alert title (max 200 chars) |
| description | string | No | - | Detailed description (max 2000 chars) |
| send_notification | bool | No | False | Send email notification to team |
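If alert text is built from dynamic values, it can easily exceed the limits in the table. The sketch below shows one way to keep a payload within them. `build_alert` is a hypothetical helper, not an SDK function, and truncating (rather than raising) is a design choice made here; how Granyt itself handles over-length fields is not documented in this README.

```python
# Limits taken from the Alert Fields table.
MAX_TITLE_CHARS = 200
MAX_DESCRIPTION_CHARS = 2000


def build_alert(title, description=None, send_notification=False):
    """Hypothetical helper: build a create_alert dict that respects the documented limits."""
    if not title:
        raise ValueError("title is required")

    alert = {
        # Truncate instead of failing so a long message never blocks the task.
        "title": title[:MAX_TITLE_CHARS],
        "send_notification": bool(send_notification),
    }
    # description is optional, so only include it when one was given.
    if description is not None:
        alert["description"] = description[:MAX_DESCRIPTION_CHARS]
    return alert
```

Inside a task, the result would go under the same key as in the example above, for instance `{"granyt": {"create_alert": build_alert("Too many invalid records")}}`.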
Proactive Data Alerts
Granyt automatically monitors your pipelines and alerts you when data anomalies occur.
Built-in Alert Types:
| Alert Type | What It Detects |
|---|---|
| Schema Change | Columns added, removed, or data types changed (requires df_metrics key) |
| Row Count Drop | Sudden drops in row count compared to historical baseline (requires row_count or df_metrics key) |
| Null Occurrence | Columns that historically never had nulls now contain null values (requires df_metrics key) |
You can also set up custom alerts for your own metrics in the dashboard.
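To make the alert types above more concrete, here is a minimal sketch of how a schema change and a row count drop could be detected. This is an illustration only: Granyt's actual detection logic is not described here, and the function names, the median baseline, and the 50% drop threshold are all assumptions.

```python
from statistics import median


def diff_schema(previous, current):
    """Compare two {column: dtype} mappings and report what changed (illustrative only)."""
    prev_cols, curr_cols = set(previous), set(current)
    return {
        "added": sorted(curr_cols - prev_cols),
        "removed": sorted(prev_cols - curr_cols),
        # Columns present in both runs whose dtype differs.
        "type_changed": sorted(
            col for col in prev_cols & curr_cols if previous[col] != current[col]
        ),
    }


def is_row_count_drop(history, latest, max_drop=0.5):
    """Return True if latest falls more than max_drop below the median of past row counts.

    The median baseline and the default threshold are assumptions for this sketch.
    """
    if not history:
        return False  # no baseline yet, so nothing to compare against
    baseline = median(history)
    if baseline <= 0:
        return False
    return latest < baseline * (1 - max_drop)


if __name__ == "__main__":
    yesterday = {"id": "int64", "amount": "float64"}
    today = {"id": "int64", "amount": "object", "status": "object"}
    print(diff_schema(yesterday, today))
    print(is_row_count_drop([1000, 1100, 950], latest=0))
```

A median is used for the baseline in this sketch because a single unusually large or small historical run shifts it less than it would shift a mean.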
Contact
- GitHub: @jhkessler
- Email: johnny@granyt.dev
- Issues: GitHub Issues
License
This project is licensed under the MIT License - see the LICENSE file for details.