A queue framework

Romeways

by: CenturyBoys

This project aims to spare developers from reimplementing default queue consumer behaviour.

Basics

Romeways works with two basic concepts: the queue handler and the queue connector. The queue connector is a queue consumer and can be spawned in a separate process or in an async worker. The queue handler is the callback function that is called for each retrieved message.

The implemented consumers are listed below:

Queue Type             Install using extra   Description
multiprocessing.Queue  memory                here
Apache Kafka           kafka                 here

How to install extra packages?

poetry add romeways -E memory
OR
pip install 'romeways[memory]'

Configuration

Queue connector config

The queue connector config holds all the settings you need to retrieve messages from the queue.

Below is the romeways.GenericConnectorConfig implementation. This class can be subclassed to allow extra configuration.

Params:

  • connector_name: str The connector name

from dataclasses import dataclass


@dataclass(slots=True, frozen=True)
class GenericConnectorConfig:
    """
    connector_name: str Connector name
    """
    connector_name: str

Queue handler config

When you register a queue consumer you are setting configs and a callback handler for each message that this queue receives.

Below is the romeways.GenericQueueConfig implementation. This class can be subclassed to allow extra configuration.

Params:

  • connector_name: str Name of the connector that this queue is delivered to
  • frequency: float Time in seconds between message retrievals from the queue
  • max_chunk_size: int Maximum number of messages that one retrieval will get
  • sequential: bool Whether handler calls run sequentially or concurrently in asyncio.gather

from dataclasses import dataclass


@dataclass(slots=True, frozen=True)
class GenericQueueConfig:
    """
    connector_name: str Name of the connector that this queue is delivered to
    frequency: float Time in seconds between message retrievals from the queue
    max_chunk_size: int Maximum number of messages that one retrieval will get
    sequential: bool Whether handler calls run sequentially or concurrently in asyncio.gather
    """
    connector_name: str
    frequency: float
    max_chunk_size: int
    sequential: bool

Resend on error

Romeways allows you to resend a message to the queue if something in your handler does not perform correctly. To do so, your code needs to raise the romeways.ResendException exception. The message is then resent to the same queue and the romeways.Message.rw_resend_times parameter is incremented.
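
The resend flow described above can be sketched roughly as follows. ResendException and Message are local stand-ins so the block runs without romeways installed; the payload field, the dispatch helper and the list-backed queue are assumptions made for this illustration and do not describe the library's internals.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import List, Optional, Tuple


class ResendException(Exception):
    """Stand-in for romeways.ResendException."""


@dataclass(frozen=True)
class Message:
    """Stand-in for romeways.Message; payload is a hypothetical field."""
    payload: str
    rw_resend_times: int = 0


async def handler(message: Message) -> None:
    # Simulate a dependency that only becomes ready on the third delivery.
    if message.rw_resend_times < 2:
        raise ResendException("downstream not ready")


async def dispatch(queue: List[Message], message: Message) -> bool:
    """Call the handler; on ResendException, requeue with the counter incremented."""
    try:
        await handler(message)
        return True
    except ResendException:
        queue.append(replace(message, rw_resend_times=message.rw_resend_times + 1))
        return False


async def demo() -> Tuple[int, Optional[int]]:
    queue = [Message(payload="payment-1")]
    attempts = 0
    while queue:
        message = queue.pop(0)
        attempts += 1
        if await dispatch(queue, message):
            return attempts, message.rw_resend_times
    return attempts, None


attempts, resend_times = asyncio.run(demo())
print(attempts, resend_times)
```

A handler can read rw_resend_times to decide when to stop raising ResendException, which avoids resending the same message forever.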

Spawn a process

Romeways can run each connector in a separate process or in async workers. Use the spawn_process parameter to choose between the two.

Example

For this example we are using the extra package memory

from multiprocessing import Queue

import romeways

# Create the in-memory queue that messages are read from
queue = Queue()

# Create a queue config
config_q = romeways.MemoryQueueConfig(
    connector_name="memory-dev1", 
    queue=queue
)

# Register a controller/consumer for the queue name
@romeways.queue_consumer(queue_name="queue.payment.done", config=config_q)
async def controller(message: romeways.Message):
    print(message)

config_p = romeways.MemoryConnectorConfig(connector_name="memory-dev1")

# Register a connector
romeways.connector_register(
    connector=romeways.MemoryQueueConnector, config=config_p, spawn_process=True
)
