
Astronomer Cosmos

A framework for generating Apache Airflow DAGs from other tools and frameworks.

Current support for:
  • dbt

Coming soon:
  • Jupyter

  • Hex

  • And more… open an issue if you have a request!

Principles

Astronomer Cosmos is a package to parse and render third-party workflows as Airflow DAGs, Airflow TaskGroups, or individual tasks.


Cosmos contains providers for third-party tools, and each provider can be deconstructed into the following components:

  • parsers: These are mostly hidden from the end user and are responsible for extracting the workflow from the provider and converting it into Task and Group objects. These are executed whenever the Airflow Scheduler heartbeats, allowing us to dynamically render the dependency graph of the workflow.

  • operators: These represent the “user interface” of Cosmos – lightweight classes the user can import and implement in their DAG to define their target behavior. They are responsible for executing the tasks in the workflow.
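The parser/operator split can be illustrated with a toy parser. This is purely a sketch of the idea described above; none of these class or function names are the real Cosmos API:

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for the Task and Group objects a parser produces.
@dataclass
class Task:
    name: str
    upstream: list = field(default_factory=list)

@dataclass
class Group:
    name: str
    tasks: dict = field(default_factory=dict)

def parse_manifest(manifest: dict) -> Group:
    """Toy 'parser': turn a {node: [dependency, ...]} mapping into a Group
    of Tasks wired together by their upstream references."""
    group = Group(name="example")
    # First pass: create one Task per node.
    for node in manifest:
        group.tasks[node] = Task(name=node)
    # Second pass: resolve dependency names into Task references.
    for node, deps in manifest.items():
        group.tasks[node].upstream = [group.tasks[d] for d in deps]
    return group

group = parse_manifest({"stg_orders": [], "orders": ["stg_orders"]})
print(group.tasks["orders"].upstream[0].name)  # stg_orders
```

In Cosmos, a parser like this runs on the scheduler side to build the dependency graph, and the operators then execute each resulting task.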

Cosmos operates on a few guiding principles:

  • Dynamic: Cosmos generates DAGs dynamically, meaning that the dependency graph of the workflow is generated at runtime. This allows users to update their workflows without having to restart Airflow.

  • Flexible: Cosmos is not opinionated in that it does not enforce a specific rendering method for third-party systems; users can decide whether they’d like to render their workflow as a DAG, TaskGroup, or individual task.

  • Extensible: Cosmos is designed to be extensible. Users can add their own parsers and operators to support their own workflows.

  • Modular: Cosmos is designed to be modular. Users can install only the dependencies they need for their workflows.

Quickstart

Clone this repository to set up a local environment. Then, head over to our astronomer-cosmos/examples directory and follow its README!
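The steps above amount to the following, assuming the repository is hosted at github.com/astronomer/astronomer-cosmos (check the project page for the canonical URL):

```shell
# Clone the repository and move into the examples directory
git clone https://github.com/astronomer/astronomer-cosmos.git
cd astronomer-cosmos/examples
# Then follow the README in this directory to start the local environment
```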

Installation

Install and update using pip:

General Installation

pip install astronomer-cosmos

Note that this only installs dependencies for the core provider. Read below for more info on how to install specific providers.

Database Specific Installation (dbt)

To install only the dependencies for a specific database, specify it as an extra in the form dbt.<database>. For example, for Postgres, run:

pip install 'astronomer-cosmos[dbt.postgres]'
Extras

| Extra Name | Installation Command | Dependencies |
| --- | --- | --- |
| core | `pip install astronomer-cosmos` | apache-airflow, pydantic, Jinja2 |
| dbt.all | `pip install 'astronomer-cosmos[dbt.all]'` | astronomer-cosmos, dbt-core, dbt-bigquery, dbt-redshift, dbt-snowflake, dbt-postgres |
| dbt.postgres | `pip install 'astronomer-cosmos[dbt.postgres]'` | astronomer-cosmos, dbt-core, dbt-postgres |
| dbt.bigquery | `pip install 'astronomer-cosmos[dbt.bigquery]'` | astronomer-cosmos, dbt-core, dbt-bigquery |
| dbt.redshift | `pip install 'astronomer-cosmos[dbt.redshift]'` | astronomer-cosmos, dbt-core, dbt-redshift |
| dbt.snowflake | `pip install 'astronomer-cosmos[dbt.snowflake]'` | astronomer-cosmos, dbt-core, dbt-snowflake |
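Extras can also be combined in a single install using standard pip extras syntax, for example:

```shell
# Install dependencies for both Postgres and Snowflake in one step
pip install 'astronomer-cosmos[dbt.postgres,dbt.snowflake]'
```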

Example Usage

Imagine we have dbt projects located at ./dbt/{{DBT_PROJECT_NAME}}. We can render these projects as Airflow DAGs using the DbtDag class:

from pendulum import datetime
from cosmos.providers.dbt.dag import DbtDag

# dag for the project jaffle_shop
jaffle_shop = DbtDag(
    dbt_project_name="jaffle_shop",
    conn_id="airflow_db",
    dbt_args={
        "schema": "public",
    },
    dag_id="jaffle_shop",
    start_date=datetime(2022, 11, 27),
)

Similarly, we can render these projects as Airflow TaskGroups using the DbtTaskGroup class. Here’s an example with the jaffle_shop project:

from pendulum import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from cosmos.providers.dbt.task_group import DbtTaskGroup


with DAG(
    dag_id="extract_dag",
    start_date=datetime(2022, 11, 27),
    schedule="@daily",
) as dag:

    e1 = EmptyOperator(task_id="ingestion_workflow")

    dbt_tg = DbtTaskGroup(
        group_id="dbt_tg",
        dbt_project_name="jaffle_shop",
        conn_id="airflow_db",
        dbt_args={
            "schema": "public",
        },
        dag=dag,
    )

    e2 = EmptyOperator(task_id="some_extraction")

    e1 >> dbt_tg >> e2

Changelog

We follow Semantic Versioning for releases. Check CHANGELOG.rst for the latest changes.

Contributing Guide

All contributions, bug reports, bug fixes, documentation improvements, and enhancements are welcome.

A detailed overview on how to contribute can be found in the Contributing Guide.

As contributors and maintainers to this project, you are expected to abide by the Contributor Code of Conduct.

License

Apache License 2.0
