Skip to main content

prefect-airbyte contains tasks for interacting with Airbyte instances

Project description

prefect-airbyte

PyPI

Welcome!

prefect-airbyte is a collection of prebuilt Prefect tasks and flows that can be used to quickly construct Prefect flows to interact with Airbyte.

Getting Started

Python setup

Requires an installation of Python 3.7+

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Airbyte setup

See the airbyte documention on how to get your own instance.

Installation

Install prefect-airbyte

pip install prefect-airbyte

A list of available blocks in prefect-airbyte and their setup instructions can be found here.

Examples

Create an AirbyteServer block and save it

from prefect_airbyte.server import AirbyteServer

# running airbyte locally at http://localhost:8000 with default auth
local_airbyte_server = AirbyteServer()

# running airbyte remotely at http://<someIP>:<somePort> as user `Marvin`
remote_airbyte_server = AirbyteServer(
    username="Marvin",
    password="DontPanic42",
    server_host="42.42.42.42",
    server_port="4242"
)

local_airbyte_server.save("my-local-airbyte-server")

remote_airbyte_server.save("my-remote-airbyte-server")

Trigger a defined connection sync

from prefect import flow
from prefect_airbyte.server import AirbyteServer
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync

server = AirbyteServer(server_host="localhost", server_port=8000)

connection = AirbyteConnection(
    airbyte_server=server,
    connection_id="e1b2078f-882a-4f50-9942-cfe34b2d825b",
    status_updates=True,
)

@flow
def airbyte_syncs():
    # do some setup

    sync_result = run_connection_sync(
        airbyte_connection=connection,
    )

    # do some other things, like trigger DBT based on number of records synced
    print(f'Number of Records Synced: {sync_result.records_synced}')
❯ python airbyte_syncs.py
03:46:03 | prefect.engine - Created flow run 'thick-seahorse' for flow 'example_trigger_sync_flow'
03:46:03 | Flow run 'thick-seahorse' - Using task runner 'ConcurrentTaskRunner'
03:46:03 | Flow run 'thick-seahorse' - Created task run 'trigger_sync-35f0e9c2-0' for task 'trigger_sync'
03:46:03 | prefect - trigger airbyte connection: e1b2078f-882a-4f50-9942-cfe34b2d825b, poll interval 3 seconds
03:46:03 | prefect - pending
03:46:06 | prefect - running
03:46:09 | prefect - running
03:46:12 | prefect - running
03:46:16 | prefect - running
03:46:19 | prefect - running
03:46:22 | prefect - Job 26 succeeded.
03:46:22 | Task run 'trigger_sync-35f0e9c2-0' - Finished in state Completed(None)
03:46:22 | Flow run 'thick-seahorse' - Finished in state Completed('All states completed.')

Export an Airbyte instance's configuration

NOTE: The API endpoint corresponding to this task is no longer supported by open-source Airbyte versions as of v0.40.7. Check out the Octavia CLI docs for more info.

import gzip

from prefect import flow, task
from prefect_airbyte.configuration import export_configuration
from prefect_airbyte.server import AirbyteServer

@task
def zip_and_write_somewhere(
      airbyte_config: bytearray,
      somewhere: str,
):
    with gzip.open(somewhere, 'wb') as f:
        f.write(airbyte_config)

@flow
def example_export_configuration_flow(filepath: str):

    # Run other tasks and subflows here

    airbyte_config = export_configuration(
        airbyte_server=AirbyteServer.load("my-airbyte-server-block")
    )

    zip_and_write_somewhere(
        somewhere=filepath,
        airbyte_config=airbyte_config
    )

if __name__ == "__main__":
    example_export_configuration_flow('*://**/my_destination.gz')

Use with_options to customize options on any existing task or flow

from prefect import flow
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync

custom_run_connection_sync = run_connection_sync.with_options(
    name="Custom Airbyte Sync Flow",
    retries=2,
    retry_delay_seconds=10,
)
 
 @flow
 def some_airbyte_flow():
    custom_run_connection_sync(
        airbyte_connection=AirbyteConnection.load("my-airbyte-connection-block")
    )
 
 some_airbyte_flow()

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Resources

If you encounter and bugs while using prefect-airbyte, feel free to open an issue in the prefect-airbyte repository.

If you have any questions or issues while using prefect-airbyte, you can find help in either the Prefect Discourse forum or the Prefect Slack community

Feel free to star or watch prefect-airbyte for updates too!

Contribute

If you'd like to help contribute to fix an issue or add a feature to prefect-airbyte, please propose changes through a pull request from a fork of the repository.

Contribution Steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
pip install -e ".[dev]"
  1. Make desired changes.
  2. Add tests.
  3. Insert an entry to CHANGELOG.md
  4. Install pre-commit to perform quality checks prior to commit:
 pre-commit install
  1. git commit, git push, and create a pull request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

prefect-airbyte-0.3.2.tar.gz (38.2 kB view details)

Uploaded Source

Built Distribution

prefect_airbyte-0.3.2-py3-none-any.whl (19.7 kB view details)

Uploaded Python 3

File details

Details for the file prefect-airbyte-0.3.2.tar.gz.

File metadata

  • Download URL: prefect-airbyte-0.3.2.tar.gz
  • Upload date:
  • Size: 38.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.6

File hashes

Hashes for prefect-airbyte-0.3.2.tar.gz
Algorithm Hash digest
SHA256 dac130a05ba9dee3c31060c78212b2d3b731189baff6b7c1a2f7c1aa7cfba1fd
MD5 2b1561bee4f5fe0582ffde87381b8481
BLAKE2b-256 e340464e47b85f78d0a57ba7f7c5b45823801ce9d4527f2c6cd2c50d098958d3

See more details on using hashes here.

File details

Details for the file prefect_airbyte-0.3.2-py3-none-any.whl.

File metadata

File hashes

Hashes for prefect_airbyte-0.3.2-py3-none-any.whl
Algorithm Hash digest
SHA256 58d0140f0dbce6a508ed54d115d5d8df825456cf5886c4e50fface76ce4aa602
MD5 982a984f0e4823e567a75e836a7f9fc6
BLAKE2b-256 ee7cf412eed92a19453d2b3fb287801024d1770aa9c09efebea22d893d4e3e8a

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page