No project description provided
Project description
Weni EDA
weni-eda is a Python library designed to simplify the use of Event-Driven Architecture (EDA). It provides an interface that seamlessly integrates with the Django framework and RabbitMQ messaging service. The design is scalable and intended to support various integrations in the future.
Features
- Easy integration with Django.
- Support for RabbitMQ.
- Simplified event handling and message dispatching.
- Scalable design to accommodate future integrations with other frameworks and messaging services.
- Two backends available:
eda(using amqp library) andpika_eda(using pika library with SSL support). - Automatic message acknowledgment: Messages are automatically acknowledged when processing completes successfully.
- Built-in JSON parsing: Access parsed message body via
self.bodyproperty with automatic caching. - Structured logging: Built-in logger per consumer with contextual information.
Installation
To install the library, use pip:
pip install weni-eda
Configuration
Django Integration
-
Add weni-eda to your Django project:
Addweni.eda.django.eda_appto yourINSTALLED_APPSinsettings.py:# settings.py INSTALLED_APPS = [ # ... other installed apps 'weni.eda.django.eda_app', ]
-
Environment Variables for weni-eda Configuration
The following environment variables are used to configure the weni-eda library. Here is a detailed explanation of each variable:
Variable Name Examples Description EDA_CONSUMERS_HANDLE"example.event_driven.handle.handle_consumers"Specifies the handler module for consumer events. EDA_BROKER_HOST"localhost"The hostname or IP address of the message broker server. EDA_VIRTUAL_HOST"/"The virtual host to use when connecting to the broker. EDA_BROKER_PORT5672The port number on which the message broker is listening. EDA_BROKER_USER"guest"The username for authenticating with the message broker. EDA_BROKER_PASSWORD"guest"The password for authenticating with the message broker. -
Creating your event consumers
We provide an abstract class that facilitates the consumption of messages. To use it, you need to inherit it and declare theconsumemethod as follows:from weni.eda.django.consumers import EDAConsumer import amqp class ExampleConsumer(EDAConsumer): def consume(self, message: amqp.Message): # Access parsed body directly via self.body (automatically parsed and cached) data = self.body # Process your message user_id = data.get("user_id") action = data.get("action") # Log using the built-in logger self.logger.info(f"Processing action {action} for user {user_id}") # Message is automatically acknowledged if consume() completes without exception # You can also manually call self.ack() if needed
Key Features:
self.body: Automatically parses and caches the JSON message body as adict. No need to manually parse!self.logger: Built-in logger instance (eda.{ConsumerName}) for structured logging.self.message_body: Access the raw message body as bytes.self.delivery_tag: Access the message delivery tag.- Auto-ack: Messages are automatically acknowledged when
consume()completes successfully. If an exception is raised, the message is automatically rejected. self.ack(): Manually acknowledge the message (usually not needed due to auto-ack).
-
Registering your event handlers:
theEDA_CONSUMERS_HANDLEvariable indicates the function that will be called when the consumer starts. this function will be responsible for mapping the messages to their respective consumers. The function must be declared as follows:import amqp from .example_consumer import ExampleConsumer def handle_consumers(channel: amqp.Channel): channel.basic_consume("example-queue", callback=ExampleConsumer().handle)
This indicates that any message arriving at the
example-queuequeue will be dispatched to theExampleConsumerconsumer and will fall into itsconsumemethod. -
Starting to consume the queues
To start consuming messages from the queue, you need to run theedaconsumecommand as follows:python manage.py edaconsume
From then on, all messages that arrive in the queues where your application is written will be dispatched to their respective consumers.
Pika EDA (SSL Support)
pika_eda is an alternative backend that uses the pika library instead of amqp. It provides the same functionality as eda but with additional features:
- SSL/TLS support: Secure connections with automatic SSL certificate verification
- Multiple handles: Support for multiple consumer handlers in a single application
- Enhanced security: Proper hostname verification to prevent MITM attacks
Configuration
-
Add pika_eda to your Django project:
Addweni.pika_eda.django.pika_eda_appto yourINSTALLED_APPSinsettings.py:# settings.py INSTALLED_APPS = [ # ... other installed apps 'weni.pika_eda.django.pika_eda_app', ]
-
Environment Variables for pika_eda Configuration
The following environment variables are used to configure the pika_eda library:
Variable Name Examples Description PIKA_EDA_CONSUMERS_HANDLE"example.event_driven.handle.handle_consumers"or["handle1", "handle2"]Specifies the handler module(s) for consumer events. Can be a string or a list of strings for multiple handles. PIKA_EDA_BROKER_HOST"localhost"The hostname or IP address of the message broker server. PIKA_EDA_VIRTUAL_HOST"/"The virtual host to use when connecting to the broker. PIKA_EDA_BROKER_PORT5671(SSL) or5672(non-SSL)The port number on which the message broker is listening. PIKA_EDA_BROKER_USER"guest"The username for authenticating with the message broker. PIKA_EDA_BROKER_PASSWORD"guest"The password for authenticating with the message broker. PIKA_EDA_SSL_ENABLEDTrueEnable SSL/TLS connections (optional). PIKA_EDA_SSL_CERT_PATH"/path/to/cert.pem"Path to client certificate file (optional, for mutual auth). PIKA_EDA_SSL_KEY_PATH"/path/to/key.pem"Path to client private key file (optional, for mutual auth). PIKA_EDA_SSL_CA_CERTS"/path/to/ca.pem"Path to CA certificate file (optional). PIKA_EDA_SSL_SERVER_HOSTNAME"rabbitmq.example.com"Server hostname for SSL verification (optional, defaults to PIKA_EDA_BROKER_HOST). -
Creating your event consumers
We provide an abstract class that facilitates the consumption of messages. To use it, you need to inherit it and declare theconsumemethod as follows:from weni.pika_eda.django.consumers import PikaEDAConsumer import pika class ExampleConsumer(PikaEDAConsumer): def consume(self, channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes): # Access parsed body directly via self.body (automatically parsed and cached) data = self.body # Process your message user_id = data.get("user_id") action = data.get("action") # Log using the built-in logger self.logger.info(f"Processing action {action} for user {user_id}") # Message is automatically acknowledged if consume() completes without exception # You can also manually call self.ack() if needed
Key Features:
self.body: Automatically parses and caches the JSON message body as adict. No need to manually parse!self.logger: Built-in logger instance (pika_eda.{ConsumerName}) for structured logging.self.message_body: Access the raw message body as bytes.self.delivery_tag: Access the message delivery tag.- Auto-ack: Messages are automatically acknowledged when
consume()completes successfully. If an exception is raised, the message is automatically rejected. self.ack(): Manually acknowledge the message (usually not needed due to auto-ack).
-
Registering your event handlers:
ThePIKA_EDA_CONSUMERS_HANDLEvariable indicates the function(s) that will be called when the consumer starts. This function will be responsible for mapping the messages to their respective consumers. The function must be declared as follows:import pika from .example_consumer import ExampleConsumer def handle_consumers(channel: pika.channel.Channel): channel.basic_consume("example-queue", on_message_callback=ExampleConsumer().handle)
This indicates that any message arriving at the
example-queuequeue will be dispatched to theExampleConsumerconsumer and will fall into itsconsumemethod.Multiple Handles: If you need multiple consumer handlers, you can set
PIKA_EDA_CONSUMERS_HANDLEas a list:# settings.py PIKA_EDA_CONSUMERS_HANDLE = [ "app1.handlers.handle_consumers", "app2.handlers.handle_consumers", ]
Then specify which handle to use when running the command:
python manage.py pikaconsume --handle-index 0 # Uses first handle python manage.py pikaconsume --handle-index 1 # Uses second handle
-
Starting to consume the queues
To start consuming messages from the queue, you need to run thepikaconsumecommand as follows:python manage.py pikaconsume
If using multiple handles, specify the handle index:
python manage.py pikaconsume --handle-index 0
From then on, all messages that arrive in the queues where your application is written will be dispatched to their respective consumers.
SSL Configuration Example
For secure connections with SSL:
# settings.py
PIKA_EDA_BROKER_HOST = "rabbitmq.example.com"
PIKA_EDA_BROKER_PORT = 5671 # SSL port
PIKA_EDA_BROKER_USER = "myuser"
PIKA_EDA_BROKER_PASSWORD = "mypassword"
PIKA_EDA_SSL_ENABLED = True
PIKA_EDA_SSL_CA_CERTS = "/path/to/ca_certificate.pem"
PIKA_EDA_SSL_SERVER_HOSTNAME = "rabbitmq.example.com" # Optional, defaults to PIKA_EDA_BROKER_HOST
Security Note: The library automatically uses the broker hostname for SSL verification if PIKA_EDA_SSL_SERVER_HOSTNAME is not explicitly set, ensuring proper certificate verification and preventing MITM attacks.
Advanced Features
Automatic Message Acknowledgment
By default, messages are automatically acknowledged when the consume() method completes successfully. If an exception is raised during processing, the message is automatically rejected (not requeued).
Example:
class MyConsumer(EDAConsumer):
def consume(self, message: amqp.Message):
data = self.body
# Process message...
# If this completes without exception, message is auto-acknowledged
# If an exception is raised, message is auto-rejected
To disable auto-ack for a specific consumer, you can set _auto_ack_enabled = False in your consumer class:
class ManualAckConsumer(EDAConsumer):
_auto_ack_enabled = False
def consume(self, message: amqp.Message):
# Process message...
# Manually control when to ack
if some_condition:
self.ack()
Structured Logging
Each consumer has a built-in logger that provides structured logging with contextual information:
class MyConsumer(EDAConsumer):
def consume(self, message: amqp.Message):
# Use different log levels
self.logger.debug("Debug information")
self.logger.info("Processing message")
self.logger.warning("Warning message")
self.logger.error("Error occurred")
# Logger name: eda.MyConsumer (or pika_eda.MyConsumer for PikaEDA)
When an exception occurs, the library automatically logs it with full context:
- Consumer name
- Error message
- Message body (decoded)
- Full stack trace
Message Body Parsing
The self.body property automatically parses JSON messages and caches the result:
class MyConsumer(EDAConsumer):
def consume(self, message: amqp.Message):
# First access parses and caches
data = self.body # Returns dict
# Subsequent accesses use cached value (no re-parsing)
user_id = self.body.get("user_id")
action = self.body.get("action")
# Access raw body if needed
raw_body = self.message_body # Returns bytes
Error Handling
The library handles errors automatically:
class MyConsumer(EDAConsumer):
def consume(self, message: amqp.Message):
# If an exception is raised here, the message is automatically:
# 1. Rejected (not requeued)
# 2. Logged with full context
# 3. Exception is re-raised (you can catch it if needed)
data = self.body
if not data.get("required_field"):
raise ValueError("Missing required field") # Auto-rejected and logged
Consumer Properties
Available properties on both EDAConsumer and PikaEDAConsumer:
self.body: Parsed message body asdict(cached)self.message_body: Raw message body asbytesself.delivery_tag: Message delivery tagself.logger: Logger instance for structured loggingself.ack(): Manually acknowledge the message
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file weni_eda-0.1.2a1.tar.gz.
File metadata
- Download URL: weni_eda-0.1.2a1.tar.gz
- Upload date:
- Size: 13.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.14.0 Darwin/25.1.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f39dc33a6d1eeba18ecf6684525d74856b87ce822ba5be3eac0a368430c9a44e
|
|
| MD5 |
de858300366377d51035b1cd6a837dd1
|
|
| BLAKE2b-256 |
6ebcb917f1f25eda7f13b11dbe464c15db20497f3cfafa1983895beccee5183f
|
File details
Details for the file weni_eda-0.1.2a1-py3-none-any.whl.
File metadata
- Download URL: weni_eda-0.1.2a1-py3-none-any.whl
- Upload date:
- Size: 22.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.14.0 Darwin/25.1.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9dc293dce8d49c431ddeed895c4bf21a1c61bf9d2a7f56fc42ccc0476825a734
|
|
| MD5 |
5db246475f64e7d8db286cf5ee2b8fca
|
|
| BLAKE2b-256 |
afa977ab2a46d8d71e223f48ba76862f5e8fa8317baa2150a5f4a9f539668177
|