Dramatiq with Asyncio support and some other goodies

Quick Start

Background

Dramatiq is a background task-processing library for Python with a focus on simplicity, reliability and performance.

This package, async-dramatiq, extends Dramatiq to provide the following:

  1. Support for Asyncio (issue #238)
  2. Message scheduling support (scheduling cookbook)

Setup

To provide async support for your actors, add the AsyncMiddleware to your broker.

RabbitMQ Broker

import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker

from async_dramatiq.middleware import AsyncMiddleware

rabbitmq_broker = RabbitmqBroker(host="rabbitmq")
rabbitmq_broker.add_middleware(AsyncMiddleware())  # <--- Here
dramatiq.set_broker(rabbitmq_broker)

Redis Broker

import dramatiq
from dramatiq.brokers.redis import RedisBroker

from async_dramatiq.middleware import AsyncMiddleware

redis_broker = RedisBroker(host="redis")
redis_broker.add_middleware(AsyncMiddleware())  # <--- Here
dramatiq.set_broker(redis_broker)

Running

The Scheduler

We leverage apscheduler as our scheduling system. Check out run_scheduler.py for an example of running this scheduler.

Dramatiq Worker

For more details, check out the official Dramatiq guide, or docker-compose.yaml for a specific example.

Example

Play around with worker-heartbeat-example, a functioning, full-featured example implementation.


Async Middleware

AsyncMiddleware starts an AsyncWorker, which runs the event loop. This event loop is shared across the Worker threads.
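
To make the idea of one event loop shared by several worker threads concrete, here is a minimal standard-library sketch. It does not reproduce async-dramatiq's internals: LoopThread, submit, double and run_demo are hypothetical names chosen for illustration, and the only assumption is that a dedicated thread owns the loop while other threads hand coroutines to it.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Coroutine


class LoopThread(threading.Thread):
    """Hypothetical stand-in for an async worker: one thread owning one event loop."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.event_loop = asyncio.new_event_loop()
        self._loop_ready = threading.Event()

    def run(self) -> None:
        asyncio.set_event_loop(self.event_loop)
        # Signal readiness from inside the loop, so callers know it is running.
        self.event_loop.call_soon(self._loop_ready.set)
        self.event_loop.run_forever()

    def wait_ready(self) -> None:
        self._loop_ready.wait()

    def submit(self, coro: Coroutine[Any, Any, Any]) -> Future:
        """Schedule a coroutine on the shared loop from any thread."""
        return asyncio.run_coroutine_threadsafe(coro, self.event_loop)

    def stop(self) -> None:
        # Ask the loop to stop, wait for the thread to exit, then release the loop.
        self.event_loop.call_soon_threadsafe(self.event_loop.stop)
        self.join()
        self.event_loop.close()


async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


def run_demo() -> list[int]:
    loop_thread = LoopThread()
    loop_thread.start()
    loop_thread.wait_ready()
    results = [0] * 4

    def worker(i: int) -> None:
        # Each worker thread blocks on its own future; the loop itself is shared.
        results[i] = loop_thread.submit(double(i)).result(timeout=5)

    workers = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    loop_thread.stop()
    return results
```

Calling run_demo() starts the loop thread, lets four plain threads each await a coroutine on it, and shuts everything down again. Handing work over with asyncio.run_coroutine_threadsafe keeps the loop single-threaded, which is what lets async resources such as connection pools be shared safely.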

Startup and Shutdown Events

To allow shared async resources to be stood up and torn down, the AsyncMiddleware provides two hooks:

  1. Before the event loop is started
  2. After the event loop is stopped

Example

from typing import Any

from dramatiq.brokers.rabbitmq import RabbitmqBroker

from async_dramatiq.middleware import AsyncMiddleware
# Assumed import path for AsyncWorker; adjust it to match your installed version.
from async_dramatiq.worker import AsyncWorker


async def startup() -> None:
    """This function should contain your resource initialization code."""
    pass


async def shutdown() -> None:
    """This function should contain your resource teardown code."""
    pass


class MyAsyncMiddleware(AsyncMiddleware):
    def before_async_worker_thread_startup(
        self, _: RabbitmqBroker, thread: AsyncWorker, **kwargs: Any
    ) -> None:
        # Runs before the event loop starts serving actors.
        thread.event_loop.run_until_complete(startup())

    def after_async_worker_thread_shutdown(
        self, _: RabbitmqBroker, thread: AsyncWorker, **kwargs: Any
    ) -> None:
        # Runs after the event loop has stopped; close it once teardown is done.
        thread.event_loop.run_until_complete(shutdown())
        thread.event_loop.close()
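
The ordering of the two hooks can be sketched with the standard library alone. This is an illustrative lifecycle under stated assumptions, not the package's implementation: run_lifecycle and the nested coroutines are hypothetical, and the loop is driven by a plain thread instead of an AsyncWorker.

```python
import asyncio
import threading


def run_lifecycle() -> list[str]:
    events: list[str] = []

    async def startup() -> None:
        events.append("startup")

    async def job() -> None:
        events.append("job")

    async def shutdown() -> None:
        events.append("shutdown")

    loop = asyncio.new_event_loop()

    # Hook 1: the loop is not serving work yet, so run_until_complete is safe.
    loop.run_until_complete(startup())

    # The loop now runs in its own thread and accepts work from other threads.
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    asyncio.run_coroutine_threadsafe(job(), loop).result(timeout=5)

    # Stop the loop and wait for its thread to exit.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()

    # Hook 2: the loop has stopped, so it can be driven once more for teardown.
    loop.run_until_complete(shutdown())
    loop.close()
    return events
```

Because run_until_complete raises if the loop is already running, both hooks have to sit outside the run_forever phase. That is why startup happens before the loop starts and shutdown after it stops.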

Async Actor

The async actor, async_actor, acts as a thin wrapper around the Dramatiq actor providing a variety of new functionality.

Interval Jobs

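Run a job at a fixed interval.

from datetime import timedelta

# async_actor is provided by async-dramatiq; its import is not shown in the source.
@async_actor(interval=timedelta(seconds=5))
def run_every_5_seconds() -> None:
    pass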
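
As a rough illustration of what an interval schedule means, the sketch below lists the first few run times for a given timedelta. next_run_times is a hypothetical helper, not part of async-dramatiq or apscheduler, and it assumes the first run happens one interval after the start time.

```python
from datetime import datetime, timedelta


def next_run_times(start: datetime, interval: timedelta, count: int) -> list[datetime]:
    """Return the first `count` run times, spaced `interval` apart after `start`."""
    return [start + interval * n for n in range(1, count + 1)]


# Example: three runs of a 5-second interval job starting at midnight.
runs = next_run_times(datetime(2024, 1, 1), timedelta(seconds=5), 3)
```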

Cron Jobs

Run a job on a crontab schedule (see https://crontab.guru/).

@async_actor(interval="0 0 * * *")
def run_at_midnight() -> None:
    pass
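
To show how a five-field crontab expression such as "0 0 * * *" maps onto a point in time, here is a deliberately simplified matcher. field_matches and cron_matches are hypothetical helpers: they support only `*` and plain integers, and they require every field to match, which differs from full cron semantics for day-of-month combined with day-of-week. A real scheduler such as apscheduler handles ranges, steps and lists as well.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Match one cron field; only '*' and plain integers are supported here."""
    return field == "*" or int(field) == value


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check whether `moment` falls on the minute described by `expression`."""
    minute, hour, day, month, weekday = expression.split()
    # Cron counts Sunday as 0, while datetime.weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day)
        and field_matches(month, moment.month)
        and field_matches(weekday, cron_weekday)
    )
```

With this sketch, cron_matches("0 0 * * *", datetime(2024, 1, 1, 0, 0)) is true, while the same expression checked one minute later is false.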
