Fivetran Async Provider for Apache Airflow

This package provides an async operator, sensor, and hook that integrate Fivetran into Apache Airflow. FivetranSensor lets you monitor a Fivetran sync job for completion before running downstream processes. FivetranOperator submits a Fivetran sync job and polls for its status on the triggerer. Because an async sensor or operator frees up its worker slot while polling happens on the triggerer, it consumes fewer resources than a traditional "sync" sensor or operator.
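
To illustrate why polling on the triggerer is cheaper, here is a minimal asyncio sketch. It is not the provider's code: several simulated sync waits share one event loop, much as deferred tasks share a triggerer process instead of each holding a worker slot. The function names, connector IDs, and delays are made up for illustration.

```python
import asyncio


async def wait_for_sync(connector_id: str, polls_needed: int, poll_interval: float) -> str:
    """Simulate polling a sync's status until it reports completion.

    Hypothetical stand-in: a real trigger would query the Fivetran API
    instead of counting down.
    """
    for _ in range(polls_needed):
        # Sleeping yields control to the event loop, so other waits can progress.
        await asyncio.sleep(poll_interval)
    return connector_id


async def main() -> list:
    # Three waits run concurrently on a single event loop.
    return await asyncio.gather(
        wait_for_sync("connector_a", polls_needed=3, poll_interval=0.01),
        wait_for_sync("connector_b", polls_needed=2, poll_interval=0.01),
        wait_for_sync("connector_c", polls_needed=1, poll_interval=0.01),
    )


finished = asyncio.run(main())
print(finished)
```

None of the waits blocks the others; each one gives up control while it sleeps, which is the property that lets a single triggerer watch many syncs.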

Fivetran automates your data pipeline, and Airflow automates your data processing.

Installation

Prerequisites: An environment running apache-airflow.

pip install airflow-provider-fivetran-async

Configuration

In the Airflow user interface, configure a Connection for Fivetran. Most of the Connection config fields will be left blank. Configure the following fields:

  • Conn Id: fivetran
  • Conn Type: Fivetran
  • Login: Fivetran API Key
  • Password: Fivetran API Secret

Find the Fivetran API Key and Secret in the Fivetran Account Settings, under the API Config section. See our documentation for more information on Fivetran API Authentication.

The sensor assumes the Conn Id is set to fivetran. If you manage multiple Fivetran accounts, you can set it to any value you like. See the DAG in examples for how to specify a custom Conn Id.
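
As an alternative to the user interface, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID>, and Airflow 2.3 and later accept a JSON value there. The sketch below builds such a value with the standard library only. The credentials are placeholders, and the lowercase "fivetran" connection type string is an assumption, so check it against what the provider registers in your installation.

```python
import json
import os

# Assumption: the connection ID is the default "fivetran" described above.
conn_id = "fivetran"

connection = {
    # Assumption: lowercase type string; verify against your Airflow UI.
    "conn_type": "fivetran",
    "login": "my-api-key",        # placeholder for the Fivetran API Key
    "password": "my-api-secret",  # placeholder for the Fivetran API Secret
}

env_name = f"AIRFLOW_CONN_{conn_id.upper()}"
env_value = json.dumps(connection)

# Setting it in-process only demonstrates the format; in practice the
# variable must be set in the environment of the Airflow processes.
os.environ[env_name] = env_value
print(env_name)
```

In a real deployment, keep the key and secret out of source control, for example by injecting the variable through your secrets manager.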

Modules

Fivetran Operator Async

Import into your DAG via:

from fivetran_provider_async.operators import FivetranOperator

FivetranOperator submits a Fivetran sync job and monitors it on the triggerer for completion.

FivetranOperator requires that you specify the connector_id of the Fivetran connector you wish to trigger. You can find connector_id in the Settings page of the connector you configured in the Fivetran dashboard.

FivetranOperator waits for the sync to complete as long as wait_for_completion=True (the default). Running in deferrable mode is recommended (also the default). If wait_for_completion=False, the operator returns the timestamp of the last sync.

Fivetran Sensor Async

from fivetran_provider_async.sensors import FivetranSensor

FivetranSensor monitors a Fivetran sync job for completion. Monitoring with FivetranSensor allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.

FivetranSensor requires that you specify the connector_id of the Fivetran connector you want to wait for. You can find connector_id in the Settings page of the connector you configured in the Fivetran dashboard.

You can use multiple instances of FivetranSensor to monitor multiple Fivetran connectors.

FivetranSensor is most commonly useful in two scenarios:

  1. Fivetran is using a scheduler separate from the Airflow scheduler.
  2. You set wait_for_completion=False in the FivetranOperator, and you need to await the FivetranOperator task later. (You may want to do this to arrange your DAG so that some tasks depend on a sync starting and other tasks depend on a sync completing.)

If you are following the first pattern, you may find it useful to set completed_after_time to data_interval_end, or to data_interval_end plus a buffer:

fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_externally_scheduled_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}",
)
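
The template above is ordinary datetime arithmetic. As a rough sketch using only the standard library, with a made-up data_interval_end value, the target time works out like this:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical end of the data interval for a daily run.
data_interval_end = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)

# Same arithmetic as "{{ data_interval_end + macros.timedelta(minutes=1) }}".
completed_after_time = data_interval_end + timedelta(minutes=1)

print(completed_after_time.isoformat())  # 2024-01-02T00:01:00+00:00
```

With this value, a sync that finished before 00:01 UTC on 2024-01-02 would not satisfy the sensor; only a later completion would.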

If you are following the second pattern, you can use XComs to pass the target completed time to the sensor:

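from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

# Start the sync without waiting; the operator returns the last sync timestamp.
fivetran_op = FivetranOperator(
    task_id="fivetran_sync_my_db",
    connector_id="bronzing_largely",
    wait_for_completion=False,
)

# Wait for a sync that completes after the timestamp returned by the operator.
# The task ID passed to xcom_pull must match the operator's task_id.
fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_db_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_my_db', key='return_value') }}",
)

fivetran_op >> fivetran_sensor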

You may also use FivetranSensor without a completed_after_time. In this case, the sensor notes the most recent completed time and waits for a newer one.
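
The waiting behavior described in this section can be summarized as a comparison of timestamps. The function below is a hypothetical illustration of that logic, not the provider's implementation. Its name, its parameters, and the strict "later than" comparison are all assumptions.

```python
from datetime import datetime, timezone
from typing import Optional


def sync_completed(
    last_succeeded_at: datetime,
    completed_after_time: Optional[datetime],
    baseline: datetime,
) -> bool:
    """Return True once a sync has finished after the target time.

    If no target is given, fall back to `baseline`, the completion time
    noted at the first check, so that only a newer completion counts.
    """
    target = completed_after_time if completed_after_time is not None else baseline
    return last_succeeded_at > target


first_seen = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)
new_sync = datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc)

print(sync_completed(first_seen, None, baseline=first_seen))  # False
print(sync_completed(new_sync, None, baseline=first_seen))    # True
```

The first call returns False because the completion time has not moved past the noted baseline; the second returns True because a newer sync has finished.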

Examples

See the examples directory for an example DAG.

Issues

Please submit issues and pull requests in our official repo: https://github.com/astronomer/airflow-provider-fivetran-async

We are happy to hear from you. Please email any feedback to the authors at humans@astronomer.io.

Security Policy

Check the project's Security Policy to learn how to report security vulnerabilities in airflow-provider-fivetran-async and how security issues reported to the airflow-provider-fivetran-async security team are handled.
