Integration package for Dishka DI and Airflow
Airflow integration for Dishka
Though it is not required, you can use the dishka-airflow integration. It features:
- REQUEST scope management tied to the Airflow task instance lifecycle via DishkaPlugin and @inject
- AirflowProvider for accessing Context and TaskInstance in the container at REQUEST scope
- Automatic dependency resolution for @task, @setup, @teardown, @asset and @asset.multi decorated functions
- Support for both synchronous and asynchronous containers and task functions
Scope mapping
| Scope | Airflow lifecycle | Description |
|---|---|---|
| APP | Root container (lives for the process lifetime) | Shared across all task runs. |
| REQUEST | on_task_instance_running → on_task_instance_success / on_task_instance_failed | Opened once per task execution. Closed when the task finishes. |
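To make the mapping concrete, here is a toy model of the two scopes in plain Python. It uses neither dishka nor Airflow: `ToyContainer`, `request_scope`, and `run_task` are hypothetical names invented for this sketch, and the real listener hooks are only imitated by a context manager. The point is simply that APP objects are reused across runs while each run opens and closes its own REQUEST scope, even when the task fails.

```python
from contextlib import contextmanager


class ToyContainer:
    """Hypothetical stand-in for an APP-scoped container (not dishka's API)."""

    def __init__(self):
        self.events = []
        self.app_cache = {}  # APP scope: shared by every task run

    def get_app(self, key, factory):
        # Create the APP-scoped object once, then reuse it.
        if key not in self.app_cache:
            self.app_cache[key] = factory()
        return self.app_cache[key]

    @contextmanager
    def request_scope(self, task_id):
        # Opened when the task starts running ...
        self.events.append(f"open:{task_id}")
        request_cache = {}  # REQUEST scope: fresh for each task run
        try:
            yield request_cache
        finally:
            # ... and closed on success or failure alike.
            self.events.append(f"close:{task_id}")


def run_task(container, task_id, fail=False):
    with container.request_scope(task_id) as request_cache:
        service = container.get_app("service", object)
        request_cache["report"] = f"report for {task_id}"
        if fail:
            raise RuntimeError("task failed")
        return service, request_cache["report"]


toy = ToyContainer()
first_service, first_report = run_task(toy, "t1")
second_service, second_report = run_task(toy, "t2")
try:
    run_task(toy, "t3", fail=True)
except RuntimeError:
    pass  # the REQUEST scope was still closed
```

After these three runs, `toy.events` records one open/close pair per task, and `first_service` and `second_service` are the same object.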
Supported Airflow features
| Airflow decorator | Scope.APP | Scope.REQUEST | Notes |
|---|---|---|---|
| @task @inject | yes | yes | Main supported path. Decorator order: @task outside, @inject inside. |
| @setup @inject | yes | yes | Setup tasks run before other tasks in their DAG/TaskGroup context. |
| @teardown @inject | yes | yes | Teardown tasks get a fresh REQUEST scope independent of the main task's scope. |
| @asset @inject | yes | yes | Module-level definition required (@asset rejects nested functions). |
| @asset.multi @inject | yes | yes | Same restriction as @asset. Decorator order: @asset.multi outside, @inject inside. |
See the examples directory for a runnable deployment:
- examples/dags/dishka_simple_dag.py: @task @inject with an APP-scoped service.
- examples/dags/dishka_context_manager_dag.py: with DAG(...) context-manager style.
- examples/dags/dishka_with_params_dag.py: XCom task arguments alongside injected dependencies.
- examples/dags/dishka_airflow_provider_dag.py: inject Context and a REQUEST-scoped value.
Installation
Install using pip
pip install dishka-airflow
Or with uv
uv add dishka-airflow
How to use
- Import
from dishka_airflow import AirflowProvider, DishkaPlugin, FromDishka, inject
from dishka import make_container, Provider, provide, Scope
- Create a provider. Use Scope.APP for long-lived singletons (database clients, repositories, configuration). Use Scope.REQUEST for per-task objects. Add AirflowProvider to expose Context and TaskInstance.
from airflow.sdk import Context
from airflow.sdk.types import TaskInstance

class MyProvider(Provider):
    @provide(scope=Scope.APP)
    def greeting_service(self) -> GreetingService:
        return GreetingService()

    @provide(scope=Scope.REQUEST)
    def task_report(self, ti: TaskInstance) -> TaskReport:
        return TaskReport(task_id=ti.task_id, run_id=ti.run_id)
- Mark the task parameters that should be injected with FromDishka[]
from airflow.sdk import task
from dishka_airflow import FromDishka, inject

@task
@inject
def greet(
    name: str,
    service: FromDishka[GreetingService],
) -> str:
    return service.greet(name)
- Create a DishkaPlugin subclass in your Airflow plugins/ directory and assign a container. Airflow discovers it automatically and registers the listener that manages the REQUEST scope for every task run.
# plugins/dishka_plugin.py
from dishka import make_container
from dishka_airflow import AirflowProvider, DishkaPlugin
from myapp.providers import MyProvider

class MyDishkaPlugin(DishkaPlugin):
    name = "my_dishka_plugin"
    container = make_container(MyProvider(), AirflowProvider())
That is all — no further wiring is needed. Once Airflow loads the plugin,
every @inject-decorated task function will have its FromDishka dependencies
resolved automatically.
Decorator ordering
Decorator order matters. @inject must be the innermost decorator so it
wraps the raw function and strips FromDishka parameters from the signature
before Airflow inspects it. Airflow decorators (@task, @setup, @teardown,
@asset, @asset.multi) must be outermost.
# Correct
@task
@inject
def my_task(dep: FromDishka[MyService]) -> str: ...
# Wrong — Airflow sees the unstripped FromDishka parameter
@inject
@task
def my_task(dep: FromDishka[MyService]) -> str: ...
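The reason the order matters can be illustrated with a small standard-library sketch of signature stripping. This is not dishka's implementation: `_Marker` and `strip_marked` are hypothetical names, the marker is assumed to be carried through `typing.Annotated`, and the "injected" value is a placeholder string rather than something resolved from a container. It only shows how an inner decorator can hide parameters from whatever outer decorator inspects the function next.

```python
import inspect
from typing import Annotated, get_origin, get_type_hints


class _Marker:
    """Hypothetical marker; dishka's real FromDishka may be defined differently."""


def strip_marked(func):
    """Return a wrapper whose visible signature omits marked parameters."""
    hints = get_type_hints(func, include_extras=True)
    marked = {
        name
        for name, hint in hints.items()
        if get_origin(hint) is Annotated and _Marker in hint.__metadata__
    }

    def wrapper(*args, **kwargs):
        # A real integration would resolve these from a container instead.
        injected = {name: f"<{name}>" for name in marked}
        return func(*args, **kwargs, **injected)

    # Publish a signature without the marked parameters, so an outer
    # decorator that calls inspect.signature() never sees them.
    sig = inspect.signature(func)
    wrapper.__signature__ = sig.replace(
        parameters=[p for p in sig.parameters.values() if p.name not in marked]
    )
    return wrapper


def my_task(name: str, dep: Annotated[str, _Marker]) -> str:
    return f"{name} uses {dep}"


wrapped = strip_marked(my_task)
visible = list(inspect.signature(wrapped).parameters)
```

Here `visible` contains only `name`, which is what an outer decorator would see. If the outer decorator were applied first, it would inspect the original function and find the marked parameter still present.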
XCom arguments
@inject only removes FromDishka parameters. Ordinary Airflow arguments
(XCom values, operator links) are preserved in the signature and passed through
as usual.
@task
@inject
def consume(
    name: str,  # XCom value from upstream task
    service: FromDishka[GreetingService],  # injected by dishka
) -> str:
    return service.greet(name)
Assets
@asset and @asset.multi require module-level function definitions. Defining
an asset inside a test or another function raises a ValueError at decoration
time.
# Module level — required for @asset
@asset(schedule=None)
@inject
def my_asset(service: FromDishka[MyService]) -> str:
    return service.run()
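If you want to check ahead of time whether a function counts as module-level, one common heuristic is to look for `<locals>` in its `__qualname__`. The helper below is a hypothetical sketch using only the standard library; whether Airflow performs exactly this check internally is an assumption, so treat it as a pre-flight guard in your own tests rather than a description of Airflow's behavior.

```python
def is_module_level(func) -> bool:
    # Functions defined inside another function carry "<locals>" in __qualname__.
    return "<locals>" not in func.__qualname__


def top_level():
    return "ok"


def factory():
    def nested():
        return "nested"

    return nested


nested_func = factory()
```

With these definitions, `is_module_level(top_level)` is true and `is_module_level(nested_func)` is false, which mirrors the restriction described above.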
AirflowProvider integration types
AirflowProvider registers the following Airflow objects as context
dependencies so you can use them as factory parameters in your providers.
| Type | Scope | Description |
|---|---|---|
| Context | REQUEST | Airflow template context for the current task run |
| TaskInstance | REQUEST | The running task instance object |
from airflow.sdk import Context
from airflow.sdk.types import TaskInstance
from dishka import Provider, Scope, provide

class ReportProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def task_report(self, ti: TaskInstance, ctx: Context) -> TaskReport:
        return TaskReport(
            task_id=ti.task_id,
            run_id=ti.run_id,
            dag_id=ctx["dag"].dag_id,
        )
Async containers
Pass an AsyncContainer to DishkaPlugin when you use async providers (e.g.
AsyncEngine, async HTTP clients). @inject detects the container kind
automatically and dispatches to the right resolution path.
An atexit handler is registered to close the async container when the task
subprocess exits.
from dishka import make_async_container
from dishka_airflow import AirflowProvider, DishkaPlugin
from myapp.providers import AsyncMyProvider

class MyDishkaPlugin(DishkaPlugin):
    name = "my_dishka_plugin"
    container = make_async_container(AsyncMyProvider(), AirflowProvider())
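The atexit-based cleanup mentioned above follows a general pattern that can be sketched with the standard library alone. `ToyAsyncContainer` and `close_at_exit` are hypothetical names, not part of dishka or dishka-airflow, and the package's actual handler may differ. The sketch only shows how a synchronous exit hook can drive an asynchronous `close()`.

```python
import asyncio
import atexit


class ToyAsyncContainer:
    """Hypothetical async resource holder (not dishka's AsyncContainer)."""

    def __init__(self):
        self.closed = False

    async def close(self):
        await asyncio.sleep(0)  # stands in for real async cleanup
        self.closed = True


def close_at_exit(container):
    # atexit handlers are synchronous, so run the coroutine on a fresh event loop.
    asyncio.run(container.close())


async_container = ToyAsyncContainer()
# The handler runs when the interpreter (here: the task subprocess) exits.
atexit.register(close_at_exit, async_container)
```

This pattern assumes no event loop is still running in the calling thread at exit time, since `asyncio.run` refuses to start inside a running loop.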
Full example
# plugins/dishka_plugin.py
from dishka import Provider, Scope, make_container, provide
from dishka_airflow import AirflowProvider, DishkaPlugin

class GreetingService:
    def greet(self, name: str) -> str:
        return f"Hello, {name}!"

class MyProvider(Provider):
    @provide(scope=Scope.APP)
    def greeting_service(self) -> GreetingService:
        return GreetingService()

class MyDishkaPlugin(DishkaPlugin):
    name = "my_dishka_plugin"
    container = make_container(MyProvider(), AirflowProvider())
# dags/my_dag.py
import pendulum
from airflow.sdk import dag, task
from dishka_airflow import FromDishka, inject
from myapp.services import GreetingService

@dag(
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def my_dag() -> None:
    @task
    @inject
    def greet(service: FromDishka[GreetingService]) -> str:
        message = service.greet("Airflow")
        print(message)
        return message

    greet()

my_dag()
Run the example
A fully wired Docker deployment lives in the examples/ directory. It bundles
a layered myapp package (domain / application / infrastructure), four
example DAGs and a DishkaPlugin.
cd examples
docker compose up --build
Open http://localhost:8080, log in as admin (password in the logs), and
trigger the example DAGs to see injected values in the task logs.
docker compose down -v  # tear down and remove volumes