Message Queue Tasks

This module provides a framework for handling and processing asynchronous tasks through message queues.


Overview

  • Uses the aio_pika library for AMQP message processing.
  • Provides decorators and methods for defining, producing, and consuming tasks.
  • Built on asynchronous (asyncio) patterns, so producers and consumers can scale without blocking on I/O.

Requirements

  • Python 3.11+
  • aio_pika library
  • RabbitMQ or another AMQP broker

Usage

1. Defining a Task

To define a task, create an MqTasks instance, register a handler with its task decorator, and call run() to start the worker:

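# Assumes both names are exported at the package's top level
from mqtasks import MqTasks, MqTaskContext

mq_tasks = MqTasks(amqp_connection="amqp://localhost", queue_name="my_queue")

@mq_tasks.task(name="my_task")
def my_task_function(ctx: MqTaskContext):
    # Process the task using the context passed in
    pass

# Start the worker
mq_tasks.run()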
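
To illustrate how a name-based task decorator of this kind can work, here is a minimal sketch of the general registry pattern. It is not the mqtasks implementation: TaskRegistry, dispatch, and the payload shape are hypothetical names chosen for this example, and no broker is involved.

```python
from typing import Any, Callable, Dict


class TaskRegistry:
    """Hypothetical stand-in for a task container; not part of mqtasks."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], Any]] = {}

    def task(self, name: str) -> Callable[[Callable[[Any], Any]], Callable[[Any], Any]]:
        """Return a decorator that stores the decorated function under `name`."""
        def decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
            if name in self._handlers:
                raise ValueError(f"task {name!r} is already registered")
            self._handlers[name] = func
            return func  # the function itself is left unchanged
        return decorator

    def dispatch(self, name: str, payload: Any) -> Any:
        """Look up the handler registered for `name` and call it with the payload."""
        return self._handlers[name](payload)


registry = TaskRegistry()


@registry.task(name="my_task")
def my_task_function(payload: dict) -> str:
    # Illustrative handler: upper-case the message field
    return payload["message"].upper()
```

In this sketch, a consumer loop would read the task name from each incoming message and pass it to dispatch, which is why each name may be registered only once.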

2. Sending a Task

Create an MqTasksClient, open the target queue, and send the task by name:

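import asyncio
# Assumes MqTasksClient is exported at the package's top level
from mqtasks import MqTasksClient

async def main():
    client = MqTasksClient(
        loop=asyncio.get_running_loop(),
        amqp_connection="amqp://localhost"
    )

    # Open a channel for the queue the worker listens on
    channel = await client.queue(queue_name="my_queue")

    # Send the task and wait for its result
    result = await channel.request_task_async(task_name="my_task", body={"message": "hello world"})
    print(result)

# await is only valid inside a coroutine, so run everything from main()
asyncio.run(main())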
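
Because request_task_async returns a result, the call presumably follows a request/reply pattern. The sketch below shows that pattern in memory, under the assumption that replies are matched to requests by a correlation id. It uses only asyncio (no broker, no mqtasks), and worker, request_task, and demo are hypothetical names for this illustration.

```python
import asyncio
import uuid


async def worker(requests: asyncio.Queue, replies: dict) -> None:
    """Consume one request and resolve the future waiting on its correlation id."""
    correlation_id, task_name, body = await requests.get()
    result = {"task": task_name, "echo": body["message"]}
    replies.pop(correlation_id).set_result(result)


async def request_task(requests: asyncio.Queue, replies: dict, task_name: str, body: dict) -> dict:
    """Publish a request, then wait until the matching reply arrives."""
    correlation_id = str(uuid.uuid4())
    future = asyncio.get_running_loop().create_future()
    replies[correlation_id] = future  # register before publishing to avoid a race
    await requests.put((correlation_id, task_name, body))
    return await asyncio.wait_for(future, timeout=1.0)


async def demo() -> dict:
    requests: asyncio.Queue = asyncio.Queue()
    replies: dict = {}
    worker_task = asyncio.create_task(worker(requests, replies))
    result = await request_task(requests, replies, "my_task", {"message": "hello world"})
    await worker_task
    return result


result = asyncio.run(demo())
```

With a real broker, the in-memory queue would be replaced by a request queue and a reply queue, but the bookkeeping on the caller's side stays the same: one pending future per correlation id.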

Example

To run the bundled example:

  • Change to the example directory:
    • cd ./example
  • Start RabbitMQ:
    1. docker pull rabbitmq:management
    2. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
  • Open the RabbitMQ management web UI:
    • http://localhost:15672/
  • Log in with the default credentials:
    • Username: guest
    • Password: guest
  • Start the server:
    • python example_server.py
  • Start the client (in a separate terminal):
    • python example_client.py
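
The RabbitMQ container can take a few seconds to start accepting connections after docker run returns. As an optional aid, the sketch below polls the AMQP port until a TCP connection succeeds. wait_for_port is a hypothetical helper that is not part of the example directory, and an open port only shows that the socket is listening, not that the broker has finished starting.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling wait_for_port("localhost", 5672) before starting example_server.py would wait for the AMQP port published by the docker run command above.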

Release

Create a release/x.x.x branch, bump the version, and commit.

To bump the minor version (0.X.0):

./scripts/release.sh minor

To bump the patch version (0.0.X):

./scripts/release.sh patch

Merge the release/x.x.x branch into the master and develop branches:

./scripts/merge.sh
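
The contents of scripts/release.sh are not shown here, so the following is only a sketch of what minor and patch bumps conventionally mean under semantic versioning (a minor bump resets the patch number). bump_version is a hypothetical helper, not part of the repository.

```python
def bump_version(version: str, part: str) -> str:
    """Return `version` ("MAJOR.MINOR.PATCH") with the given part incremented."""
    major, minor, patch = (int(piece) for piece in version.split("."))
    if part == "minor":
        # A minor bump resets the patch number to zero
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unsupported part: {part!r}")
```

For example, bump_version("0.2.3", "minor") gives "0.3.0" and bump_version("0.2.3", "patch") gives "0.2.4".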

License

MIT License
