A powerful async task processing kit based on RabbitMQ with Coroutine, Thread, and Process support.
Project description
async-task-kit
A powerful async task processing kit based on RabbitMQ with Coroutine, Thread, and Process support.
Installation
pip install async-task-kit
Features
- RabbitMQ Client: Robust connection pooling and delay/dead-letter queue support.
- Multiple Consumer Models: Support for Coroutine (asyncio), Thread, and Process consumers depending on your workload (I/O-bound vs CPU-bound).
- Extensible Processor: Easily define your task logic by inheriting
TaskProcessor. - Built-in Logger & EnvLoader: Useful utilities for production-ready applications.
Quick Start
1. Configuration (.env)
Use the built-in EnvLoader to manage your environment variables. Create a .env file:
RABBITMQ_URL=amqp://guest:guest@localhost/
TASK_IDS=demo_task
# You can also configure specific task settings using the {TASK_ID}_ prefix
DEMO_TASK_QUEUE_NAME=my_demo_queue
DEMO_TASK_CONCURRENCY=3
2. Define your Task Processor (demo_processor.py)
import logging
from async_task_kit import TaskProcessor
logger = logging.getLogger(__name__)
class DemoProcessor(TaskProcessor):
async def process(self, task: dict):
logger.info(f"Processing task: {task}")
# Return any truthy value (e.g., dict, object, True) for success and pass to callback.
# Return None or False to trigger retry.
return {"status": "ok", "processed_data": task}
async def callback(self, task: dict, result: any):
logger.info(f"Task completed with result: {result}")
3. Main Consumer Application (main.py)
A production-ready setup with signal handling for graceful shutdown.
import asyncio
import logging
import signal
from typing import List, Type
from demo_processor import DemoProcessor
# -------------- 只需要改这一行来切换并发模型 --------------
from async_task_kit import CoroutineConsumer as Consumer
# from async_task_kit import ThreadConsumer as Consumer
# from async_task_kit import ProcessConsumer as Consumer
# --------------------------------------------------------
from async_task_kit import TaskProcessor, EnvLoader, setup_logger
# Initialize logger
setup_logger()
logger = logging.getLogger(__name__)
# Register your processors
TASK_REGISTRY: dict[str, Type[TaskProcessor]] = {
"demo_task": DemoProcessor,
}
consumers: List[Consumer] = []
async def run_all_consumers(amqp_url: str, task_ids: List[str]):
tasks = []
for task_id in task_ids:
if task_id not in TASK_REGISTRY:
continue
processor_cls = TASK_REGISTRY[task_id]
processor = processor_cls(task_id=task_id)
consumer = Consumer(
amqp_url=amqp_url,
queue_name=processor.queue_name,
processor=processor,
concurrency=processor.concurrency,
)
consumers.append(consumer)
tasks.append(consumer.start())
logger.info(f"🚀 启动任务 [{task_id}] | queue={processor.queue_name} | 并发={processor.concurrency}")
await asyncio.gather(*tasks)
async def shutdown_all():
logger.info("🛑 优雅关闭所有消费者...")
for consumer in consumers:
await consumer.stop()
logger.info("✅ 所有消费者已关闭")
def handle_exit_signal(*args, **kwargs):
asyncio.create_task(shutdown_all())
async def main():
env = EnvLoader()
amqp_url = env.get("RABBITMQ_URL")
task_ids_str = env.get("TASK_IDS", "").strip()
if not task_ids_str:
logger.warning("⚠️ 未配置 TASK_IDS")
return
task_ids = [t.strip() for t in task_ids_str.split(",") if t.strip()]
valid_tasks = [t for t in task_ids if t in TASK_REGISTRY]
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_exit_signal)
await run_all_consumers(amqp_url, valid_tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("👋 服务已安全退出")
4. Publishing Tasks (publisher.py)
import asyncio
import logging
from async_task_kit import RabbitMQ, setup_logger
setup_logger()
logger = logging.getLogger(__name__)
async def publish():
rmq = RabbitMQ("amqp://guest:guest@localhost/")
await rmq.init()
await rmq.push("my_demo_queue", {"message": "Hello from async-task-kit!"})
logger.info("Task published successfully.")
await rmq.close()
if __name__ == "__main__":
asyncio.run(publish())
License
MIT
Contact & Support
If you have any questions, suggestions, or need help with this library, feel free to reach out!
WeChat (微信): realwrtoff
Email: realwrtoff@gmail.com
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file async_task_kit-0.1.0.tar.gz.
File metadata
- Download URL: async_task_kit-0.1.0.tar.gz
- Upload date:
- Size: 4.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
76d0a5aa9e64f860714cd1569025d28f497d56c6993b10a0cd1a7cab03fd1767
|
|
| MD5 |
e676717ea24dd6a3cd30b61f4c93602c
|
|
| BLAKE2b-256 |
e4f9a4e6809f922f6c5503a4b0e6c260b17395c9218fdd5fb364e876bd6cdc61
|
File details
Details for the file async_task_kit-0.1.0-py3-none-any.whl.
File metadata
- Download URL: async_task_kit-0.1.0-py3-none-any.whl
- Upload date:
- Size: 4.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
79e26d509cc30f26e4c2fd863415a5f21a9bf220a6cd4523ccd18f996b1ae4b6
|
|
| MD5 |
cbf6dc20c340b957ef0bf70ea7e48575
|
|
| BLAKE2b-256 |
c235317f82bb522653caa5da21ff6628b6a5845d104f835e13bd244a9e9558c6
|