# Granyt SDK for Apache Airflow

Python SDK for Granyt, the modern, open-source Airflow observability platform. Automatically capture lineage, errors, and metrics from your Airflow DAGs.
> **Note:** This is the Python SDK component of Granyt. For the complete project overview, see the main README.
## ✨ Features

- **Automatic Lineage Tracking** - Captures DAG/task run status using OpenLineage, with no manual instrumentation
- **Rich Error Capture** - Sentry-like error capture with full stack traces and context variables
- **Zero Configuration** - Just install and set environment variables; no code changes required
- **Operator Adapters** - Built-in support for popular operators (Snowflake, BigQuery, dbt, S3, and more)
- **DataFrame Metrics** - Automatic schema and metrics extraction from Pandas/Polars DataFrames
- **100% Open Source & Self-Hostable** - Fully open source under the MIT license
## 📦 Installation

```bash
pip install granyt-sdk
```
## ✅ Compatibility
| Airflow Version | Status |
|---|---|
| 2.5.x - 2.10.x | ✅ Fully Supported |
| 3.0.x+ | 🚧 Coming Soon |
**Python:** Requires Python 3.10 or later.
## ⚙️ Configuration

Set the following environment variables:

```bash
export GRANYT_ENDPOINT="https://your-granyt-backend.com"
export GRANYT_API_KEY="your-api-key"  # Get from Granyt App deployment
```
## 🔧 How It Works
### Automatic Lineage Tracking
The SDK integrates with Airflow's plugin system and listener mechanism to automatically:
- Capture task/DAG run starts, completions, and failures
- Extract OpenLineage-compatible metadata
- Send lineage events to your Granyt backend
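Conceptually, the listener mechanism works like the simplified sketch below. This is illustrative only: the class and event shapes are hypothetical stand-ins, not the SDK's actual internals, though the hook names mirror Airflow's real listener API. A real listener would POST each event to the Granyt backend instead of buffering it.

```python
from dataclasses import dataclass, field

@dataclass
class LineageEvent:
    """OpenLineage-style run event (simplified)."""
    event_type: str  # "START", "COMPLETE", or "FAIL"
    dag_id: str
    task_id: str

@dataclass
class LineageListener:
    """Collects task-run events as they happen; hook names follow
    Airflow's listener plugin API."""
    events: list = field(default_factory=list)

    def on_task_instance_running(self, dag_id, task_id):
        self.events.append(LineageEvent("START", dag_id, task_id))

    def on_task_instance_success(self, dag_id, task_id):
        self.events.append(LineageEvent("COMPLETE", dag_id, task_id))

    def on_task_instance_failed(self, dag_id, task_id):
        self.events.append(LineageEvent("FAIL", dag_id, task_id))

# Simulate one task run: start, then success
listener = LineageListener()
listener.on_task_instance_running("etl_dag", "extract")
listener.on_task_instance_success("etl_dag", "extract")
print([e.event_type for e in listener.events])  # ['START', 'COMPLETE']
```

Because the events fire from Airflow's own lifecycle hooks, no DAG code has to change for them to be captured.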
### Operator Adapters
The SDK includes built-in adapters for popular Airflow operators that automatically extract rich metrics:
| Category | Operators | Key Metrics |
|---|---|---|
| Snowflake | `SnowflakeOperator`, `SnowflakeSqlApiOperator`, `S3ToSnowflakeOperator` | `row_count`, `query_id`, `warehouse`, `database`, `schema` |
| BigQuery | `BigQueryInsertJobOperator`, `BigQueryCheckOperator`, `GCSToBigQueryOperator` | `bytes_processed`, `row_count`, `query_id`, `slot_milliseconds` |
| Generic SQL | `SQLExecuteQueryOperator`, `SQLColumnCheckOperator`, `BranchSQLOperator` | `row_count`, `database`, `schema`, `table` |
| AWS S3 | `S3CopyObjectOperator`, `S3ListOperator`, `S3DeleteObjectsOperator` | `files_processed`, `bytes_processed`, `source_path`, `destination_path` |
| GCS | `GCSCreateBucketOperator`, `GCSListObjectsOperator`, `GCSSynchronizeBucketsOperator` | `files_processed`, `bytes_processed`, `source_path`, `destination_path` |
| dbt | `DbtCloudRunJobOperator`, `DbtRunOperator`, `DbtTestOperator` | `models_run`, `tests_passed`, `tests_failed`, `row_count` |
For more details on how we extract metrics from specific operators, see the Operator Adapters documentation.
### Rich Error Capture
When a task fails, the SDK automatically captures:
- Full stack trace with local variables
- Task instance metadata (dag_id, task_id, run_id, try_number, etc.)
- DAG configuration and task parameters
- Environment context
- Previous log entries
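The heart of this kind of capture, a stack trace with the local variables of each frame, can be illustrated with the standard library alone. This is a minimal sketch; the SDK's real implementation also attaches Airflow task metadata and ships the payload to the backend, and `capture_error` is a hypothetical helper name, not the SDK's API.

```python
import traceback

def capture_error(exc: BaseException) -> dict:
    """Build a Sentry-like error payload: one entry per frame,
    including that frame's local variables."""
    frames = []
    tb = exc.__traceback__
    while tb is not None:
        frame = tb.tb_frame
        frames.append({
            "file": frame.f_code.co_filename,
            "function": frame.f_code.co_name,
            "line": tb.tb_lineno,
            # repr() keeps arbitrary objects JSON-safe in the payload
            "locals": {k: repr(v) for k, v in frame.f_locals.items()},
        })
        tb = tb.tb_next
    return {"type": type(exc).__name__, "message": str(exc), "frames": frames}

def flaky_task():
    row_count = 0  # local variable that will appear in the payload
    raise ValueError("no rows loaded")

try:
    flaky_task()
except ValueError as e:
    payload = capture_error(e)
    print(payload["type"], payload["frames"][-1]["function"])
```

The innermost frame's `locals` entry is what makes post-mortem debugging possible without re-running the task: here it would include `row_count` at the moment of failure.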
## 🚀 Usage
Once installed and configured, the SDK works automatically. No code changes are required in your DAGs.
### Reporting Custom Metrics from Python Tasks

The most flexible way to report metrics from an Airflow `@task` or `PythonOperator` is to include a `granyt` key in your return value. The SDK automatically captures everything inside this dictionary from the task's XCom.
#### Simple Manual Metrics

You can pass any key-value pairs you want to track in your dashboard:
```python
@task
def process_data():
    # ... your logic ...
    return {
        "granyt": {
            "row_count": 1500,
            "data_quality_passed": True,
            "source_file": "users.csv"
        }
    }
```
#### Automatic Metric Calculation

For deep data insights, use `compute_df_metrics`. It automatically calculates row counts, null counts, and column types from your Pandas or Polars DataFrames. Pass the result to `granyt["df_metrics"]` to get schema change detection and rich metrics:
```python
import pandas as pd

from granyt_sdk import compute_df_metrics

@task
def transform_data():
    df = pd.read_parquet("data.parquet")
    return {
        "granyt": {
            # automatically captures schema and df metadata
            "df_metrics": compute_df_metrics(df),
            "data_quality_passed": True
        }
    }
```
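To make the mechanics concrete, here is a rough sketch of the *kind* of payload such a helper computes from a pandas DataFrame. The function and field names below are hypothetical illustrations, not the SDK's actual output schema:

```python
import pandas as pd

def sketch_df_metrics(df: pd.DataFrame) -> dict:
    """Approximate the metrics a DataFrame helper derives:
    row count, column types, and per-column null counts."""
    return {
        "row_count": int(len(df)),
        "column_count": int(df.shape[1]),
        "columns": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "null_counts": {col: int(n) for col, n in df.isna().sum().items()},
    }

df = pd.DataFrame({"user_id": [1, 2, None], "name": ["a", "b", "c"]})
metrics = sketch_df_metrics(df)
print(metrics["row_count"], metrics["null_counts"]["user_id"])  # 3 1
```

Tracking the `columns` mapping run over run is what enables schema change detection: a new, removed, or retyped column shows up as a diff between consecutive payloads.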
#### Custom DataFrame Support

You can add support for other DataFrame types by creating a custom adapter:
```python
from granyt_sdk import DataFrameAdapter, register_adapter

class SparkAdapter(DataFrameAdapter):
    @classmethod
    def can_handle(cls, df):
        # Spark DataFrames expose an underlying RDD
        return hasattr(df, "rdd")

    @classmethod
    def get_type_name(cls):
        return "spark"

    @classmethod
    def get_columns_with_dtypes(cls, df):
        return [(f.name, str(f.dataType)) for f in df.schema.fields]

    @classmethod
    def get_row_count(cls, df):
        return df.count()

register_adapter(SparkAdapter)
```
## 📖 API Reference

### Environment Variables

| Variable | Description | Default |
|---|---|---|
| `GRANYT_ENDPOINT` | Backend API endpoint (single endpoint mode) | Required* |
| `GRANYT_API_KEY` | API key for authentication (single endpoint mode) | Required* |
| `GRANYT_DEBUG` | Enable debug logging | `false` |
| `GRANYT_DISABLED` | Disable the SDK | `false` |
## 📧 Contact
- GitHub: @jhkessler
- Email: johnny@granyt.dev
- Issues: GitHub Issues
## ⚖️ License
MIT License - see LICENSE for details.