An out-of-the-box starter package for Dramatiq Workers

Dramatiq Worker Starter

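An out-of-the-box starter package for Dramatiq Workers that simplifies configuring and starting them.

Features

  • Works out of the box - start a Worker with minimal configuration
  • Extensible - supports custom middleware, queues, and Actors
  • Type safe - complete type hints
  • Flexible configuration - settings are passed in as parameters
  • Built-in middleware:
    • TimingMiddleware - records task execution time
    • WorkerInfoMiddleware - reports Worker heartbeats
    • ReadableRedisBackend - human-readable result key names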

Installation

pip install dramatiq-worker-starter

Or install from source:

git clone <repository-url>
cd dramatiq-worker-starter
pip install -e .

Quick start

Define an Actor

from dramatiq_worker_starter import ActorBase

@ActorBase.actor(queue_name="default")
def hello_task(name: str) -> str:
    return f"Hello, {name}!"

Start the Worker

from dramatiq_worker_starter import ActorBase, init_broker, run_worker
from dramatiq_worker_starter.utils import setup_logging

# Import the Actors (they must be imported before they are used)
from my_actors import hello_task

# Configure logging
setup_logging()

# Initialize the Broker
broker = init_broker(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_db_result=1,
)

# Start the Worker
if __name__ == "__main__":
    run_worker(broker)

Send tasks

from dramatiq_worker_starter import ActorBase, init_broker

# Initialize the Broker (use the same configuration as the Worker)
broker = init_broker(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_db_result=1,
)

# Send a task
message = ActorBase.send("hello_task", "Alice")
print(f"Task sent with message_id: {message.message_id}")

# Send a delayed task (runs after 5 seconds; delay is given in milliseconds)
delayed_message = ActorBase.send_with_options("hello_task", "Bob", delay=5000)

Configuration

init_broker parameters

All parameters are optional; the values below are the defaults:

broker = init_broker(
    redis_host="localhost",       # str
    redis_port=6379,              # int
    redis_db=0,                   # int
    redis_password="",            # str
    redis_db_result=1,            # int
    namespace="dramatiq-result",  # str
    heartbeat_interval=30,        # int
    worker_ttl=120,               # int
    custom_middleware=None,       # list | None
)
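
Because configuration is passed in as plain keyword arguments, it can be assembled from any source before calling init_broker. The following is a minimal sketch that reads the Redis settings from environment variables. The variable names (REDIS_HOST and so on) and the helper broker_kwargs_from_env are hypothetical and not part of this package; only the keyword names come from the parameter list above.

```python
import os
from typing import Any, Mapping

# Hypothetical environment variable names. Only the keyword names on the
# right-hand side come from the init_broker parameter list.
_ENV_TO_PARAM = {
    "REDIS_HOST": ("redis_host", str),
    "REDIS_PORT": ("redis_port", int),
    "REDIS_DB": ("redis_db", int),
    "REDIS_PASSWORD": ("redis_password", str),
    "REDIS_DB_RESULT": ("redis_db_result", int),
}


def broker_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Collect init_broker keyword arguments from environment variables.

    Variables that are not set are left out, so init_broker falls back to
    its own defaults for them.
    """
    kwargs: dict[str, Any] = {}
    for env_name, (param, cast) in _ENV_TO_PARAM.items():
        raw = env.get(env_name)
        if raw is not None:
            kwargs[param] = cast(raw)
    return kwargs
```

The result would then be unpacked into the call, for example init_broker(**broker_kwargs_from_env()), so the Worker and the task sender can share one configuration source.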

Modules

ActorBase

Actor base class that provides a convenient way to define Actors:

from dramatiq_worker_starter import ActorBase

@ActorBase.actor(
    queue_name="custom",
    max_retries=3,
    min_backoff=1000,
    max_backoff=30000,
)
def my_task(data: str) -> str:
    return data.upper()
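
In Dramatiq, min_backoff and max_backoff are expressed in milliseconds. Assuming ActorBase.actor forwards these options unchanged to Dramatiq's retry handling (this README does not confirm that), the delay before each retry roughly doubles from min_backoff until it reaches max_backoff. The sketch below computes only that deterministic ceiling; Dramatiq also applies random jitter, so actual delays will differ. retry_delay_ceiling_ms is a hypothetical helper, not part of either library.

```python
def retry_delay_ceiling_ms(attempt: int, min_backoff: int, max_backoff: int) -> int:
    """Approximate upper bound on the delay before retry `attempt` (0-based).

    This ignores jitter and is an illustration of capped exponential
    backoff, not a copy of Dramatiq's implementation.
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    return min(min_backoff * 2 ** attempt, max_backoff)


# Values from the my_task example: max_retries=3, min_backoff=1000, max_backoff=30000.
ceilings = [retry_delay_ceiling_ms(n, 1000, 30000) for n in range(3)]
print(ceilings)  # [1000, 2000, 4000]
```

With only three retries the cap of 30000 ms is never reached; it would start to apply from the sixth retry onward under these assumptions.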

Worker

Worker launcher with support for custom configuration:

from dramatiq_worker_starter import Worker

worker = Worker(
    broker_instance=broker,
    queues=["default", "custom"],
    worker_threads=4,
    worker_timeout=3600000,
)
worker.start()

Middleware

TimingMiddleware

Records task execution time:

from dramatiq_worker_starter import init_broker, TimingMiddleware

broker = init_broker(custom_middleware=[TimingMiddleware()])

WorkerInfoMiddleware

Reports Worker heartbeats:

from dramatiq_worker_starter import init_broker, WorkerInfoMiddleware

broker = init_broker(
    custom_middleware=[WorkerInfoMiddleware(heartbeat_interval=15)]
)

Custom middleware

from dramatiq.middleware import Middleware

from dramatiq_worker_starter import init_broker

class CustomMiddleware(Middleware):
    def before_process_message(self, broker, message):
        print(f"Task started: {message.actor_name}")

    def after_process_message(self, broker, message, *, result=None, exception=None):
        if exception:
            print(f"Task failed: {exception}")
        else:
            print(f"Task completed: {message.actor_name}")

broker = init_broker(custom_middleware=[CustomMiddleware()])

Project structure

dramatiq-worker-starter/
├── src/
│   └── dramatiq_worker_starter/
│       ├── __init__.py           # Package exports
│       ├── broker.py             # Broker initialization
│       ├── middleware.py         # Built-in middleware
│       ├── worker.py             # Worker launcher
│       ├── actors/               # Actor module
│       │   ├── __init__.py
│       │   └── base.py           # Actor base class
│       └── utils/                # Utility module
│           ├── __init__.py
│           └── logger.py         # Logging helpers
├── examples/                     # Usage examples
│   ├── simple/
│   │   ├── actors.py
│   │   ├── main.py
│   │   └── tasks.py
│   └── advanced/
│       ├── custom_middleware.py
│       ├── main.py
│       └── tasks.py
└── tests/                        # Test cases

Examples

Simple Example

Start the Worker:

python -m examples.simple.main

Send tasks:

python -m examples.simple.tasks

Advanced Example

Start the Worker (custom configuration):

python -m examples.advanced.main

Send tasks and query the results:

python -m examples.advanced.tasks

Redis data structures

Worker information

  • dramatiq:workers:active (ZSET): set of active Workers
  • dramatiq:worker:{worker_id}:actors (SET): list of Actors the Worker supports
  • dramatiq:worker:{worker_id}:info (HASH): detailed Worker information
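
These keys can be read back with any Redis client for monitoring. The sketch below is one hedged way to do it. It assumes that the members of the dramatiq:workers:active ZSET are the worker IDs used in the other two key names, which this README does not state, and that the client returns strings (for redis-py, a client created with decode_responses=True). RedisLike, worker_keys, and snapshot_workers are hypothetical names, not part of this package.

```python
from typing import Any, Protocol


class RedisLike(Protocol):
    """The subset of redis-py client methods this sketch relies on."""

    def zrange(self, name: str, start: int, end: int) -> list: ...
    def smembers(self, name: str) -> set: ...
    def hgetall(self, name: str) -> dict: ...


ACTIVE_WORKERS_KEY = "dramatiq:workers:active"


def worker_keys(worker_id: str) -> tuple[str, str]:
    """Return the (actors, info) key names for one worker."""
    prefix = f"dramatiq:worker:{worker_id}"
    return f"{prefix}:actors", f"{prefix}:info"


def snapshot_workers(client: RedisLike) -> dict[str, dict[str, Any]]:
    """Read the actor set and info hash of every member of the active ZSET.

    Assumption: each ZSET member is a worker ID.
    """
    snapshot: dict[str, dict[str, Any]] = {}
    for worker_id in client.zrange(ACTIVE_WORKERS_KEY, 0, -1):
        actors_key, info_key = worker_keys(worker_id)
        snapshot[worker_id] = {
            "actors": sorted(client.smembers(actors_key)),
            "info": client.hgetall(info_key),
        }
    return snapshot
```

Taking the client as a parameter keeps the helper independent of how the connection is created, which also makes it easy to exercise with a stub in tests.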

Result storage

  • dramatiq-result:{queue_name}:{actor_name}:{message_id} (LIST): task result

Result format:

{
  "result": { /* actual result */ },
  "_timing": {
    "start_datetime": "2026-02-12T12:00:00.000000",
    "end_datetime": "2026-02-12T12:00:05.000000",
    "duration_ms": 5000,
    "queue_name": "default",
    "actor_name": "hello_task",
    "worker_id": "abc12345",
    "exception": null
  }
}
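
As an illustration of how a consumer might work with this format, the sketch below builds a result key in the documented layout and parses a sample payload. The helpers result_key and elapsed_ms are hypothetical, the message ID passed to result_key is a placeholder, and treating the namespace parameter as the key prefix is an assumption based on its default value matching the documented prefix. The sample uses a concrete string in place of the result placeholder.

```python
import json
from datetime import datetime


def result_key(queue_name: str, actor_name: str, message_id: str,
               namespace: str = "dramatiq-result") -> str:
    """Build a result key following the documented layout."""
    return f"{namespace}:{queue_name}:{actor_name}:{message_id}"


def elapsed_ms(timing: dict) -> int:
    """Recompute the duration from the two ISO 8601 timestamps."""
    start = datetime.fromisoformat(timing["start_datetime"])
    end = datetime.fromisoformat(timing["end_datetime"])
    return round((end - start).total_seconds() * 1000)


# Sample payload mirroring the documented format.
raw = """
{
  "result": "Hello, Alice!",
  "_timing": {
    "start_datetime": "2026-02-12T12:00:00.000000",
    "end_datetime": "2026-02-12T12:00:05.000000",
    "duration_ms": 5000,
    "queue_name": "default",
    "actor_name": "hello_task",
    "worker_id": "abc12345",
    "exception": null
  }
}
"""

payload = json.loads(raw)
timing = payload["_timing"]

if timing["exception"] is None:
    print(payload["result"], f"({elapsed_ms(timing)} ms)")
else:
    print("Task failed:", timing["exception"])
```

Checking the exception field before reading result lets a caller distinguish a failed task from one that legitimately returned an empty value.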

Dependencies

  • dramatiq[redis,watch]>=2.0.1
  • pydantic>=2.12.5

License

MIT License

Contributing

Issues and pull requests are welcome!
