Dramatiq with Asyncio support and some other goodies

Dramatiq with Asyncio

Dramatiq is a background task processing library for Python with a focus on simplicity, reliability and performance.

This package extends dramatiq to provide the following:

  1. Support for Asyncio (issue #238)
  2. Message scheduling support (scheduling cookbook)

Getting Started

The quickest way to get started is by playing around with the worker heartbeat example.

The Code

Async Middleware

Broker

To provide async support for your actors all you need to do is add the AsyncMiddleware to your broker.

RabbitMQ Broker

import dramatiq

from dramatiq.brokers.rabbitmq import RabbitmqBroker

from async_dramatiq.middleware import AsyncMiddleware


rabbitmq_broker = RabbitmqBroker(host="rabbitmq")
rabbitmq_broker.add_middleware(AsyncMiddleware())  # <--- Here
dramatiq.set_broker(rabbitmq_broker)

Redis Broker

import dramatiq

from dramatiq.brokers.redis import RedisBroker

from async_dramatiq.middleware import AsyncMiddleware


redis_broker = RedisBroker(host="redis")
redis_broker.add_middleware(AsyncMiddleware())  # <--- Here
dramatiq.set_broker(redis_broker)
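
For readers wondering how coroutines can run alongside synchronous worker code at all, here is a minimal sketch of the general asyncio pattern: an event loop running on its own thread, with synchronous code submitting coroutines to it. This is not async_dramatiq's implementation, only an illustration using the standard library. The names start_background_loop, add, and run_from_sync_code are hypothetical.

```python
import asyncio
import threading


def start_background_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Start an event loop on a daemon thread and return both."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def add(x: int, y: int) -> int:
    """A stand-in for the body of an async actor (hypothetical)."""
    await asyncio.sleep(0)
    return x + y


def run_from_sync_code(loop: asyncio.AbstractEventLoop) -> int:
    """Submit a coroutine to the loop from synchronous code and wait for it."""
    future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
    return future.result(timeout=5)


loop, thread = start_background_loop()
result = run_from_sync_code(loop)

# Ask the loop to stop from the calling thread, then wait for its thread to exit.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

The calling thread blocks on future.result() while the coroutine runs on the loop's thread, which is why shared async resources need to be created on that same loop.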

Startup and Shutdown Events

To let you stand up and tear down shared async resources, the AsyncMiddleware provides two hooks:

  1. Before the event loop is started
  2. After the event loop is stopped

Example

from typing import Any

from dramatiq.brokers.rabbitmq import RabbitmqBroker

from async_dramatiq.middleware import AsyncMiddleware
# Assumed import path for AsyncWorker; adjust if your installed version differs.
from async_dramatiq.worker import AsyncWorker


async def startup() -> None:
    """This function should contain your resource initialization code."""
    pass


async def shutdown() -> None:
    """This function should contain your resource teardown code."""
    pass


class MyAsyncMiddleware(AsyncMiddleware):
    def before_async_worker_thread_startup(
        self, _: RabbitmqBroker, thread: AsyncWorker, **kwargs: dict[str, Any]
    ) -> None:
        # Runs before the worker's event loop is started.
        thread.event_loop.run_until_complete(startup())

    def after_async_worker_thread_shutdown(
        self, _: RabbitmqBroker, thread: AsyncWorker, **kwargs: dict[str, Any]
    ) -> None:
        # Runs after the worker's event loop is stopped.
        thread.event_loop.run_until_complete(shutdown())
        thread.event_loop.close()
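
To see the startup and shutdown sequence in isolation, the following sketch drives the same two steps on a plain asyncio event loop, without dramatiq or a broker. The resources dictionary and its "db" entry are hypothetical placeholders for whatever shared resource you manage.

```python
import asyncio

# Hypothetical shared state standing in for a connection pool or client.
resources: dict[str, str] = {}


async def startup() -> None:
    """Pretend to open a shared resource."""
    await asyncio.sleep(0)
    resources["db"] = "connected"


async def shutdown() -> None:
    """Release the shared resource."""
    await asyncio.sleep(0)
    resources.pop("db", None)


event_loop = asyncio.new_event_loop()

# Before the loop is started: run startup to completion.
event_loop.run_until_complete(startup())
state_after_startup = dict(resources)

# ... a worker would run the loop and process messages here ...

# After the loop is stopped: run shutdown to completion, then close the loop.
event_loop.run_until_complete(shutdown())
event_loop.close()
```

run_until_complete only works while the loop is not already running, which is why these steps sit before the loop starts and after it stops.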

Async Actor

The async actor decorator, async_dramatiq_actor, is a thin wrapper around the dramatiq actor that adds new functionality such as interval and cron scheduling.

Interval Jobs

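Run a job at a fixed interval:

from datetime import timedelta

# Assumed import path for the decorator; adjust if your installed version differs.
from async_dramatiq.actor import async_dramatiq_actor


@async_dramatiq_actor(interval=timedelta(seconds=5))
def run_every_5_seconds() -> None:
    pass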
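
As a rough illustration of what an interval schedule means, the sketch below computes the run times implied by a timedelta. It is not the library's scheduler, and next_runs is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_runs(start: datetime, interval: timedelta, count: int) -> list[datetime]:
    """Return the first `count` run times after `start`, spaced by `interval`."""
    return [start + interval * n for n in range(1, count + 1)]


start = datetime(2024, 1, 1, 12, 0, 0)
runs = next_runs(start, timedelta(seconds=5), 3)
```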

Cron Jobs

Run a job on a crontab schedule (see https://crontab.guru/):

@async_dramatiq_actor(interval="0 0 * * *")
def run_at_midnight() -> None:
    pass
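
To make the five-field crontab format concrete, here is a simplified matcher that checks whether a given moment satisfies an expression such as "0 0 * * *". It is only a sketch: it supports "*" and single integers, combines all fields with AND, and is not the parser the library uses. The helpers field_matches and cron_matches are hypothetical.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Match one cron field; supports only "*" and a single integer."""
    return field == "*" or int(field) == value


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check whether `moment` satisfies a simplified five-field cron expression."""
    minute, hour, day, month, weekday = expression.split()
    # Cron counts Sunday as 0; datetime.isoweekday() counts Sunday as 7.
    cron_weekday = moment.isoweekday() % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day)
        and field_matches(month, moment.month)
        and field_matches(weekday, cron_weekday)
    )


midnight = cron_matches("0 0 * * *", datetime(2024, 1, 1, 0, 0))
noon = cron_matches("0 0 * * *", datetime(2024, 1, 1, 12, 0))
```

The fields read, left to right, as minute, hour, day of month, month, and day of week, so "0 0 * * *" means minute 0 of hour 0 on every day.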

Running

The Scheduler

Check out run_scheduler.py for an example of running the scheduler.

Dramatiq Worker

Check out the official dramatiq guide.
