OpenTelemetry instrumentation for Apache Airflow tasks
airflow-otel
Wraps Airflow task execution with traces, metrics, and logs exported via OTLP. Designed to work with any OpenTelemetry-compatible backend (Jaeger, Tempo, Dash0, Honeycomb, etc.) — point it at your collector and go.
Features
- Creates a CONSUMER root span per task execution, named {dag_id}.{task_id}
- Cross-task trace linking via W3C Trace Context propagation through XCom, so tasks in a DAG run appear as a single connected trace in Dash0, Grafana App O11y, Jaeger, etc.
- Sets ERROR status and emits a correlated log record on exception
- Supports both the TaskFlow API (decorator) and traditional PythonOperator (context manager)
- Cardinality-safe: high-cardinality values (run_id, try_number, logical_date) go on span/log attributes, never on resource attributes
- get_tracer() and get_meter() let you create custom child spans and metrics inside tasks
Compatibility
| Airflow | Python | Status |
|---|---|---|
| 2.5 – 2.x | 3.9 – 3.13 | Supported |
| 3.x | 3.9 – 3.13 | Supported |
Installation
uv add airflow-otel
Or with pip:
pip install airflow-otel
Configuration
Configuration is via environment variables — no code changes needed to switch environments.
| Variable | Default | Description |
|---|---|---|
| OTEL_EXPORTER_OTLP_ENDPOINT | http://localhost:4318 | OTLP HTTP endpoint |
| OTEL_EXPORTER_OTLP_HEADERS | (none) | Comma-separated key=value headers (e.g. for auth tokens) |
| OTEL_RESOURCE_ATTRIBUTES | (none) | Extra resource attributes as comma-separated key=value pairs |
| AIRFLOW_ENV / ENV | production | Sets deployment.environment.name on the resource |
Usage
TaskFlow API — @instrument_task decorator
Apply @instrument_task below @task, i.e. as the innermost decorator directly above the function definition. The decorator handles OTel setup and teardown automatically.
from airflow.decorators import dag, task
from airflow_otel import instrument_task, get_tracer, get_meter
from datetime import datetime

# fetch_from_source() and transform() are placeholders for your own code.


@dag(schedule=None, start_date=datetime(2024, 1, 1))
def my_pipeline():
    @task
    @instrument_task
    def process_records():
        tracer = get_tracer()
        meter = get_meter()
        records_counter = meter.create_counter(
            "records.processed",
            description="Number of records processed",
        )
        # Child spans nest under the task's root span
        with tracer.start_as_current_span("fetch") as span:
            records = fetch_from_source()
            span.set_attribute("records.fetched", len(records))
        with tracer.start_as_current_span("transform"):
            transformed = [transform(r) for r in records]
        records_counter.add(len(transformed))
        return len(transformed)

    process_records()


dag_instance = my_pipeline()
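Python applies stacked decorators from the bottom up, so with @instrument_task innermost it wraps the plain function and @task then wraps the instrumented version. The toy decorators below (fake_task and fake_instrument_task are hypothetical stand-ins, not the real Airflow or airflow-otel code) make that nesting order visible.

```python
from functools import wraps

calls: list[str] = []


def fake_task(func):
    """Stand-in for Airflow's @task: the outermost wrapper."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        calls.append("task: enter")
        try:
            return func(*args, **kwargs)
        finally:
            calls.append("task: exit")
    return wrapper


def fake_instrument_task(func):
    """Stand-in for @instrument_task: wraps the undecorated function."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        calls.append("instrument: start root span")
        try:
            return func(*args, **kwargs)
        finally:
            calls.append("instrument: end root span")
    return wrapper


@fake_task
@fake_instrument_task
def process_records():
    calls.append("body")
    return 3


if __name__ == "__main__":
    print(process_records())
    print(calls)
```

Swapping the two decorators would put the span-opening wrapper outside the task wrapper, which is the opposite of what the usage note asks for.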
Traditional PythonOperator — instrument_task_context context manager
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_otel import instrument_task_context, get_tracer
from datetime import datetime


def process_records(**context):
    # instrument_task_context() opens the task's root span and yields it
    with instrument_task_context(context) as span:
        tracer = get_tracer()
        with tracer.start_as_current_span("fetch") as child:
            records = fetch_from_source()  # placeholder for your own code
            child.set_attribute("records.fetched", len(records))
        # Add attributes to the root span
        span.set_attribute("records.processed", len(records))


with DAG("my_dag", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    PythonOperator(
        task_id="process_records",
        python_callable=process_records,
    )
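Both usage styles rely on the behaviour listed under Features: open a root span, yield it, and on exception mark it ERROR before re-raising so Airflow still sees the failure. The sketch below models that pattern with a plain dataclass instead of a real OpenTelemetry span. FakeSpan and task_span are hypothetical names and the library's internals may differ; the context here is also simplified to a dict with dag_id and task_id keys, whereas a real Airflow context exposes them through the task instance (e.g. context["ti"].dag_id).

```python
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Iterator


@dataclass
class FakeSpan:
    """Minimal stand-in for an OpenTelemetry span (illustration only)."""
    name: str
    attributes: dict[str, Any] = field(default_factory=dict)
    status: str = "UNSET"
    ended: bool = False

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


finished: list[FakeSpan] = []


@contextmanager
def task_span(context: dict[str, Any]) -> Iterator[FakeSpan]:
    """Hypothetical analogue of instrument_task_context(context)."""
    span = FakeSpan(name=f"{context['dag_id']}.{context['task_id']}")
    try:
        yield span
    except Exception as exc:
        span.status = "ERROR"
        span.set_attribute("exception.type", type(exc).__name__)
        raise  # re-raise so Airflow still marks the task as failed
    finally:
        span.ended = True  # the span is always closed, success or failure
        finished.append(span)


if __name__ == "__main__":
    ctx = {"dag_id": "my_dag", "task_id": "process_records"}
    with task_span(ctx) as span:
        span.set_attribute("records.processed", 42)
    try:
        with task_span(ctx):
            raise ValueError("boom")
    except ValueError:
        pass
    for s in finished:
        print(s.name, s.status, s.attributes)
```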
What gets emitted
Resource attributes
These are stable for the lifetime of the task process and safe for metric cardinality:
| Attribute | Value |
|---|---|
| service.name | task_id |
| service.namespace | dag_id |
| service.instance.id | Random UUID v4 per process |
| deployment.environment.name | AIRFLOW_ENV / ENV env var, or production |
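The resource table can be read as a small function of dag_id, task_id and the environment. In this sketch build_resource_attributes is a hypothetical name, and the precedence of AIRFLOW_ENV over ENV when both are set is an assumption; the instance ID is generated once at import time so it stays constant for the lifetime of the process.

```python
import os
import uuid

# One random UUID v4 per process, shared by every call below.
_INSTANCE_ID = str(uuid.uuid4())


def build_resource_attributes(dag_id: str, task_id: str) -> dict[str, str]:
    """Sketch of the resource attributes from the table above.

    Assumption: AIRFLOW_ENV takes precedence over ENV when both are set.
    """
    environment = os.environ.get("AIRFLOW_ENV") or os.environ.get("ENV") or "production"
    return {
        "service.name": task_id,
        "service.namespace": dag_id,
        "service.instance.id": _INSTANCE_ID,
        "deployment.environment.name": environment,
    }


if __name__ == "__main__":
    print(build_resource_attributes("my_dag", "process_records"))
```

With the OpenTelemetry SDK installed, a dict like this would typically be passed to Resource.create() from opentelemetry.sdk.resources, which also picks up OTEL_RESOURCE_ATTRIBUTES.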
Root span attributes
High-cardinality values are placed on the span (not the resource) to avoid unbounded metric time series:
| Attribute | Value |
|---|---|
| airflow.dag_id | DAG identifier |
| airflow.task_id | Task identifier |
| airflow.run_id | Run identifier |
| airflow.try_number | Attempt number |
| airflow.logical_date | Logical execution date (if available) |
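The root span's name ({dag_id}.{task_id}) and the attributes above can be expressed as two small functions. TaskRunInfo is a hypothetical container for values the library would read from the Airflow context, and formatting airflow.logical_date as an ISO 8601 string is an assumption; the attribute is simply left out when no logical date is available.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Union

AttrValue = Union[str, int]


@dataclass
class TaskRunInfo:
    """Hypothetical container for the per-run values read from Airflow."""
    dag_id: str
    task_id: str
    run_id: str
    try_number: int
    logical_date: Optional[datetime] = None


def root_span_name(info: TaskRunInfo) -> str:
    return f"{info.dag_id}.{info.task_id}"


def root_span_attributes(info: TaskRunInfo) -> dict[str, AttrValue]:
    attrs: dict[str, AttrValue] = {
        "airflow.dag_id": info.dag_id,
        "airflow.task_id": info.task_id,
        "airflow.run_id": info.run_id,
        "airflow.try_number": info.try_number,
    }
    if info.logical_date is not None:
        # Assumption: ISO 8601 string; the library's exact format may differ.
        attrs["airflow.logical_date"] = info.logical_date.isoformat()
    return attrs


if __name__ == "__main__":
    info = TaskRunInfo(
        dag_id="my_dag",
        task_id="process_records",
        run_id="manual__2024-01-01T00:00:00+00:00",
        try_number=1,
        logical_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    )
    print(root_span_name(info))  # my_dag.process_records
    print(root_span_attributes(info))
```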
Cross-task trace linking
Each task automatically propagates the W3C traceparent header through Airflow's XCom under the key __otel_trace_context__.
- Before starting the root span: the library pulls context from each upstream task's XCom and uses it as the parent, linking this task's span into the same trace.
- After the root span starts: the library injects the current context into XCom so downstream tasks can pick it up.
The result is that all tasks in a DAG run appear as a single connected trace in your observability tool's service graph, with each task as a child span of its upstream.
For fan-in patterns (multiple upstream tasks), the first upstream task that has a stored context is used as the parent.
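The extract/inject cycle can be modelled without Airflow or OpenTelemetry. In this sketch a dictionary stands in for XCom and traceparent strings are built by hand in the W3C Trace Context layout (version-traceid-parentid-flags). The carrier layout and helper names are assumptions for illustration; real code would more likely use OpenTelemetry's opentelemetry.propagate.inject() and extract() to produce and read the carrier.

```python
import re
import secrets
from typing import Optional

XCOM_KEY = "__otel_trace_context__"
# Only version 00 is accepted here, a simplification of the W3C spec.
_TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")

# Hypothetical in-memory stand-in for XCom: (task_id, key) -> carrier dict.
xcom: dict[tuple[str, str], dict[str, str]] = {}


def make_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


def parse_traceparent(value: str) -> Optional[tuple[str, str]]:
    """Return (trace_id, parent_span_id), or None if the value is invalid."""
    match = _TRACEPARENT_RE.match(value)
    if not match:
        return None
    trace_id, span_id, _flags = match.groups()
    if set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None  # all-zero IDs are invalid per W3C Trace Context
    return trace_id, span_id


def run_task(task_id: str, upstream_ids: list[str]) -> tuple[str, str, Optional[str]]:
    """Simulate one task: pick a parent, start a span, publish its context."""
    parent = None
    for upstream in upstream_ids:  # fan-in: first upstream with a context wins
        carrier = xcom.get((upstream, XCOM_KEY), {})
        parsed = parse_traceparent(carrier.get("traceparent", ""))
        if parsed:
            parent = parsed
            break
    trace_id = parent[0] if parent else secrets.token_hex(16)  # new trace if no parent
    span_id = secrets.token_hex(8)
    xcom[(task_id, XCOM_KEY)] = {"traceparent": make_traceparent(trace_id, span_id)}
    return trace_id, span_id, parent[1] if parent else None


if __name__ == "__main__":
    extract = run_task("extract", [])
    transform = run_task("transform", ["extract"])
    load = run_task("load", ["missing_upstream", "transform"])
    print(extract[0] == transform[0] == load[0])  # True: one shared trace
```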
Development
# Install with dev dependencies
uv sync --extra dev
# Run tests
uv run pytest
# Run tests with coverage
uv run pytest --cov=airflow_otel
Tests use an in-memory span exporter — no running collector required.
License
MIT — see LICENSE.
Download files
File details
Details for the file airflow_otel-0.1.0.tar.gz.
File metadata
- Download URL: airflow_otel-0.1.0.tar.gz
- Upload date:
- Size: 281.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d45959eed8f6908572a974b89e186fb467a9fb066aa460c8c64a751861470ac8 |
| MD5 | 93c38d96b468b51a57aab95d1f48df14 |
| BLAKE2b-256 | 299df6a92d118c442ab2fba86c2eed45e0f4df297d247c3c81af3a78608352c1 |
Provenance
The following attestation bundles were made for airflow_otel-0.1.0.tar.gz:
- Publisher: publish.yml on proffalken/airflow-otel
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: airflow_otel-0.1.0.tar.gz
- Subject digest: d45959eed8f6908572a974b89e186fb467a9fb066aa460c8c64a751861470ac8
- Sigstore transparency entry: 1154417254
- Sigstore integration time:
- Permalink: proffalken/airflow-otel@cdb041f077687c1b28355df8514c64cfe90e18fe
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/proffalken
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@cdb041f077687c1b28355df8514c64cfe90e18fe
- Trigger Event: push
File details
Details for the file airflow_otel-0.1.0-py3-none-any.whl.
File metadata
- Download URL: airflow_otel-0.1.0-py3-none-any.whl
- Upload date:
- Size: 10.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 96360d0f45d342140e80eb46e4b2f361e85196a688f48a09da2e0ffd14ca4fd5 |
| MD5 | 69a33958c5a3cf20e7973a793b9d2697 |
| BLAKE2b-256 | 114f33c8d156d1a126ba51e8693a818e4eed307f710d7adbe33b74cb532e2728 |
Provenance
The following attestation bundles were made for airflow_otel-0.1.0-py3-none-any.whl:
- Publisher: publish.yml on proffalken/airflow-otel
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: airflow_otel-0.1.0-py3-none-any.whl
- Subject digest: 96360d0f45d342140e80eb46e4b2f361e85196a688f48a09da2e0ffd14ca4fd5
- Sigstore transparency entry: 1154417261
- Sigstore integration time:
- Permalink: proffalken/airflow-otel@cdb041f077687c1b28355df8514c64cfe90e18fe
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/proffalken
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@cdb041f077687c1b28355df8514c64cfe90e18fe
- Trigger Event: push