Skip to main content

Eager broker for dramatiq

Project description

dramatiq-eager-broker

An eager broker for Dramatiq that executes tasks synchronously and immediately, without queuing. Perfect for testing and development environments.

Features

  • Synchronous task execution
  • No message broker required (Redis, RabbitMQ, etc.)
  • Pipeline support
  • Middleware support
  • Drop-in replacement for testing

Installation

pip install dramatiq-eager-broker

Or with uv:

uv add dramatiq-eager-broker

Usage

import dramatiq
from dramatiq_eager_broker import EagerBroker

broker = EagerBroker(middleware=[])
dramatiq.set_broker(broker)

@dramatiq.actor
def send_email(email, message):
    print(f"Sending email to {email}: {message}")

# Tasks are executed immediately and synchronously
send_email.send("user@example.com", "Hello!")

Testing Example

import dramatiq
import pytest
from dramatiq_eager_broker import EagerBroker


@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[])
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)


def test_my_actor(eager_broker):
    results = []

    @dramatiq.actor
    def my_task(value):
        results.append(value)

    my_task.send("test")
    assert results == ["test"]

Pipeline Support

The eager broker supports Dramatiq pipelines:

from dramatiq.middleware import Pipelines

@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[Pipelines()])
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)

@dramatiq.actor
def add(x, y):
    return x + y

@dramatiq.actor
def multiply(result, factor):
    return result * factor

# Create and execute a pipeline
pipeline = add.message(2, 3) | multiply.message(factor=10)
broker.enqueue(pipeline.messages[0])

Results Support

The eager broker supports Dramatiq get_result() if the Results middleware is loaded:

from dramatiq.middleware import Pipelines
from dramatiq.results import Results
from dramatiq.results.backends import StubBackend

@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[Pipelines(), Results(backend=StubBackend())])
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)

@dramatiq.actor(store_result=True)
def add(x, y):
    return x + y

@dramatiq.actor
def multiply(result, factor):
    return result * factor


# Create and execute a pipeline
pipeline = add.message(2, 3) | multiply.message(factor=10)
broker.enqueue(pipeline.messages[0])

assert pipeline.get_result() == 60

Results with exceptions

If the task should be executed synchronously but the actor should not raise an exception, use fail_fast=False. This mimics the conventional Dramatiq behavior which stuffs the exception into the message.

from dramatiq.middleware import Pipelines, Retries
from dramatiq.results import Results, ResultFailure
from dramatiq.results.backends import StubBackend

@pytest.fixture
def eager_broker():
    broker = EagerBroker(middleware=[Pipelines(), Retries(), Results(backend=StubBackend())], fail_fast=False)
    dramatiq.set_broker(broker)
    yield broker
    dramatiq.set_broker(None)

@dramatiq.actor(store_result=True, throws=ValueError)
def add(x, y):
    if x < y:
        raise ValueError("x < y")

message = add.send(2, 3)
with pytest.raises(ResultFailure):
    message.get_result()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dramatiq_eager_broker-0.3.0.tar.gz (2.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dramatiq_eager_broker-0.3.0-py3-none-any.whl (3.1 kB view details)

Uploaded Python 3

File details

Details for the file dramatiq_eager_broker-0.3.0.tar.gz.

File metadata

File hashes

Hashes for dramatiq_eager_broker-0.3.0.tar.gz
Algorithm Hash digest
SHA256 c5f1491195df2299251275c1390f1c33fac26ba8aff9feec5496555441df9a02
MD5 ff50cd57d8dea7c37048288a92cd9d0e
BLAKE2b-256 9182eab853b2822803ff9b5b7b17ab8737524c8ed888b9070612cc4ebd821f86

See more details on using hashes here.

File details

Details for the file dramatiq_eager_broker-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for dramatiq_eager_broker-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 332cfee6d6b598ad322b0e768eb05a97c591e8c365a0d275ce64fad2bdfb4402
MD5 6615e88f0b242ff1304ffdd623d4f673
BLAKE2b-256 1e9cbdfdeae6c63dfbdf6688784471ac3ecf7a6071d7e0600d4856a81f4e01f9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page