Airflow provider for RabbitMQ with sync/async messaging.
Apache Airflow Provider for RabbitMQ
Overview
The Apache Airflow Provider for RabbitMQ enables seamless integration with RabbitMQ, allowing you to build workflows that publish and consume messages from RabbitMQ queues. This provider includes custom hooks and operators to simplify interactions with RabbitMQ in your Airflow DAGs.
Features
- Publish messages to RabbitMQ exchanges/queues.
- Wait for messages in a queue using an Airflow Sensor.
- RabbitMQ connection management via Airflow Connections (URI or host/login/password/port/schema).
Installation
To install the provider, use pip:
pip install apache-airflow-provider-rabbitmq
Note: Supports Python 3.10+ and Apache Airflow 2.8.0+ (including 3.x).
Configuration
Add a RabbitMQ Connection in Airflow
- Navigate to Admin > Connections in the Airflow UI.
- Click on Create to add a new connection.
- Configure the following fields:
  - Conn Id: rabbitmq_default (or a custom ID)
  - Conn Type: RabbitMQ (conn type key: rabbitmq)
  - Host: <RabbitMQ server hostname or IP>
  - Login: <RabbitMQ username>
  - Password: <RabbitMQ password>
  - Port: 5672 (default RabbitMQ port)
  - Schema: <vhost> (optional; maps to RabbitMQ virtual host)
  - Extras (JSON): Optionally provide { "connection_uri": "amqp://user:pass@host:5672/vhost" } to override the URI.
You can now reference this connection in your DAGs using the connection ID.
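The individual connection fields correspond to the parts of a single AMQP URI of the form shown in the Extras example. As a minimal sketch of that mapping, the snippet below assembles the URI and the matching Extras JSON. The helper name `build_amqp_uri` is hypothetical and not part of the provider. Percent-encoding the credentials and the vhost is an assumption based on general AMQP URI conventions, so that characters such as `/` or `@` stay unambiguous.

```python
import json
from urllib.parse import quote


def build_amqp_uri(host, login, password, port=5672, vhost=""):
    """Assemble an amqp:// URI from the individual connection fields.

    Hypothetical helper for illustration; not part of the provider.
    """
    # safe="" makes quote() encode "/" as well, so a vhost named "/"
    # becomes "%2F" instead of being read as a path separator.
    user = quote(login, safe="")
    secret = quote(password, safe="")
    path = quote(vhost, safe="")
    return f"amqp://{user}:{secret}@{host}:{port}/{path}"


uri = build_amqp_uri("host", "user", "pass", vhost="vhost")
extras = json.dumps({"connection_uri": uri})  # value for the Extras field
print(uri)
print(extras)
```

The printed JSON can be pasted into the Extras field; whether the provider itself expects encoded credentials should be checked against its hook implementation.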
Usage
Example: Publish a message (Operator)
from datetime import datetime

from airflow import DAG
from airflow.providers.rabbitmq.operators.rabbitmq_producer import RabbitMQProducerOperator

with DAG(
    dag_id="example_rabbitmq_producer",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    publish_message = RabbitMQProducerOperator(
        task_id="publish_message",
        message="Hello, RabbitMQ!",
        exchange="amq.direct",
        routing_key="example",
        conn_id="rabbitmq_default",
        # use_async=True,
    )
Example: Wait for a message (Sensor)
from datetime import datetime

from airflow import DAG
from airflow.providers.rabbitmq.sensors.rabbitmq_sensor import RabbitMQSensor

with DAG(
    dag_id="example_rabbitmq_sensor",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    wait_for_message = RabbitMQSensor(
        task_id="wait_for_message",
        queue="example_queue",
        conn_id="rabbitmq_default",
        poke_interval=30,
        timeout=10 * 60,
    )
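The `poke_interval` and `timeout` arguments in the sensor example can be read as a polling loop: check the queue, sleep for `poke_interval` seconds, and give up once `timeout` seconds have passed. The sketch below is a simplified model of that behaviour, not Airflow's or the provider's actual implementation. The function `poll_until` and the simulated clock are hypothetical names introduced only for illustration.

```python
import time


def poll_until(check, poke_interval, timeout, clock=time.monotonic, sleep=time.sleep):
    """Call check() every poke_interval seconds until it returns True.

    Simplified model of a poke-style sensor. Raises TimeoutError once
    `timeout` seconds have elapsed without a successful check.
    """
    started = clock()
    while True:
        if check():
            return True
        if clock() - started >= timeout:
            raise TimeoutError(f"no message after {timeout} seconds")
        sleep(poke_interval)


# Simulated clock so the example finishes instantly instead of waiting.
now = [0.0]
attempts = []


def fake_sleep(seconds):
    now[0] += seconds


def message_arrived():
    attempts.append(now[0])
    return now[0] >= 90  # pretend a message shows up after 90 seconds


poll_until(message_arrived, poke_interval=30, timeout=10 * 60,
           clock=lambda: now[0], sleep=fake_sleep)
print(attempts)  # → [0.0, 30.0, 60.0, 90.0]
```

With the values from the example, a queue that stays empty would be checked roughly every 30 seconds for 10 minutes before the task fails.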
Development
Prerequisites
- Python 3.10 or later
- Apache Airflow 2.8.0 or later
- Docker (required for integration tests)
- RabbitMQ server (optional, integration tests use Docker)
Setting Up for Development
- Clone the repository:
  git clone https://github.com/mustafa-zidan/apache-airflow-providers-rabbitmq.git
  cd apache-airflow-providers-rabbitmq
- Install the library in editable mode:
  uv sync
- Install development dependencies:
  uv sync --extra development
Running Tests
This provider uses pytest for testing. We recommend using uv to manage your environment and run tests.
Run all tests:
uv run pytest
Run unit tests only:
uv run pytest tests/unit/
Run integration tests only (requires Docker):
uv run pytest tests/integration/
Multi-version Testing with Tox
To test across multiple Python and Airflow versions locally, you can use tox directly (it is included in the development dependencies).
Python version management
Tox needs the target Python interpreters to be available on your PATH. Use pyenv (macOS/Linux) or pyenv-win (Windows) to install and manage multiple Python versions:
# Install pyenv (macOS via Homebrew)
brew install pyenv
# Install the required Python versions
pyenv install 3.10 3.11 3.12
# Make all versions available in the project directory
pyenv local 3.10 3.11 3.12
Running tox
# Run unit tests for all supported Python × Airflow combinations
uv run tox
# Run unit tests for a specific environment (Python 3.12 + Airflow 2.10)
uv run tox -e py312-airflow210
# Run integration tests for Airflow 2.10 (requires Docker)
uv run tox -e py312-airflow210-integration
# Run integration tests for Airflow 3.1 (requires Docker)
uv run tox -e py312-airflow31-integration
Unit test environments: py{310,311,312}-airflow{28,29,210,30,31}
Integration test environments (require Docker): py312-airflow{210,31}-integration
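The brace patterns above are shorthand for a cross product of Python and Airflow versions. As a small sketch, assuming tox's usual generative brace expansion, the following enumerates the concrete environment names that can be passed to `tox -e`:

```python
from itertools import product

# Version factors taken from the environment patterns listed above.
PYTHONS = ["310", "311", "312"]
AIRFLOWS = ["28", "29", "210", "30", "31"]

# Every Python factor is combined with every Airflow factor.
unit_envs = [f"py{py}-airflow{af}" for py, af in product(PYTHONS, AIRFLOWS)]
integration_envs = [f"py312-airflow{af}-integration" for af in ["210", "31"]]

print(len(unit_envs))  # → 15
print(unit_envs[0], unit_envs[-1])  # → py310-airflow28 py312-airflow31
print(integration_envs)
```

Running `uv run tox -l` should list the environments that the project's tox configuration actually defines.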
Linting, Typing and Formatting
We use several tools to maintain code quality:
# Run all checks
uv run black .
uv run isort .
uv run flake8 src/ tests/
uv run mypy src/ tests/
uv run pylint src/ tests/
Contributing
We welcome contributions to the project! To contribute:
- Fork the repository.
- Create a feature branch:
git checkout -b my-feature-branch
- Make changes and commit them:
git commit -m "Add my new feature"
- Push the branch to your fork:
git push origin my-feature-branch
- Open a pull request on the main repository.
License
This project is licensed under the Apache License 2.0.
Support
If you encounter any issues, please open an issue on GitHub.