Skip to main content

开箱即用的 Dramatiq Worker 启动包

Project description

Dramatiq Worker Starter

开箱即用的 Dramatiq Worker 启动包,简化 Dramatiq Worker 的配置和启动流程。

特性

  • 开箱即用 - 最少配置即可启动 Worker
  • 可扩展 - 支持自定义中间件、队列、Actor
  • 类型安全 - 完整的类型提示
  • 配置灵活 - 通过参数传入配置
  • 内置中间件:
    • TimingMiddleware - 记录任务执行耗时
    • WorkerInfoMiddleware - Worker 心跳上报
    • ReadableRedisBackend - 可读的结果键名

安装

pip install dramatiq-worker-starter

或从源码安装:

git clone <repository-url>
cd dramatiq-worker-starter
pip install -e .

快速开始

定义 Actor

from dramatiq_worker_starter import ActorBase

@ActorBase.actor(queue_name="default")
def hello_task(name: str) -> str:
    return f"Hello, {name}!"

启动 Worker

from dramatiq_worker_starter import ActorBase, init_broker, run_worker
from dramatiq_worker_starter.utils import setup_logging

# 导入 Actor(必须在使用前导入)
from my_actors import hello_task

# 配置日志
setup_logging()

# 初始化 Broker
broker = init_broker(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_db_result=1,
)

# 启动 Worker
if __name__ == "__main__":
    run_worker(broker)

发送任务

from dramatiq_worker_starter import ActorBase, init_broker

# 初始化 Broker(与 Worker 使用相同的配置)
broker = init_broker(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_db_result=1,
)

# 发送任务
message = ActorBase.send("hello_task", "Alice")
print(f"Task sent with message_id: {message.message_id}")

# 延迟发送任务(5秒后执行)
delayed_message = ActorBase.send_with_options("hello_task", "Bob", delay=5000)

配置

init_broker 参数

broker = init_broker(
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 0,
    redis_password: str = "",
    redis_db_result: int = 1,
    namespace: str = "dramatiq-result",
    heartbeat_interval: int = 30,
    worker_ttl: int = 120,
    custom_middleware: list | None = None,
)

模块说明

ActorBase

Actor 基类,提供便捷的 Actor 定义方式:

from dramatiq_worker_starter import ActorBase

@ActorBase.actor(
    queue_name="custom",
    max_retries=3,
    min_backoff=1000,
    max_backoff=30000,
)
def my_task(data: str) -> str:
    return data.upper()

Worker

Worker 启动器,支持自定义配置:

from dramatiq_worker_starter import Worker

worker = Worker(
    broker_instance=broker,
    queues=["default", "custom"],
    worker_threads=4,
    worker_timeout=3600000,
)
worker.start()

Middleware

TimingMiddleware

记录任务执行耗时:

from dramatiq_worker_starter import init_broker, TimingMiddleware

broker = init_broker(custom_middleware=[TimingMiddleware()])

WorkerInfoMiddleware

Worker 心跳上报:

from dramatiq_worker_starter import init_broker, WorkerInfoMiddleware

broker = init_broker(
    custom_middleware=[WorkerInfoMiddleware(heartbeat_interval=15)]
)

自定义中间件

import dramatiq
from dramatiq.middleware import Middleware

class CustomMiddleware(Middleware):
    def before_process_message(self, broker, message):
        print(f"Task started: {message.actor_name}")

    def after_process_message(self, broker, message, *, result=None, exception=None):
        if exception:
            print(f"Task failed: {exception}")
        else:
            print(f"Task completed: {message.actor_name}")

broker = init_broker(custom_middleware=[CustomMiddleware()])

项目结构

dramatiq-worker-starter/
├── src/
│   └── dramatiq_worker_starter/
│       ├── __init__.py           # 包导出
│       ├── broker.py             # Broker 初始化
│       ├── middleware.py         # 内置中间件
│       ├── worker.py             # Worker 启动器
│       ├── actors/               # Actor 模块
│       │   ├── __init__.py
│       │   └── base.py           # Actor 基类
│       └── utils/                # 工具模块
│           ├── __init__.py
│           └── logger.py         # 日志工具
├── examples/                     # 使用示例
│   ├── simple/
│   │   ├── actors.py
│   │   ├── main.py
│   │   └── tasks.py
│   └── advanced/
│       ├── custom_middleware.py
│       ├── main.py
│       └── tasks.py
└── tests/                        # 测试用例

示例

Simple Example

启动 Worker:

python -m examples.simple.main

发送任务:

python -m examples.simple.tasks

Advanced Example

启动 Worker(自定义配置):

python -m examples.advanced.main

发送任务并查询结果:

python -m examples.advanced.tasks

Redis 数据结构

Worker 信息

  • dramatiq:workers:active (ZSET): 活跃 Worker 集合
  • dramatiq:worker:{worker_id}:actors (SET): Worker 支持的 Actor 列表
  • dramatiq:worker:{worker_id}:info (HASH): Worker 详细信息

结果存储

  • dramatiq-result:{queue_name}:{actor_name}:{message_id} (LIST): 任务结果

结果格式:

{
  "result": { /* 实际结果 */ },
  "_timing": {
    "start_datetime": "2026-02-12T12:00:00.000000",
    "end_datetime": "2026-02-12T12:00:05.000000",
    "duration_ms": 5000,
    "queue_name": "default",
    "actor_name": "hello_task",
    "worker_id": "abc12345",
    "exception": null
  }
}

依赖

  • dramatiq[redis,watch]>=2.0.1
  • pydantic>=2.12.5

许可证

MIT License

贡献

欢迎提交 Issue 和 Pull Request!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dramatiq_worker_starter-0.1.0.tar.gz (13.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dramatiq_worker_starter-0.1.0-py3-none-any.whl (14.1 kB view details)

Uploaded Python 3

File details

Details for the file dramatiq_worker_starter-0.1.0.tar.gz.

File metadata

  • Download URL: dramatiq_worker_starter-0.1.0.tar.gz
  • Upload date:
  • Size: 13.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.10 {"installer":{"name":"uv","version":"0.9.10"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for dramatiq_worker_starter-0.1.0.tar.gz
Algorithm Hash digest
SHA256 de1a7c0334c9d76f991defc1e4701a1e6cee85b0251069183c6f11d5e6a71576
MD5 fa9e5f8cc7796e5637cd43c1765c5594
BLAKE2b-256 616e686949bdce4c08a793fe83a831c701df54785fa749f6028ab1ba162321d1

See more details on using hashes here.

File details

Details for the file dramatiq_worker_starter-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: dramatiq_worker_starter-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 14.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.10 {"installer":{"name":"uv","version":"0.9.10"},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for dramatiq_worker_starter-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5059a7858288d8abfb528480aff3847f20559a4098ac2f88dfbbd86d870a436a
MD5 46451d880927a63bc1edfb50709fcdbe
BLAKE2b-256 a08865f922e57640311b53aa0cb86efb9a9a8d7d398cab3b1ddf4c3916fb8f26

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page