Airflow provider for RabbitMQ with sync/async messaging.

Apache Airflow Provider for RabbitMQ

Overview

The Apache Airflow Provider for RabbitMQ lets your Airflow DAGs publish messages to RabbitMQ queues and consume messages from them. It includes custom hooks and operators for these interactions.


Features

  • Publish messages to RabbitMQ queues.
  • Consume messages from RabbitMQ queues.
  • Manage RabbitMQ connection settings through Airflow connections.

Installation

To install the provider, use pip:

pip install apache-airflow-provider-rabbitmq

Note: This provider requires Apache Airflow 2.3 or later (see Prerequisites under Development).
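To confirm the install from Python, you can query the installed distribution metadata. This is a minimal sketch using only the standard library; the helper name `installed_version` is made up for illustration and is not part of the provider.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# The distribution name is the one used in the pip command above.
found = installed_version("apache-airflow-provider-rabbitmq")
print(found if found is not None else "provider is not installed")
```

A `None` result usually means pip installed into a different environment than the one running Airflow.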


Configuration

Add a RabbitMQ Connection in Airflow

  1. Navigate to Admin > Connections in the Airflow UI.
  2. Click on Create to add a new connection.
  3. Configure the following fields:
    • Conn Id: rabbitmq_default (or a custom ID)
    • Conn Type: RabbitMQ
    • Host: <RabbitMQ server hostname or IP>
    • Login: <RabbitMQ username>
    • Password: <RabbitMQ password>
    • Port: 5672 (default RabbitMQ port)

You can now reference this connection in your DAGs using the connection ID.
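As an alternative to the UI, Airflow can also read connections from `AIRFLOW_CONN_<CONN_ID>` environment variables in URI form. The sketch below builds such a URI from the same fields. The `amqp` scheme is an assumption: check which connection type string this provider registers before relying on it. The helper `build_conn_uri` and the example host and credentials are hypothetical.

```python
import os
from urllib.parse import quote


def build_conn_uri(host, login, password, port=5672, scheme="amqp"):
    """Build a connection URI; credentials are percent-encoded so that
    characters such as '@' or '/' do not break URI parsing."""
    user = quote(login, safe="")
    secret = quote(password, safe="")
    return f"{scheme}://{user}:{secret}@{host}:{port}"


# Placeholder values for illustration only.
uri = build_conn_uri("rabbitmq.example.com", "guest", "p@ss/word")

# Airflow looks up the connection ID "rabbitmq_default" under this variable name.
os.environ["AIRFLOW_CONN_RABBITMQ_DEFAULT"] = uri
print(uri)
```

In practice the variable would be set in the environment of the scheduler and workers rather than from inside a DAG file.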


Usage

Example DAG: Publish and Consume Messages

The following DAG publishes a message to example_queue and then consumes it with a callback:

from datetime import datetime

from airflow import DAG
from airflow.providers.rabbitmq.operators.rabbitmq import RabbitMQOperator


def process_message(message):
    # Callback passed to the consume task to process received messages.
    print(f"Received message: {message}")


with DAG(
    dag_id="example_rabbitmq_dag",
    default_args={"start_date": datetime(2023, 1, 1)},
    schedule_interval=None,  # no schedule; trigger the DAG manually
    catchup=False,
) as dag:

    # Publish a single message to the queue.
    publish_task = RabbitMQOperator(
        task_id="publish_message",
        rabbitmq_conn_id="rabbitmq_default",
        queue="example_queue",
        message="Hello, RabbitMQ!",
        mode="publish",
    )

    # Consume from the same queue and hand messages to the callback.
    consume_task = RabbitMQOperator(
        task_id="consume_message",
        rabbitmq_conn_id="rabbitmq_default",
        queue="example_queue",
        mode="consume",
        callback=process_message,
    )

    # Run the consumer only after the publisher has finished.
    publish_task >> consume_task

Key Features

  • publish mode: Sends messages to a RabbitMQ queue.
  • consume mode: Consumes messages from a RabbitMQ queue and processes them using a callback.
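A consume callback will often need to do more than print. Here is a minimal sketch of a callback that decodes a JSON payload. It assumes the callback receives the message body as `bytes` or `str`; this README does not state the type, so check it against the operator before use.

```python
import json


def process_message(message):
    """Decode a message body as JSON, falling back to the raw text."""
    # Assumption: the body arrives as bytes or str.
    if isinstance(message, (bytes, bytearray)):
        message = message.decode("utf-8")
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        # Not valid JSON: keep the original text so nothing is lost.
        payload = {"raw": message}
    print(f"Received message: {payload}")
    return payload


process_message(b'{"order_id": 42, "status": "shipped"}')
process_message("Hello, RabbitMQ!")
```

Falling back to the raw text keeps the task from failing on a single malformed message. Whether that is the right behavior depends on your workflow.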

Development

Prerequisites

  • Python 3.12 or later
  • Apache Airflow 2.3 or later
  • RabbitMQ server (local or remote)

Setting Up for Development

  1. Clone the repository:

    git clone https://github.com/mustafazidan/apache-airflow-provider-rabbitmq.git
    cd apache-airflow-provider-rabbitmq
    
  2. Install the library in editable mode:

    uv sync
    
  3. Install development dependencies:

    uv sync --extra development
    

Running Tests

This provider uses pytest for testing.

Run all tests:

pytest

Run unit tests only:

pytest tests/unit/

Run integration tests only:

pytest tests/integration/

Run tests with coverage:

pytest --cov=airflow.providers.rabbitmq

Run tests with coverage and generate HTML report:

pytest --cov=airflow.providers.rabbitmq --cov-report=html

Run a specific test file:

pytest tests/unit/hooks/test_rabbitmq_hook.py

Run a specific test:

pytest tests/unit/hooks/test_rabbitmq_hook.py::TestRabbitMQHook::test_init

For more information about testing, see the tests README.
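For orientation, a unit test in this style can be as small as the sketch below, which checks the `process_message` callback from the example DAG. It is illustrative only and is not taken from the project's test suite. It captures stdout with the standard library so that it also runs outside pytest.

```python
import io
from contextlib import redirect_stdout


def process_message(message):
    # Same callback as in the example DAG.
    print(f"Received message: {message}")


def test_process_message_prints_payload():
    # Capture everything the callback writes to stdout.
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        process_message("Hello, RabbitMQ!")
    assert buffer.getvalue() == "Received message: Hello, RabbitMQ!\n"
```

pytest collects any function whose name starts with `test_` in a file named `test_*.py`, so saving this under `tests/unit/` would include it in the unit test run.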

Linting and Formatting

This project uses pylint for linting.

Run the linter:

pylint src/ tests/

Contributing

We welcome contributions to the project! To contribute:

  1. Fork the repository.
  2. Create a feature branch:
    git checkout -b my-feature-branch
    
  3. Make changes and commit them:
    git commit -m "Add my new feature"
    
  4. Push the branch to your fork:
    git push origin my-feature-branch
    
  5. Open a pull request on the main repository.

License

This project is licensed under the Apache License 2.0.


Support

If you encounter any issues, please open an issue on GitHub.
