开箱即用的 Dramatiq Worker 启动包
Project description
Dramatiq Worker Starter
开箱即用的 Dramatiq Worker 启动包,简化 Dramatiq Worker 的配置和启动流程。
特性
- 开箱即用 - 最少配置即可启动 Worker
- 可扩展 - 支持自定义中间件、队列、Actor
- 类型安全 - 完整的类型提示
- 配置灵活 - 通过参数传入配置
- 内置中间件:
TimingMiddleware- 记录任务执行耗时WorkerInfoMiddleware- Worker 心跳上报ReadableRedisBackend- 可读的结果键名
安装
pip install dramatiq-worker-starter
或从源码安装:
git clone <repository-url>
cd dramatiq-worker-starter
pip install -e .
快速开始
定义 Actor
from dramatiq_worker_starter import ActorBase
@ActorBase.actor(queue_name="default")
def hello_task(name: str) -> str:
return f"Hello, {name}!"
启动 Worker
from dramatiq_worker_starter import ActorBase, init_broker, run_worker
from dramatiq_worker_starter.utils import setup_logging
# 导入 Actor(必须在使用前导入)
from my_actors import hello_task
# 配置日志
setup_logging()
# 初始化 Broker
broker = init_broker(
redis_host="localhost",
redis_port=6379,
redis_db=0,
redis_db_result=1,
)
# 启动 Worker
if __name__ == "__main__":
run_worker(broker)
发送任务
from dramatiq_worker_starter import ActorBase, init_broker
# 初始化 Broker(与 Worker 使用相同的配置)
broker = init_broker(
redis_host="localhost",
redis_port=6379,
redis_db=0,
redis_db_result=1,
)
# 发送任务
message = ActorBase.send("hello_task", "Alice")
print(f"Task sent with message_id: {message.message_id}")
# 延迟发送任务(5秒后执行)
delayed_message = ActorBase.send_with_options("hello_task", "Bob", delay=5000)
配置
init_broker 参数
broker = init_broker(
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0,
redis_password: str = "",
redis_db_result: int = 1,
namespace: str = "dramatiq-result",
heartbeat_interval: int = 30,
worker_ttl: int = 120,
custom_middleware: list | None = None,
)
模块说明
ActorBase
Actor 基类,提供便捷的 Actor 定义方式:
from dramatiq_worker_starter import ActorBase
@ActorBase.actor(
queue_name="custom",
max_retries=3,
min_backoff=1000,
max_backoff=30000,
)
def my_task(data: str) -> str:
return data.upper()
Worker
Worker 启动器,支持自定义配置:
from dramatiq_worker_starter import Worker
worker = Worker(
broker_instance=broker,
queues=["default", "custom"],
worker_threads=4,
worker_timeout=3600000,
)
worker.start()
Middleware
TimingMiddleware
记录任务执行耗时:
from dramatiq_worker_starter import init_broker, TimingMiddleware
broker = init_broker(custom_middleware=[TimingMiddleware()])
WorkerInfoMiddleware
Worker 心跳上报:
from dramatiq_worker_starter import init_broker, WorkerInfoMiddleware
broker = init_broker(
custom_middleware=[WorkerInfoMiddleware(heartbeat_interval=15)]
)
自定义中间件
import dramatiq
from dramatiq.middleware import Middleware
class CustomMiddleware(Middleware):
def before_process_message(self, broker, message):
print(f"Task started: {message.actor_name}")
def after_process_message(self, broker, message, *, result=None, exception=None):
if exception:
print(f"Task failed: {exception}")
else:
print(f"Task completed: {message.actor_name}")
broker = init_broker(custom_middleware=[CustomMiddleware()])
项目结构
dramatiq-worker-starter/
├── src/
│ └── dramatiq_worker_starter/
│ ├── __init__.py # 包导出
│ ├── broker.py # Broker 初始化
│ ├── middleware.py # 内置中间件
│ ├── worker.py # Worker 启动器
│ ├── actors/ # Actor 模块
│ │ ├── __init__.py
│ │ └── base.py # Actor 基类
│ └── utils/ # 工具模块
│ ├── __init__.py
│ └── logger.py # 日志工具
├── examples/ # 使用示例
│ ├── simple/
│ │ ├── actors.py
│ │ ├── main.py
│ │ └── tasks.py
│ └── advanced/
│ ├── custom_middleware.py
│ ├── main.py
│ └── tasks.py
└── tests/ # 测试用例
示例
Simple Example
启动 Worker:
python -m examples.simple.main
发送任务:
python -m examples.simple.tasks
Advanced Example
启动 Worker(自定义配置):
python -m examples.advanced.main
发送任务并查询结果:
python -m examples.advanced.tasks
Redis 数据结构
Worker 信息
dramatiq:workers:active(ZSET): 活跃 Worker 集合dramatiq:worker:{worker_id}:actors(SET): Worker 支持的 Actor 列表dramatiq:worker:{worker_id}:info(HASH): Worker 详细信息
结果存储
dramatiq-result:{queue_name}:{actor_name}:{message_id}(LIST): 任务结果
结果格式:
{
"result": { /* 实际结果 */ },
"_timing": {
"start_datetime": "2026-02-12T12:00:00.000000",
"end_datetime": "2026-02-12T12:00:05.000000",
"duration_ms": 5000,
"queue_name": "default",
"actor_name": "hello_task",
"worker_id": "abc12345",
"exception": null
}
}
依赖
dramatiq[redis,watch]>=2.0.1pydantic>=2.12.5
许可证
MIT License
贡献
欢迎提交 Issue 和 Pull Request!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dramatiq_worker_starter-0.1.0.tar.gz.
File metadata
- Download URL: dramatiq_worker_starter-0.1.0.tar.gz
- Upload date:
- Size: 13.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.10 {"installer":{"name":"uv","version":"0.9.10"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
de1a7c0334c9d76f991defc1e4701a1e6cee85b0251069183c6f11d5e6a71576
|
|
| MD5 |
fa9e5f8cc7796e5637cd43c1765c5594
|
|
| BLAKE2b-256 |
616e686949bdce4c08a793fe83a831c701df54785fa749f6028ab1ba162321d1
|
File details
Details for the file dramatiq_worker_starter-0.1.0-py3-none-any.whl.
File metadata
- Download URL: dramatiq_worker_starter-0.1.0-py3-none-any.whl
- Upload date:
- Size: 14.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.10 {"installer":{"name":"uv","version":"0.9.10"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5059a7858288d8abfb528480aff3847f20559a4098ac2f88dfbbd86d870a436a
|
|
| MD5 |
46451d880927a63bc1edfb50709fcdbe
|
|
| BLAKE2b-256 |
a08865f922e57640311b53aa0cb86efb9a9a8d7d398cab3b1ddf4c3916fb8f26
|