Skip to main content

Integration package for Dishka DI and airflow

Project description

Airflow integration for Dishka

Downloads Package version Supported Python versions

Though it is not required, you can use dishka-airflow integration. It features:

  • REQUEST scope management tied to the Airflow task instance lifecycle via DishkaPlugin and @inject
  • AirflowProvider for accessing Context and TaskInstance in the container at REQUEST scope
  • Automatic dependency resolution for @task, @setup, @teardown, @asset and @asset.multi decorated functions
  • Both synchronous and asynchronous containers and task functions are supported

Scope mapping

Scope Airflow lifecycle Description
APP Root container (lives for the process lifetime) Shared across all task runs.
REQUEST on_task_instance_runningon_task_instance_success / on_task_instance_failed Opened once per task execution. Closed when the task finishes.

Supported Airflow features

Airflow decorator Scope.APP Scope.REQUEST Notes
@task @inject yes yes Main supported path. Decorator order: @task outside, @inject inside.
@setup @inject yes yes Setup tasks run before other tasks in their DAG/TaskGroup context.
@teardown @inject yes yes Teardown tasks get a fresh REQUEST scope independent of the main task's scope.
@asset @inject yes yes Module-level definition required (@asset rejects nested functions).
@asset.multi @inject yes yes Same restriction as @asset. Decorator order: @asset.multi outside, @inject inside.

See the examples directory for a runnable deployment:

  • examples/dags/dishka_simple_dag.py@task @inject with an APP-scoped service.
  • examples/dags/dishka_context_manager_dag.pywith DAG(...) context-manager style.
  • examples/dags/dishka_with_params_dag.py — XCom task arguments alongside injected dependencies.
  • examples/dags/dishka_airflow_provider_dag.py — inject Context and a REQUEST-scoped value.

Installation

Install using pip

pip install dishka-airflow

Or with uv

uv add dishka-airflow

How to use

  1. Import
from dishka_airflow import AirflowProvider, DishkaPlugin, FromDishka, inject
from dishka import make_container, Provider, provide, Scope
  1. Create a provider. Use Scope.APP for long-lived singletons (database clients, repositories, configuration). Use Scope.REQUEST for per-task objects. Add AirflowProvider to expose Context and TaskInstance.
from airflow.sdk import Context
from airflow.sdk.types import TaskInstance

class MyProvider(Provider):
    @provide(scope=Scope.APP)
    def greeting_service(self) -> GreetingService:
        return GreetingService()

    @provide(scope=Scope.REQUEST)
    def task_report(self, ti: TaskInstance) -> TaskReport:
        return TaskReport(task_id=ti.task_id, run_id=ti.run_id)
  1. Mark the task parameters that should be injected with FromDishka[]
from airflow.sdk import task
from dishka_airflow import FromDishka, inject

@task
@inject
def greet(
    name: str,
    service: FromDishka[GreetingService],
) -> str:
    return service.greet(name)
  1. Create a DishkaPlugin subclass in your Airflow plugins/ directory and assign a container. Airflow discovers it automatically and registers the listener that manages the REQUEST scope for every task run.
# plugins/dishka_plugin.py
from dishka import make_container
from dishka_airflow import AirflowProvider, DishkaPlugin

from myapp.providers import MyProvider


class MyDishkaPlugin(DishkaPlugin):
    name = "my_dishka_plugin"
    container = make_container(MyProvider(), AirflowProvider())

That is all — no further wiring is needed. Once Airflow loads the plugin, every @inject-decorated task function will have its FromDishka dependencies resolved automatically.

Decorator ordering

Decorator order matters. @inject must be the innermost decorator so it wraps the raw function and strips FromDishka parameters from the signature before Airflow inspects it. Airflow decorators (@task, @setup, @teardown, @asset, @asset.multi) must be outermost.

# Correct
@task
@inject
def my_task(dep: FromDishka[MyService]) -> str: ...

# Wrong — Airflow sees the unstripped FromDishka parameter
@inject
@task
def my_task(dep: FromDishka[MyService]) -> str: ...

XCom arguments

@inject only removes FromDishka parameters. Ordinary Airflow arguments (XCom values, operator links) are preserved in the signature and passed through as usual.

@task
@inject
def consume(
    name: str,                              # XCom value from upstream task
    service: FromDishka[GreetingService],   # injected by dishka
) -> str:
    return service.greet(name)

Assets

@asset and @asset.multi require module-level function definitions. Defining an asset inside a test or another function raises a ValueError at decoration time.

# Module level — required for @asset
@asset(schedule=None)
@inject
def my_asset(service: FromDishka[MyService]) -> str:
    return service.run()

AirflowProvider integration types

AirflowProvider registers the following Airflow objects as context dependencies so you can use them as factory parameters in your providers.

Type Scope Description
Context REQUEST Airflow template context for the current task run
TaskInstance REQUEST The running task instance object
from airflow.sdk import Context
from airflow.sdk.types import TaskInstance
from dishka import Provider, Scope, provide

class ReportProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def task_report(self, ti: TaskInstance, ctx: Context) -> TaskReport:
        return TaskReport(
            task_id=ti.task_id,
            run_id=ti.run_id,
            dag_id=ctx["dag"].dag_id,
        )

Async containers

Pass an AsyncContainer to DishkaPlugin when you use async providers (e.g. AsyncEngine, async HTTP clients). @inject detects the container kind automatically and dispatches to the right resolution path.

An atexit handler is registered to close the async container when the task subprocess exits.

from dishka import make_async_container
from dishka_airflow import AirflowProvider, DishkaPlugin

from myapp.providers import AsyncMyProvider


class MyDishkaPlugin(DishkaPlugin):
    name = "my_dishka_plugin"
    container = make_async_container(AsyncMyProvider(), AirflowProvider())

Full example

# plugins/dishka_plugin.py
from dishka import Provider, Scope, make_container, provide
from dishka_airflow import AirflowProvider, DishkaPlugin


class GreetingService:
    def greet(self, name: str) -> str:
        return f"Hello, {name}!"


class MyProvider(Provider):
    @provide(scope=Scope.APP)
    def greeting_service(self) -> GreetingService:
        return GreetingService()


class MyDishkaPlugin(DishkaPlugin):
    name = "my_dishka_plugin"
    container = make_container(MyProvider(), AirflowProvider())
# dags/my_dag.py
import pendulum
from airflow.sdk import dag, task
from dishka_airflow import FromDishka, inject

from myapp.services import GreetingService


@dag(
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def my_dag() -> None:
    @task
    @inject
    def greet(service: FromDishka[GreetingService]) -> str:
        message = service.greet("Airflow")
        print(message)
        return message

    greet()


my_dag()

Run the example

A fully wired Docker deployment lives in the examples/ directory. It bundles a layered myapp package (domain / application / infrastructure), four example DAGs and a DishkaPlugin.

cd examples
docker compose up --build

Open http://localhost:8080, log in as admin (password in the logs), and trigger the example DAGs to see injected values in the task logs.

docker compose down -v   # tear down and remove volumes

Project details


Release history Release notifications | RSS feed

This version

0.1

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dishka_airflow-0.1.tar.gz (264.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dishka_airflow-0.1-py3-none-any.whl (13.0 kB view details)

Uploaded Python 3

File details

Details for the file dishka_airflow-0.1.tar.gz.

File metadata

  • Download URL: dishka_airflow-0.1.tar.gz
  • Upload date:
  • Size: 264.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.24 {"installer":{"name":"uv","version":"0.11.24","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dishka_airflow-0.1.tar.gz
Algorithm Hash digest
SHA256 ba6d2259b6d863e247978c2ba17320c5ba39cde32838890b67d84e594fe4ed18
MD5 38c1a0953c43a8625fbb62f3d663bee5
BLAKE2b-256 a77e6e5dc4da6f08e4da80466c42015cc45a2afcdd3e2507a4ef449a70d8bdd6

See more details on using hashes here.

File details

Details for the file dishka_airflow-0.1-py3-none-any.whl.

File metadata

  • Download URL: dishka_airflow-0.1-py3-none-any.whl
  • Upload date:
  • Size: 13.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.24 {"installer":{"name":"uv","version":"0.11.24","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dishka_airflow-0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 75c62485322d95f8751cd7b584bb20078ed0dddeb4cef0902102ea0738f2f2d0
MD5 53135ea3712f450261aa0deac1ef4ab7
BLAKE2b-256 21367908382b7827be91ee38ef595a3c787ab97a5c407e4fdfb9fcacb8d15130

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page