Airflow provider for RabbitMQ with sync/async messaging.

Apache Airflow Provider for RabbitMQ

Overview

The Apache Airflow Provider for RabbitMQ enables seamless integration with RabbitMQ, allowing you to build workflows that publish and consume messages from RabbitMQ queues. This provider includes custom hooks and operators to simplify interactions with RabbitMQ in your Airflow DAGs.


Features

  • Publish messages to RabbitMQ exchanges/queues.
  • Wait for messages in a queue using an Airflow Sensor.
  • RabbitMQ connection management via Airflow Connections (URI or host/login/password/port/schema).

Installation

To install the provider, use pip:

pip install apache-airflow-provider-rabbitmq

Note: Supports Python 3.10+ and Apache Airflow 2.8.0+ (including 3.x).


Configuration

Add a RabbitMQ Connection in Airflow

  1. Navigate to Admin > Connections in the Airflow UI.
  2. Click Create to add a new connection.
  3. Configure the following fields:
    • Conn Id: rabbitmq_default (or a custom ID)
    • Conn Type: RabbitMQ (conn type key: rabbitmq)
    • Host: <RabbitMQ server hostname or IP>
    • Login: <RabbitMQ username>
    • Password: <RabbitMQ password>
    • Port: 5672 (default RabbitMQ port)
    • Schema: <vhost> (optional; maps to RabbitMQ virtual host)
    • Extras (JSON): Optionally provide { "connection_uri": "amqp://user:pass@host:5672/vhost" } to override the URI.

You can now reference this connection in your DAGs using the connection ID.
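The fields above map onto a standard AMQP connection URI, like the one shown in the Extras example. As a quick sanity check you can assemble and parse such a URI with the standard library alone (no Airflow required) — the helper below is illustrative, not part of the provider:

```python
from urllib.parse import quote, urlsplit

def build_amqp_uri(host: str, login: str, password: str,
                   port: int = 5672, vhost: str = "/") -> str:
    """Assemble an AMQP URI from the Airflow connection fields.

    Illustrative only -- the provider derives its URI internally.
    Credentials and the vhost are percent-encoded so special characters
    (e.g. '@' in a password) survive the round trip.
    """
    return (
        f"amqp://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )

uri = build_amqp_uri("rabbit.example.com", "user", "p@ss", vhost="prod")
parts = urlsplit(uri)
print(parts.hostname, parts.port, parts.path)  # rabbit.example.com 5672 /prod
```

Equivalently, Airflow can pick the connection up from an environment variable such as AIRFLOW_CONN_RABBITMQ_DEFAULT set to this URI, which is convenient for containerized deployments.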


Usage

Example: Publish a message (Operator)

from airflow import DAG
from datetime import datetime
from airflow.providers.rabbitmq.operators.rabbitmq_producer import RabbitMQProducerOperator

with DAG(
    dag_id="example_rabbitmq_producer",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    publish_message = RabbitMQProducerOperator(
        task_id="publish_message",
        message="Hello, RabbitMQ!",
        exchange="amq.direct",
        routing_key="example",
        conn_id="rabbitmq_default",
        # use_async=True,
    )

Example: Wait for a message (Sensor)

from airflow import DAG
from datetime import datetime
from airflow.providers.rabbitmq.sensors.rabbitmq_sensor import RabbitMQSensor

with DAG(
    dag_id="example_rabbitmq_sensor",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    wait_for_message = RabbitMQSensor(
        task_id="wait_for_message",
        queue="example_queue",
        conn_id="rabbitmq_default",
        poke_interval=30,
        timeout=10 * 60,
    )

Development

Prerequisites

  • Python 3.10 or later
  • Apache Airflow 2.8.0 or later
  • Docker (required for integration tests)
  • RabbitMQ server (optional; the integration tests run RabbitMQ in Docker)

Setting Up for Development

  1. Clone the repository:

    git clone https://github.com/mustafa-zidan/apache-airflow-providers-rabbitmq.git
    cd apache-airflow-providers-rabbitmq
    
  2. Install the library in editable mode:

    uv sync
    
  3. Install development dependencies:

    uv sync --extra development
    

Running Tests

This provider uses pytest for testing. We recommend using uv to manage your environment and run tests.

Run all tests:

uv run pytest

Run unit tests only:

uv run pytest tests/unit/

Run integration tests only (requires Docker):

uv run pytest tests/integration/
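Unit tests in this style typically avoid a live broker by mocking the channel. Below is a minimal, self-contained sketch of that pattern using unittest.mock from the standard library — the basic_publish signature mirrors pika's, but the publish function and test are stand-ins, not the provider's actual test suite:

```python
from unittest.mock import MagicMock

def publish(channel, exchange: str, routing_key: str, body: str) -> None:
    """Stand-in for publishing logic: delegates to a pika-style channel."""
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)

def test_publish_uses_channel():
    channel = MagicMock()  # no broker needed: the channel is a mock
    publish(channel, exchange="amq.direct", routing_key="example", body="hi")
    channel.basic_publish.assert_called_once_with(
        exchange="amq.direct", routing_key="example", body="hi"
    )

test_publish_uses_channel()
print("ok")
```

Integration tests, by contrast, exercise a real broker, which is why they require Docker.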

Multi-version Testing with Tox

To test across multiple Python and Airflow versions locally, you can use tox directly (it is included in the development dependencies).

Python version management

Tox needs the target Python interpreters to be available on your PATH. Use pyenv (macOS/Linux) or pyenv-win (Windows) to install and manage multiple Python versions:

# Install pyenv (macOS via Homebrew)
brew install pyenv

# Install the required Python versions
pyenv install 3.10 3.11 3.12

# Make all versions available in the project directory
pyenv local 3.10 3.11 3.12

Running tox

# Run unit tests for all supported Python × Airflow combinations
uv run tox

# Run unit tests for a specific environment (Python 3.12 + Airflow 2.10)
uv run tox -e py312-airflow210

# Run integration tests for Airflow 2.10 (requires Docker)
uv run tox -e py312-airflow210-integration

# Run integration tests for Airflow 3.1 (requires Docker)
uv run tox -e py312-airflow31-integration

Unit test environments: py{310,311,312}-airflow{28,29,210,30,31}

Integration test environments (require Docker): py312-airflow{210,31}-integration

Linting, Typing and Formatting

We use several tools to maintain code quality:

# Run all checks
uv run black .
uv run isort .
uv run flake8 src/ tests/
uv run mypy src/ tests/
uv run pylint src/ tests/

Contributing

We welcome contributions to the project! To contribute:

  1. Fork the repository.
  2. Create a feature branch:
    git checkout -b my-feature-branch
    
  3. Make changes and commit them:
    git commit -m "Add my new feature"
    
  4. Push the branch to your fork:
    git push origin my-feature-branch
    
  5. Open a pull request on the main repository.

License

This project is licensed under the Apache License 2.0.


Support

If you encounter any issues, please open an issue on GitHub.
