Granyt SDK for Apache Airflow - Automatic lineage tracking and error monitoring

Granyt SDK for Apache Airflow

Python 3.9+ Airflow Support MIT License

Python SDK for Granyt - the modern, open-source Airflow observability platform
Automatically capture lineage, errors, and metrics from your Airflow DAGs

Note: This is the Python SDK component of Granyt. For the complete project overview, see the main README.



✨ Features

  • Automatic Lineage Tracking - Captures DAG/task run status using OpenLineage without any manual instrumentation
  • Rich Error Capture - Sentry-like error capturing with full stack traces and context variables
  • Zero Configuration - Just install and set environment variables - no code changes required
  • Operator Adapters - Built-in support for popular operators (Snowflake, BigQuery, dbt, S3, and more)
  • DataFrame Metrics - Automatic schema and metrics extraction from Pandas/Polars DataFrames
  • 100% Open Source & Self-Hostable - Fully open source under the MIT license

📦 Installation

pip install granyt-sdk

✅ Compatibility

| Airflow Version | Status |
|---|---|
| 2.5.x - 2.10.x | ✅ Fully Supported |
| 3.0.x+ | 🚧 Coming Soon |

Python: Requires Python 3.10 or later.


⚙️ Configuration

Set the following environment variables:

export GRANYT_ENDPOINT="https://your-granyt-backend.com"
export GRANYT_API_KEY="your-api-key" # Get from Granyt App deployment
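
A small pre-flight check can catch a missing variable before Airflow starts. The sketch below only reads the two variables named above. The `missing_granyt_settings` helper is hypothetical and not part of the SDK.

```python
import os

# The two variables this README lists as required.
REQUIRED_VARS = ("GRANYT_ENDPOINT", "GRANYT_API_KEY")


def missing_granyt_settings(environ=None):
    """Return the required variable names that are unset or blank."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_granyt_settings()
    if missing:
        print("Missing Granyt settings: " + ", ".join(missing))
    else:
        print("Granyt settings look complete.")
```

Running this in the same shell or container as the Airflow scheduler and workers shows whether the exports are actually visible to those processes.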

🔧 How It Works

Automatic Lineage Tracking

The SDK integrates with Airflow's plugin system and listener mechanism to automatically:

  1. Capture task/DAG run starts, completions, and failures
  2. Extract OpenLineage-compatible metadata
  3. Send lineage events to your Granyt backend
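
To make step 2 concrete, here is a minimal sketch of an OpenLineage-style run event built from Airflow identifiers. The top-level field names follow the OpenLineage run event format. The `build_run_event` helper, the state mapping, the producer URL, and the sample DAG and task names are assumptions for illustration, and the exact payload the SDK sends may differ.

```python
import json
import uuid
from datetime import datetime, timezone

# Assumed mapping from Airflow task states to OpenLineage event types.
EVENT_TYPES = {"running": "START", "success": "COMPLETE", "failed": "FAIL"}


def build_run_event(state, dag_id, task_id, run_id, namespace="default"):
    """Build an OpenLineage-style run event as a plain dictionary."""
    # OpenLineage run ids are UUIDs, so derive a stable one from the Airflow ids.
    ol_run_id = uuid.uuid5(uuid.NAMESPACE_URL, f"{dag_id}.{task_id}.{run_id}")
    return {
        "eventType": EVENT_TYPES[state],
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(ol_run_id)},
        "job": {"namespace": namespace, "name": f"{dag_id}.{task_id}"},
        "producer": "https://example.com/hypothetical-producer",  # placeholder
        "inputs": [],
        "outputs": [],
    }


event = build_run_event("failed", "daily_sales", "load_orders", "manual__2024-01-01")
print(json.dumps(event, indent=2))
```

Deriving the run id deterministically means the start and completion events for the same task attempt share one `runId`, which is how a backend can pair them.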

Operator Adapters

The SDK includes built-in adapters for popular Airflow operators that automatically extract rich metrics:

| Category | Operators | Key Metrics |
|---|---|---|
| Snowflake | SnowflakeOperator, SnowflakeSqlApiOperator, S3ToSnowflakeOperator | row_count, query_id, warehouse, database, schema |
| BigQuery | BigQueryInsertJobOperator, BigQueryCheckOperator, GCSToBigQueryOperator | bytes_processed, row_count, query_id, slot_milliseconds |
| Generic SQL | SQLExecuteQueryOperator, SQLColumnCheckOperator, BranchSQLOperator | row_count, database, schema, table |
| AWS S3 | S3CopyObjectOperator, S3ListOperator, S3DeleteObjectsOperator | files_processed, bytes_processed, source_path, destination_path |
| GCS | GCSCreateBucketOperator, GCSListObjectsOperator, GCSSynchronizeBucketsOperator | files_processed, bytes_processed, source_path, destination_path |
| dbt | DbtCloudRunJobOperator, DbtRunOperator, DbtTestOperator | models_run, tests_passed, tests_failed, row_count |

For more details on how we extract metrics from specific operators, see the Operator Adapters documentation.


Rich Error Capture

When a task fails, the SDK automatically captures:

  • Full stack trace with local variables
  • Task instance metadata (dag_id, task_id, run_id, try_number, etc.)
  • DAG configuration and task parameters
  • Environment context
  • Previous log entries
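
The first bullet, a stack trace with local variables, can be approximated with the standard library alone. The following is a minimal sketch of the general technique, not the SDK's implementation. `capture_error` and `divide` are hypothetical names.

```python
import traceback


def capture_error(exc):
    """Summarize an exception as a dict with per-frame local variables."""
    # capture_locals=True stores the repr() of each local variable per frame.
    tb = traceback.TracebackException.from_exception(exc, capture_locals=True)
    frames = [
        {
            "file": frame.filename,
            "line": frame.lineno,
            "function": frame.name,
            "locals": dict(frame.locals or {}),
        }
        for frame in tb.stack
    ]
    return {"type": type(exc).__name__, "message": str(exc), "frames": frames}


def divide(numerator, denominator):
    return numerator / denominator


try:
    divide(1, 0)
except ZeroDivisionError as exc:
    report = capture_error(exc)

# The innermost frame is the function that raised.
print(report["type"], report["frames"][-1]["function"], report["frames"][-1]["locals"])
```

Local values are captured as `repr()` strings, so they may include secrets such as passwords or tokens held in variables. Any tool that records locals should be reviewed with that in mind.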

🚀 Usage

Once installed and configured, the SDK works automatically. No code changes are required in your DAGs.

Reporting Custom Metrics from Python Tasks

The most flexible way to report metrics from an Airflow @task or PythonOperator is to include a granyt key in the dictionary your task returns. Airflow pushes the return value to XCom, and the SDK automatically captures everything under that key.

Simple Manual Metrics

You can pass any key-value pairs you want to track in your dashboard:

from airflow.decorators import task

@task
def process_data():
    # ... your logic ...
    return {
        "granyt": {
            "row_count": 1500,
            "data_quality_passed": True,
            "source_file": "users.csv"
        }
    }
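
The same return shape should work from a plain callable passed to `PythonOperator(python_callable=...)`. The sketch below leaves out the Airflow wiring so it runs on its own. The sample rows and the `valid_row_count` key are made up for illustration.

```python
import json


def process_data():
    """Plain callable, suitable as a PythonOperator python_callable."""
    rows = [{"id": 1, "email": "a@example.com"}, {"id": 2, "email": None}]
    valid = [row for row in rows if row["email"] is not None]
    return {
        "granyt": {
            "row_count": len(rows),
            "valid_row_count": len(valid),
            "data_quality_passed": len(valid) == len(rows),
        }
    }


result = process_data()
# Airflow serializes XCom values as JSON by default, so check the metrics
# survive a round trip before relying on them in a DAG.
assert json.loads(json.dumps(result)) == result
print(result["granyt"])
```

Keeping metric values to strings, numbers, booleans, lists, and dictionaries avoids serialization errors when the return value is pushed to XCom.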

Automatic Metric Calculation

For deep data insights, use compute_df_metrics. It automatically calculates row counts, null counts, and column types from your Pandas or Polars DataFrames. Pass the result to granyt["df_metrics"] to get schema change detection and rich metrics:

import pandas as pd
from airflow.decorators import task
from granyt_sdk import compute_df_metrics

@task
def transform_data():
    df = pd.read_parquet("data.parquet")
    
    return {
        "granyt": {
            # automatically captures schema and df metadata
            "df_metrics": compute_df_metrics(df),
            "data_quality_passed": True
        }
    }
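
Schema change detection comes down to comparing the column-to-type mapping of one run with that of the next. How Granyt performs this comparison is not described here, so the following is only an illustration of the idea. `diff_schemas` and the sample schemas are hypothetical.

```python
def diff_schemas(previous, current):
    """Compare two {column: dtype} mappings and report what changed."""
    added = sorted(set(current) - set(previous))
    removed = sorted(set(previous) - set(current))
    retyped = sorted(
        col for col in set(previous) & set(current) if previous[col] != current[col]
    )
    return {"added": added, "removed": removed, "retyped": retyped}


yesterday = {"id": "int64", "email": "object", "signup_date": "object"}
today = {
    "id": "int64",
    "email": "object",
    "signup_date": "datetime64[ns]",
    "plan": "object",
}

changes = diff_schemas(yesterday, today)
print(changes)
# {'added': ['plan'], 'removed': [], 'retyped': ['signup_date']}
```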

Custom DataFrame Support

You can add support for other DataFrame types by creating a custom adapter:

from granyt_sdk import DataFrameAdapter, register_adapter

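class SparkAdapter(DataFrameAdapter):
    @classmethod
    def can_handle(cls, df):
        # Heuristic: Spark DataFrames expose an `rdd` attribute
        return hasattr(df, 'rdd')

    @classmethod
    def get_type_name(cls):
        return "spark"

    @classmethod
    def get_columns_with_dtypes(cls, df):
        # Return (column name, dtype string) pairs from the Spark schema
        return [(f.name, str(f.dataType)) for f in df.schema.fields]

    @classmethod
    def get_row_count(cls, df):
        # count() triggers a Spark job and can be slow on large data
        return df.count()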

register_adapter(SparkAdapter)

📖 API Reference

Environment Variables

| Variable | Description | Default |
|---|---|---|
| GRANYT_ENDPOINT | Backend API endpoint (single endpoint mode) | Required* |
| GRANYT_API_KEY | API key for authentication (single endpoint mode) | Required* |
| GRANYT_DEBUG | Enable debug logging | false |
| GRANYT_DISABLED | Disable the SDK | false |

📧 Contact


⚖️ License

MIT License - see LICENSE for details.


