Skip to main content

A powerful async task processing kit based on RabbitMQ with Coroutine, Thread, and Process support.

Project description

async-task-kit

A powerful async task processing kit based on RabbitMQ with Coroutine, Thread, and Process support.

Installation

pip install async-task-kit

Features

  • RabbitMQ Client: Robust connection pooling and delay/dead-letter queue support.
  • Multiple Consumer Models: Support for Coroutine (asyncio), Thread, and Process consumers depending on your workload (I/O-bound vs CPU-bound).
  • Extensible Processor: Easily define your task logic by inheriting TaskProcessor.
  • Built-in Logger & EnvLoader: Useful utilities for production-ready applications.

Quick Start

1. Configuration (.env)

Use the built-in EnvLoader to manage your environment variables. Create a .env file:

RABBITMQ_URL=amqp://guest:guest@localhost/
TASK_IDS=demo_task

# You can also configure specific task settings using the {TASK_ID}_ prefix
DEMO_TASK_QUEUE_NAME=my_demo_queue
DEMO_TASK_CONCURRENCY=3

2. Define your Task Processor (demo_processor.py)

import logging
from async_task_kit import TaskProcessor

logger = logging.getLogger(__name__)

class DemoProcessor(TaskProcessor):
    async def process(self, task: dict):
        logger.info(f"Processing task: {task}")
        # Return any truthy value (e.g., dict, object, True) for success and pass to callback. 
        # Return None or False to trigger retry.
        return {"status": "ok", "processed_data": task}

    async def callback(self, task: dict, result: any):
        logger.info(f"Task completed with result: {result}")

3. Main Consumer Application (main.py)

A production-ready setup with signal handling for graceful shutdown.

import asyncio
import logging
import signal
from typing import List, Type

from demo_processor import DemoProcessor

# -------------- 只需要改这一行来切换并发模型 --------------
from async_task_kit import CoroutineConsumer as Consumer
# from async_task_kit import ThreadConsumer as Consumer
# from async_task_kit import ProcessConsumer as Consumer
# --------------------------------------------------------

from async_task_kit import TaskProcessor, EnvLoader, setup_logger

# Initialize logger
setup_logger()
logger = logging.getLogger(__name__)

# Register your processors
TASK_REGISTRY: dict[str, Type[TaskProcessor]] = {
    "demo_task": DemoProcessor,
}

consumers: List[Consumer] = []

async def run_all_consumers(amqp_url: str, task_ids: List[str]):
    tasks = []
    for task_id in task_ids:
        if task_id not in TASK_REGISTRY:
            continue

        processor_cls = TASK_REGISTRY[task_id]
        processor = processor_cls(task_id=task_id)

        consumer = Consumer(
            amqp_url=amqp_url,
            queue_name=processor.queue_name,
            processor=processor,
            concurrency=processor.concurrency,
        )
        consumers.append(consumer)
        tasks.append(consumer.start())

        logger.info(f"🚀 启动任务 [{task_id}] | queue={processor.queue_name} | 并发={processor.concurrency}")

    await asyncio.gather(*tasks)

async def shutdown_all():
    logger.info("🛑 优雅关闭所有消费者...")
    for consumer in consumers:
        await consumer.stop()
    logger.info("✅ 所有消费者已关闭")

def handle_exit_signal(*args, **kwargs):
    asyncio.create_task(shutdown_all())

async def main():
    env = EnvLoader()
    amqp_url = env.get("RABBITMQ_URL")
    task_ids_str = env.get("TASK_IDS", "").strip()

    if not task_ids_str:
        logger.warning("⚠️ 未配置 TASK_IDS")
        return

    task_ids = [t.strip() for t in task_ids_str.split(",") if t.strip()]
    valid_tasks = [t for t in task_ids if t in TASK_REGISTRY]

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, handle_exit_signal)

    await run_all_consumers(amqp_url, valid_tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("👋 服务已安全退出")

4. Publishing Tasks (publisher.py)

import asyncio
import logging
from async_task_kit import RabbitMQ, setup_logger

setup_logger()
logger = logging.getLogger(__name__)

async def publish():
    rmq = RabbitMQ("amqp://guest:guest@localhost/")
    await rmq.init()
    
    await rmq.push("my_demo_queue", {"message": "Hello from async-task-kit!"})
    logger.info("Task published successfully.")
    
    await rmq.close()

if __name__ == "__main__":
    asyncio.run(publish())

License

MIT

Contact & Support

If you have any questions, suggestions, or need help with this library, feel free to reach out!

WeChat (微信): realwrtoff
Email: realwrtoff@gmail.com

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_task_kit-0.1.1.tar.gz (10.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_task_kit-0.1.1-py3-none-any.whl (12.9 kB view details)

Uploaded Python 3

File details

Details for the file async_task_kit-0.1.1.tar.gz.

File metadata

  • Download URL: async_task_kit-0.1.1.tar.gz
  • Upload date:
  • Size: 10.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for async_task_kit-0.1.1.tar.gz
Algorithm Hash digest
SHA256 4fdfa3b3ca1dc4a00c143ac128e3d889ad709748fc1490cb89128292aaa1e61e
MD5 638697df5046440ef6229f9cc684409c
BLAKE2b-256 5a0e2c001236ba2642cb7655d8842c26ba67c0818e902606a30c7d6468977322

See more details on using hashes here.

File details

Details for the file async_task_kit-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: async_task_kit-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 12.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for async_task_kit-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 21716d3cb7739d7027c4cf7a17753ecab8ae92a4bd5729d3ff3a8e6c3d6d58e7
MD5 eada10aee453c38fc8bdcdcae76c3fee
BLAKE2b-256 da8496978047dbeb26ac7d0619bd486d635293caba1c93f35e68a398e53efc7f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page