Airflow provider for RabbitMQ with sync/async messaging.

Apache Airflow Provider for RabbitMQ

Overview

The Apache Airflow Provider for RabbitMQ lets Airflow workflows publish messages to and consume messages from RabbitMQ queues. It includes custom hooks and operators that handle the RabbitMQ interaction inside your Airflow DAGs.


Features

  • Publish messages to RabbitMQ queues.
  • Consume messages from RabbitMQ queues.
  • Manage RabbitMQ connections through Airflow's standard connection system.

Installation

To install the provider, use pip:

pip install apache-airflow-provider-rabbitmq

Note: This provider requires Apache Airflow 2.0 or later.
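To confirm that the package is visible to the interpreter Airflow runs under, a small check along these lines may help. It is a sketch that uses only the standard library; the helper name installed_version is hypothetical, and the distribution name is the one from the pip command above.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Distribution name as used in the pip install command.
found = installed_version("apache-airflow-provider-rabbitmq")
print(found if found is not None else "provider is not installed")
```

Run it with the same Python environment as your Airflow scheduler and workers, since a package installed in a different environment will not be found.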


Configuration

Add a RabbitMQ Connection in Airflow

  1. Navigate to Admin > Connections in the Airflow UI.
  2. Click on Create to add a new connection.
  3. Configure the following fields:
    • Conn Id: rabbitmq_default (or a custom ID)
    • Conn Type: RabbitMQ
    • Host: <RabbitMQ server hostname or IP>
    • Login: <RabbitMQ username>
    • Password: <RabbitMQ password>
    • Port: 5672 (default RabbitMQ port)

You can now reference this connection in your DAGs using the connection ID.
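Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID> that hold a connection URI, which is convenient for containerized or scripted deployments. The sketch below builds such a URI from the same fields as the form above. The URI scheme "rabbitmq" is an assumption (this README does not state the provider's internal connection type string), and the host and credentials are placeholders.

```python
from urllib.parse import quote


def build_conn_uri(host, login, password, port=5672, scheme="rabbitmq"):
    """Build an Airflow connection URI, percent-encoding the credentials.

    The default scheme is an assumption; adjust it to the connection type
    the provider actually registers.
    """
    user = quote(login, safe="")
    secret = quote(password, safe="")
    return f"{scheme}://{user}:{secret}@{host}:{port}"


# Placeholder values for illustration only.
uri = build_conn_uri("rabbitmq.example.com", "guest", "p@ss/word")

# Export this in the environment of the Airflow scheduler and workers.
print(f"export AIRFLOW_CONN_RABBITMQ_DEFAULT='{uri}'")
```

Percent-encoding matters when the password contains characters such as @ or /, which would otherwise be misread as URI delimiters.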


Usage

Example DAG: Publish and Consume Messages

Here’s an example DAG demonstrating how to use the RabbitMQ operator:

from datetime import datetime

from airflow import DAG
from airflow.providers.rabbitmq.operators.rabbitmq import RabbitMQOperator


def process_message(message):
    # Callback used by the consume task to process a message.
    print(f"Received message: {message}")


with DAG(
    dag_id="example_rabbitmq_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # no schedule: run only when triggered
    catchup=False,
) as dag:

    # Publish one message to example_queue.
    publish_task = RabbitMQOperator(
        task_id="publish_message",
        rabbitmq_conn_id="rabbitmq_default",
        queue="example_queue",
        message="Hello, RabbitMQ!",
        mode="publish",
    )

    # Consume from the same queue and hand the message to the callback.
    consume_task = RabbitMQOperator(
        task_id="consume_message",
        rabbitmq_conn_id="rabbitmq_default",
        queue="example_queue",
        mode="consume",
        callback=process_message,
    )

    # Publish first, then consume.
    publish_task >> consume_task

Operator Modes

  • publish mode: Sends messages to a RabbitMQ queue.
  • consume mode: Consumes messages from a RabbitMQ queue and processes them using a callback.
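In practice a consume callback usually has to decode a structured payload. This README does not say whether the callback receives bytes or str, so the sketch below assumes it could be either and also assumes the producer sends JSON, falling back to the raw text when it does not.

```python
import json


def process_message(message):
    """Decode a message body and return the parsed payload."""
    # Assumption: the body may arrive as bytes or as str.
    if isinstance(message, (bytes, bytearray)):
        message = message.decode("utf-8")
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        # Not JSON: keep the raw text so nothing is lost.
        payload = {"raw": message}
    print(f"Received message: {payload}")
    return payload


# Round trip: serialize on the publishing side, decode in the callback.
body = json.dumps({"order_id": 42, "status": "paid"})
decoded = process_message(body.encode("utf-8"))
```

On the publishing side, passing json.dumps(...) output as the message value keeps both ends of the queue on the same format.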

Development

Prerequisites

  • Python 3.12 or later
  • Apache Airflow 2.3 or later
  • RabbitMQ server (local or remote)

Setting Up for Development

  1. Clone the repository:

    git clone https://github.com/mustafazidan/apache-airflow-provider-rabbitmq.git
    cd apache-airflow-provider-rabbitmq
    
  2. Install the library in editable mode:

    uv sync
    
  3. Install development dependencies:

    uv sync --extra development
    

Running Tests

This provider uses pytest for testing.

Run all tests:

pytest

Run unit tests only:

pytest tests/unit/

Run integration tests only:

pytest tests/integration/

Run tests with coverage:

pytest --cov=airflow.providers.rabbitmq

Run tests with coverage and generate HTML report:

pytest --cov=airflow.providers.rabbitmq --cov-report=html

Run a specific test file:

pytest tests/unit/hooks/test_rabbitmq_hook.py

Run a specific test:

pytest tests/unit/hooks/test_rabbitmq_hook.py::TestRabbitMQHook::test_init

For more information about testing, see the tests README.

Linting and Formatting

This project uses pylint for linting.

Run the linter:

pylint src/ tests/

Contributing

We welcome contributions to the project! To contribute:

  1. Fork the repository.
  2. Create a feature branch:
    git checkout -b my-feature-branch
    
  3. Make changes and commit them:
    git commit -m "Add my new feature"
    
  4. Push the branch to your fork:
    git push origin my-feature-branch
    
  5. Open a pull request on the main repository.

License

This project is licensed under the Apache License 2.0.


Support

If you encounter any issues, please open an issue on GitHub.
