Skip to main content

A powerful async task processing kit based on RabbitMQ with Coroutine, Thread, and Process support.

Project description

async-task-kit

A powerful async task processing kit based on RabbitMQ with Coroutine, Thread, and Process support.

Installation

pip install async-task-kit

Features

  • RabbitMQ Client: Robust connection pooling and delay/dead-letter queue support.
  • Multiple Consumer Models: Support for Coroutine (asyncio), Thread, and Process consumers depending on your workload (I/O-bound vs CPU-bound).
  • Extensible Processor: Easily define your task logic by inheriting TaskProcessor.
  • Built-in Logger & EnvLoader: Useful utilities for production-ready applications.

Quick Start

1. Configuration (.env)

Use the built-in EnvLoader to manage your environment variables. Create a .env file:

RABBITMQ_URL=amqp://guest:guest@localhost/
TASK_IDS=demo_task

# You can also configure specific task settings using the {TASK_ID}_ prefix
DEMO_TASK_QUEUE_NAME=my_demo_queue
DEMO_TASK_CONCURRENCY=3

2. Define your Task Processor (demo_processor.py)

import logging
from async_task_kit import TaskProcessor

logger = logging.getLogger(__name__)

class DemoProcessor(TaskProcessor):
    async def process(self, task: dict):
        logger.info(f"Processing task: {task}")
        # Return any truthy value (e.g., dict, object, True) for success and pass to callback. 
        # Return None or False to trigger retry.
        return {"status": "ok", "processed_data": task}

    async def callback(self, task: dict, result: any):
        logger.info(f"Task completed with result: {result}")

3. Main Consumer Application (main.py)

A production-ready setup with signal handling for graceful shutdown.

import asyncio
import logging
import signal
from typing import List, Type

from demo_processor import DemoProcessor

# -------------- 只需要改这一行来切换并发模型 --------------
from async_task_kit import CoroutineConsumer as Consumer
# from async_task_kit import ThreadConsumer as Consumer
# from async_task_kit import ProcessConsumer as Consumer
# --------------------------------------------------------

from async_task_kit import TaskProcessor, EnvLoader, setup_logger

# Initialize logger
setup_logger()
logger = logging.getLogger(__name__)

# Register your processors
TASK_REGISTRY: dict[str, Type[TaskProcessor]] = {
    "demo_task": DemoProcessor,
}

consumers: List[Consumer] = []

async def run_all_consumers(amqp_url: str, task_ids: List[str]):
    tasks = []
    for task_id in task_ids:
        if task_id not in TASK_REGISTRY:
            continue

        processor_cls = TASK_REGISTRY[task_id]
        processor = processor_cls(task_id=task_id)

        consumer = Consumer(
            amqp_url=amqp_url,
            queue_name=processor.queue_name,
            processor=processor,
            concurrency=processor.concurrency,
        )
        consumers.append(consumer)
        tasks.append(consumer.start())

        logger.info(f"🚀 启动任务 [{task_id}] | queue={processor.queue_name} | 并发={processor.concurrency}")

    await asyncio.gather(*tasks)

async def shutdown_all():
    logger.info("🛑 优雅关闭所有消费者...")
    for consumer in consumers:
        await consumer.stop()
    logger.info("✅ 所有消费者已关闭")

def handle_exit_signal(*args, **kwargs):
    asyncio.create_task(shutdown_all())

async def main():
    env = EnvLoader()
    amqp_url = env.get("RABBITMQ_URL")
    task_ids_str = env.get("TASK_IDS", "").strip()

    if not task_ids_str:
        logger.warning("⚠️ 未配置 TASK_IDS")
        return

    task_ids = [t.strip() for t in task_ids_str.split(",") if t.strip()]
    valid_tasks = [t for t in task_ids if t in TASK_REGISTRY]

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, handle_exit_signal)

    await run_all_consumers(amqp_url, valid_tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("👋 服务已安全退出")

4. Publishing Tasks (publisher.py)

import asyncio
import logging
from async_task_kit import RabbitMQ, setup_logger

setup_logger()
logger = logging.getLogger(__name__)

async def publish():
    rmq = RabbitMQ("amqp://guest:guest@localhost/")
    await rmq.init()
    
    await rmq.push("my_demo_queue", {"message": "Hello from async-task-kit!"})
    logger.info("Task published successfully.")
    
    await rmq.close()

if __name__ == "__main__":
    asyncio.run(publish())

License

MIT

Contact & Support

If you have any questions, suggestions, or need help with this library, feel free to reach out!

WeChat (微信): realwrtoff
Email: realwrtoff@gmail.com

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_task_kit-0.1.0.tar.gz (4.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_task_kit-0.1.0-py3-none-any.whl (4.5 kB view details)

Uploaded Python 3

File details

Details for the file async_task_kit-0.1.0.tar.gz.

File metadata

  • Download URL: async_task_kit-0.1.0.tar.gz
  • Upload date:
  • Size: 4.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for async_task_kit-0.1.0.tar.gz
Algorithm Hash digest
SHA256 76d0a5aa9e64f860714cd1569025d28f497d56c6993b10a0cd1a7cab03fd1767
MD5 e676717ea24dd6a3cd30b61f4c93602c
BLAKE2b-256 e4f9a4e6809f922f6c5503a4b0e6c260b17395c9218fdd5fb364e876bd6cdc61

See more details on using hashes here.

File details

Details for the file async_task_kit-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: async_task_kit-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 4.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for async_task_kit-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 79e26d509cc30f26e4c2fd863415a5f21a9bf220a6cd4523ccd18f996b1ae4b6
MD5 cbf6dc20c340b957ef0bf70ea7e48575
BLAKE2b-256 c235317f82bb522653caa5da21ff6628b6a5845d104f835e13bd244a9e9558c6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page