Tooling to assist with migrating from Airflow to Dagster.

Airlift

Airlift is a toolkit for observing Airflow instances from within Dagster and for accelerating the migration of Airflow DAGs to Dagster assets.

Goals

  • Observe Airflow DAGs and their execution history with no changes to Airflow code
  • Model and observe assets orchestrated by Airflow with no changes to Airflow code
  • (Future) Enable a migration process that
    • Can be done task-by-task in any order with minimal coordination
    • Has task-by-task rollback to reduce risk
    • Retains Airflow DAG structure and execution history during the migration

Process

  • Peer
    • Observe an Airflow instance from within a Dagster Deployment via the Airflow REST API.
    • This loads every Airflow DAG as an asset definition and creates a sensor that polls Airflow for execution history.
  • Observe
    • Add a mapping from each Airflow DAG and task ID to the set of Dagster definitions you want to observe (e.g. to render the full lineage of the dbt models an Airflow task orchestrates).
    • The sensor used for peering also polls for task execution history and adds materializations to observed assets when their corresponding task executes successfully.

REST API Availability

Airlift depends on the availability of Airflow’s REST API. The REST API was made stable in Airflow’s 2.0 release (December 2020) and was introduced experimentally in 1.10 (August 2018). Currently Airlift requires the REST API to be available.

  • OSS: Stable as of 2.0
  • MWAA
    • Note: only available in Airflow 2.4.3 or later on MWAA.
  • Cloud Composer: No limitations as far as we know.
  • Astronomer: No limitations as far as we know.
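
As a quick sanity check that the REST API is reachable, you can query it directly. The following is a minimal sketch using the requests library; the URL and credentials are placeholders for your own instance:

import requests

# List DAGs via the stable Airflow REST API. Replace the webserver URL and
# basic-auth credentials with those of your own instance.
response = requests.get(
    "http://localhost:8080/api/v1/dags",
    auth=("admin", "admin"),
    timeout=10,
)
response.raise_for_status()
print([dag["dag_id"] for dag in response.json()["dags"]])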

Peering

The first step is to peer the Dagster Deployment and the Airflow instance.

To do this, you need to install dagster-airlift and make it available in your Dagster deployment:

pip install uv
uv pip install dagster-airlift[core]

At that point, you can create a Definitions object using build_defs_from_airflow_instance:

from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        # other backends available (e.g. MwaaSessionAuthBackend)
        auth_backend=BasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    )
)

Note: When a code server is loaded for the first time, it will query the target Airflow REST API. Subsequent process loads (e.g. a run worker loading) will use a cached response. If you want Dagster to pick up new DAGs, you will need to restart the code server.

An MWAA auth backend is available at dagster_airlift.mwaa.auth.MwaaSessionAuthBackend.
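
For example, wiring the MWAA backend into the peering call might look like the following sketch. The constructor arguments shown here (a boto3 MWAA client and the environment name, passed as mwaa_client and env_name) are assumptions about this backend's signature, and "my-mwaa-env" is a placeholder:

import boto3

from dagster_airlift.core import AirflowInstance, build_defs_from_airflow_instance
from dagster_airlift.mwaa.auth import MwaaSessionAuthBackend

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        # Assumed signature: a boto3 "mwaa" client plus the MWAA environment name.
        auth_backend=MwaaSessionAuthBackend(
            mwaa_client=boto3.client("mwaa"),
            env_name="my-mwaa-env",
        ),
        name="mwaa_instance",
    )
)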

This function creates:

  • An asset representing each DAG. This asset is “materialized” whenever a DAG run completes.
  • A sensor that polls the Airflow instance for operational information. This is what keeps the DAG execution history up to date; it must be turned on for the information to stay timely.

Peering to multiple instances

Airlift supports peering to multiple Airflow instances: invoke build_defs_from_airflow_instance once per instance and combine the results with Definitions.merge:

from dagster import Definitions

from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance

defs = Definitions.merge(
    build_defs_from_airflow_instance(
        airflow_instance=AirflowInstance(
            auth_backend=BasicAuthBackend(
                webserver_url="http://yourcompany.com/instance_one",
                username="admin",
                password="admin",
            ),
            name="airflow_instance_one",
        )
    ),
    build_defs_from_airflow_instance(
        airflow_instance=AirflowInstance(
            auth_backend=BasicAuthBackend(
                webserver_url="http://yourcompany.com/instance_two",
                username="admin",
                password="admin",
            ),
            name="airflow_instance_two",
        )
    ),
)

Observing Assets

The next step is to observe the assets that are orchestrated by Airflow. To do that, we must create the corresponding asset definitions in the Dagster deployment.

An example is included at examples/experimental/dagster-airlift/examples/peering-with-dbt. We suggest mimicking the structure of this project as a starting point.

To add definitions that an Airlift-enabled deployment will observe, use the orchestrated_defs argument to build_defs_from_airflow_instance.

Note: This argument accepts an object of type Definitions. It makes use of the recently added Definitions.merge, which allows users to combine and compose Definitions objects.

For most projects we anticipate the following process:

  • Create a new factory class for every operator type or specialized use case of an operator (e.g. a BashOperator invoking a dbt project).
  • Use that factory to create the definitions associated with each Airflow task.

Our example includes a pre-written factory class (in examples/experimental/dagster-airlift/dagster_airlift/dbt/multi_asset.py, installable via uv pip install dagster-airlift[dbt]) for invoking a dbt project:

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets

from dagster_airlift.def_factory import DefsFactory

@dataclass
class DbtProjectDefs(DefsFactory):
    dbt_project_path: Path
    name: str
    group: Optional[str] = None

    def build_defs(self) -> Definitions:
        dbt_manifest_path = self.dbt_project_path / "target" / "manifest.json"

        @dbt_assets(manifest=json.loads(dbt_manifest_path.read_text()), name=self.name)
        def _dbt_asset(context: AssetExecutionContext, dbt: DbtCliResource):
            yield from dbt.cli(["build"], context=context).stream()

        if self.group:
            _dbt_asset = _dbt_asset.with_attributes(
                group_names_by_key={key: self.group for key in _dbt_asset.keys}
            )

        return Definitions(
            assets=[_dbt_asset],
            resources={
                "dbt": DbtCliResource(
                    project_dir=self.dbt_project_path, profiles_dir=self.dbt_project_path
                )
            },
        )

We imagine most people will have to customize this, so we encourage you to copy and paste it for now.

Then you can add instances of the factory to your build_defs_from_airflow_instance call:

defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,
    orchestrated_defs=defs_from_factories(
        DbtProjectDefs(
            name="dbt_dag__build_dbt_models",
            dbt_project_path=dbt_project_path(),
            group="dbt",
        ),
    ),
)

Mapping assets to tasks

We default to a "convention over configuration" approach to associate Dagster assets with Airflow tasks.

Note the naming convention dbt_dag__build_dbt_models. By default we use this convention to encode the dag name (dbt_dag) and task id (build_dbt_models) of the Airflow task actually orchestrating the computation.

This name is parsed, and corresponding tags (airlift/dag_id and airlift/task_id) are set on both the asset and the op that computes it. Alternatively, you can set those tags explicitly if you don’t want to rely on the naming convention.
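
A minimal sketch of what setting those tags explicitly might look like is below. The asset name is a placeholder, and applying the tags via op_tags on a plain asset is an assumption; only the tag keys (airlift/dag_id and airlift/task_id) and the DAG/task IDs come from the convention and example described above:

from dagster import asset

# Hypothetical asset mapped explicitly to the Airflow task build_dbt_models
# in the DAG dbt_dag, instead of relying on the <dag_id>__<task_id> naming
# convention.
@asset(
    op_tags={
        "airlift/dag_id": "dbt_dag",
        "airlift/task_id": "build_dbt_models",
    },
)
def my_observed_table() -> None:
    # In the observe step the computation still runs in Airflow; this
    # definition exists so Dagster can record materializations for it.
    ...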

Once this is done, you should be able to reload your definitions and see the full dbt project. Once you run the corresponding Airflow DAG and task, each task completion will correspond to a materialization of every asset orchestrated by that task.

Note: There will be some delay, since this process is managed by a Dagster sensor that polls the Airflow instance for task history. The polling interval is 30 seconds by default (you can reduce it to as little as one second via the minimum_interval_seconds argument to the sensor).
