Airflow provider for RabbitMQ with sync/async messaging.
Project description
Apache Airflow Provider for RabbitMQ
Overview
The Apache Airflow Provider for RabbitMQ enables seamless integration with RabbitMQ, allowing you to build workflows that publish and consume messages from RabbitMQ queues. This provider includes custom hooks and operators to simplify interactions with RabbitMQ in your Airflow DAGs.
Features
- Publish messages to RabbitMQ queues.
- Consume messages from RabbitMQ queues.
- Full support for RabbitMQ connection management in Airflow.
Installation
To install the provider, use pip
:
pip install apache-airflow-provider-rabbitmq
Note: This provider requires Apache Airflow 2.0 or later.
Configuration
Add a RabbitMQ Connection in Airflow
- Navigate to Admin > Connections in the Airflow UI.
- Click on Create to add a new connection.
- Configure the following fields:
- Conn Id:
rabbitmq_default
(or a custom ID) - Conn Type:
RabbitMQ
- Host:
<RabbitMQ server hostname or IP>
- Login:
<RabbitMQ username>
- Password:
<RabbitMQ password>
- Port:
5672
(default RabbitMQ port)
- Conn Id:
You can now reference this connection in your DAGs using the connection ID.
Usage
Example DAG: Publish and Consume Messages
Here’s an example DAG demonstrating how to use the RabbitMQ operator:
from airflow import DAG
from datetime import datetime
from airflow.providers.rabbitmq.operators.rabbitmq import RabbitMQOperator
def process_message(message):
print(f"Received message: {message}")
with DAG(
dag_id="example_rabbitmq_dag",
default_args={"start_date": datetime(2023, 1, 1)},
schedule_interval=None,
catchup=False,
) as dag:
publish_task = RabbitMQOperator(
task_id="publish_message",
rabbitmq_conn_id="rabbitmq_default",
queue="example_queue",
message="Hello, RabbitMQ!",
mode="publish",
)
consume_task = RabbitMQOperator(
task_id="consume_message",
rabbitmq_conn_id="rabbitmq_default",
queue="example_queue",
mode="consume",
callback=process_message,
)
publish_task >> consume_task
Key Features
publish
mode: Sends messages to a RabbitMQ queue.consume
mode: Consumes messages from a RabbitMQ queue and processes them using a callback.
Development
Prerequisites
- Python 3.12 or later
- Apache Airflow 2.3 or later
- RabbitMQ server (local or remote)
Setting Up for Development
-
Clone the repository:
git clone https://github.com/mustafazidan/apache-airflow-provider-rabbitmq.git cd apache-airflow-provider-rabbitmq
-
Install the library in editable mode:
uv sync
-
Install development dependencies:
uv sync --extras development
Running Tests
This provider uses pytest
for testing.
Run all tests:
pytest
Run unit tests only:
pytest tests/unit/
Run integration tests only:
pytest tests/integration/
Run tests with coverage:
pytest --cov=airflow.providers.rabbitmq
Run tests with coverage and generate HTML report:
pytest --cov=airflow.providers.rabbitmq --cov-report=html
Run a specific test file:
pytest tests/unit/hooks/test_rabbitmq_hook.py
Run a specific test:
pytest tests/unit/hooks/test_rabbitmq_hook.py::TestRabbitMQHook::test_init
For more information about testing, see the tests README.
Linting and Formatting
This project uses pylint
for linting.
Run the linter:
pylint src/ tests/
Contributing
We welcome contributions to the project! To contribute:
- Fork the repository.
- Create a feature branch:
git checkout -b my-feature-branch
- Make changes and commit them:
git commit -m "Add my new feature"
- Push the branch to your fork:
git push origin my-feature-branch
- Open a pull request on the main repository.
License
This project is licensed under the Apache License 2.0.
Support
If you encounter any issues, please open an issue on GitHub.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file apache_airflow_provider_rabbitmq-0.0.1.tar.gz
.
File metadata
- Download URL: apache_airflow_provider_rabbitmq-0.0.1.tar.gz
- Upload date:
- Size: 103.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 |
b33823187590cd4fd74ea7c46141474fcfea0624f0d10ca201ef3d58b5b5d329
|
|
MD5 |
4eecb9b9cdd9463863eab3ba7bdd082a
|
|
BLAKE2b-256 |
901e1bdcf10cf176b418ea008db03162926c660e9a1e303ad4f9324d584b5347
|
File details
Details for the file apache_airflow_provider_rabbitmq-0.0.1-py3-none-any.whl
.
File metadata
- Download URL: apache_airflow_provider_rabbitmq-0.0.1-py3-none-any.whl
- Upload date:
- Size: 17.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.6.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 |
f1c15b61f7ee1d5e6cf81ec049d7adb46b0ede43ca1d206f23cc49558f902dc6
|
|
MD5 |
2f61c4dc482eb381221b6b7fbbfdb169
|
|
BLAKE2b-256 |
146b598f7ed7a5ecd6412198dbe8d6f4fe0f2f06603496c4db649593914a1a0b
|