A queue framework
Romeways
The goal of this project is to save developers from reimplementing default queue consumer behaviour.
Basics
Romeways works with two basic concepts: the queue handler and the queue connector. The queue connector is a queue consumer and can be spawned in a separate process or in an async worker. The queue handler is the callback function that is called for each retrieved message.
The implemented consumers are:
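To make the two roles concrete, here is a minimal standard-library sketch. It is not Romeways' implementation: `HANDLERS`, `register_handler`, and `drain` are hypothetical names used only for this illustration, and a plain `queue.Queue` stands in for a real broker. The handler is a coroutine registered for a queue name; the connector side pulls messages off the queue and awaits the handler once per message.

```python
import asyncio
import queue
from typing import Awaitable, Callable

# Hypothetical registry for this sketch: queue name -> handler coroutine.
HANDLERS: dict[str, Callable[[str], Awaitable[None]]] = {}


def register_handler(queue_name: str):
    """Decorator that stores the handler for a queue name (illustrative only)."""
    def decorator(func):
        HANDLERS[queue_name] = func
        return func
    return decorator


async def drain(queue_name: str, source: queue.Queue) -> int:
    """Connector role: pull every waiting message and await the handler for each."""
    handled = 0
    while True:
        try:
            message = source.get_nowait()
        except queue.Empty:
            return handled
        await HANDLERS[queue_name](message)
        handled += 1


received: list[str] = []


@register_handler("queue.payment.done")
async def on_payment_done(message: str) -> None:
    # Handler role: the callback invoked once per retrieved message.
    received.append(message)


source: queue.Queue = queue.Queue()
source.put("order-1")
source.put("order-2")
handled_count = asyncio.run(drain("queue.payment.done", source))
print(handled_count, received)
```

Keeping the handler free of any queue access is the point of the split: the same callback can be attached to a different connector without changes.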
| Queue Type | Install using extra | Description |
|---|---|---|
| multiprocessing.Queue | memory | here |
| Apache Kafka | kafka | here |
How to install extra packages?
poetry add romeways -E memory
OR
pip install 'romeways[memory]'
Configuration
Queue connector config
The queue connector config holds all the settings needed to retrieve messages from the queue.
Below is the romeways.GenericConnectorConfig implementation. This class can be subclassed to allow extra configuration.
Params:
- connector_name: str. Name of the connector; queue configs use the same name to select the connector that delivers them.
from dataclasses import dataclass

@dataclass(slots=True, frozen=True)
class GenericConnectorConfig:
    """
    connector_name: str Connector name
    """
    connector_name: str
Queue handler config
When you register a queue consumer, you set its configuration and a callback handler that is called for each message the queue receives.
Below is the romeways.GenericQueueConfig implementation. This class can be subclassed to allow extra configuration.
Params:
- connector_name: str. The connector through which this queue must be delivered.
- frequency: float. Time in seconds between message retrievals from the queue.
- max_chunk_size: int. Maximum number of messages fetched in one retrieval.
- sequential: bool. Whether the handler calls run sequentially or together in asyncio.gather.
from dataclasses import dataclass

@dataclass(slots=True, frozen=True)
class GenericQueueConfig:
    """
    connector_name: str For what connector this queue must be delivered
    frequency: float Time in seconds between message retrievals from the queue
    max_chunk_size: int Max number of messages that one retrieval will get
    sequential: bool If the handler calls must be sequential or run in asyncio.gather
    """
    connector_name: str
    frequency: float
    max_chunk_size: int
    sequential: bool
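The interplay of `frequency`, `max_chunk_size`, and `sequential` can be sketched with the standard library alone. This is an assumption about how such a polling loop could look, not Romeways' actual code: `fetch_chunk`, `dispatch`, `poll`, and `run_demo` are hypothetical helpers, and `queue.Queue` stands in for a real broker.

```python
import asyncio
import queue
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


def fetch_chunk(source: queue.Queue, max_chunk_size: int) -> list[str]:
    """Take at most max_chunk_size waiting messages without blocking."""
    chunk: list[str] = []
    while len(chunk) < max_chunk_size:
        try:
            chunk.append(source.get_nowait())
        except queue.Empty:
            break
    return chunk


async def dispatch(chunk: list[str], handler: Handler, sequential: bool) -> None:
    """Await the handler one message at a time, or concurrently with asyncio.gather."""
    if sequential:
        for message in chunk:
            await handler(message)
    else:
        await asyncio.gather(*(handler(message) for message in chunk))


async def poll(source, handler, frequency, max_chunk_size, sequential, rounds):
    """Run a fixed number of polling rounds, sleeping `frequency` seconds between them."""
    for _ in range(rounds):
        await dispatch(fetch_chunk(source, max_chunk_size), handler, sequential)
        await asyncio.sleep(frequency)


def run_demo(sequential: bool) -> list[str]:
    """Return the order in which three messages finish processing."""
    finished: list[str] = []

    async def handler(message: str) -> None:
        # "slow" takes longer, so under gather a later message can finish first.
        await asyncio.sleep(0.05 if message == "slow" else 0)
        finished.append(message)

    source: queue.Queue = queue.Queue()
    for message in ("slow", "fast", "late"):
        source.put(message)
    asyncio.run(poll(source, handler, frequency=0.01, max_chunk_size=2,
                     sequential=sequential, rounds=2))
    return finished
```

With `max_chunk_size=2` the third message waits for the second polling round. Within a chunk, `sequential=True` preserves arrival order, while `sequential=False` lets a quick message complete before a slow one that arrived earlier.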
Resend on error
Romeways allows you to resend a message to the queue if something in your handler does not work correctly. To do so, your code needs to raise romeways.ResendException. The message is then resent to the same queue and its romeways.Message.rw_resend_times value is increased.
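The resend behaviour described above can be simulated with local stand-ins. In this sketch `ResendException` and `Message` are defined locally rather than imported from Romeways, the `payload` field and the `consume` loop are hypothetical, and the increment of one per resend is an assumption. Only the names `ResendException` and `rw_resend_times` come from the library's description.

```python
import asyncio
import queue
from dataclasses import dataclass, replace


class ResendException(Exception):
    """Stand-in for romeways.ResendException, defined locally for this sketch."""


@dataclass(frozen=True)
class Message:
    """Stand-in for romeways.Message; only the fields this sketch needs."""
    payload: str
    rw_resend_times: int = 0


async def handler(message: Message) -> None:
    # Hypothetical policy: fail on the first two deliveries, succeed on the third.
    if message.rw_resend_times < 2:
        raise ResendException("downstream service not ready")
    processed.append(message)


async def consume(source: queue.Queue) -> None:
    """Handle messages until the queue is empty, re-queueing on ResendException."""
    while not source.empty():
        message = source.get_nowait()
        try:
            await handler(message)
        except ResendException:
            # Put the message back on the same queue with the counter increased by one.
            source.put(replace(message, rw_resend_times=message.rw_resend_times + 1))


processed: list[Message] = []
source: queue.Queue = queue.Queue()
source.put(Message(payload="payment-42"))
asyncio.run(consume(source))
```

In a real handler the same pattern would raise romeways.ResendException and read message.rw_resend_times. Checking that counter before raising is one way to stop a message from being resent indefinitely.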
Spawn a process
Romeways can run each connector in a separate process or in async workers. Use the spawn_process parameter to choose between the two.
Example
This example uses the memory extra package.
from multiprocessing import Queue
import romeways

# Create the queue that the connector will consume
queue = Queue()

# Create a queue config
config_q = romeways.MemoryQueueConfig(
    connector_name="memory-dev1",
    queue=queue
)

# Register a controller/consumer for the queue name
@romeways.queue_consumer(queue_name="queue.payment.done", config=config_q)
async def controller(message: romeways.Message):
    print(message)

# Create the connector config; connector_name matches the queue config above
config_p = romeways.MemoryConnectorConfig(connector_name="memory-dev1")

# Register a connector
romeways.connector_register(
    connector=romeways.MemoryQueueConnector, config=config_p, spawn_process=True
)