Skip to main content

Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq

Project description

queuebridge

Pass Pydantic models to .delay() / .send() / enqueue_job() — get models back from results.

Bidirectional Pydantic typing for Celery, Dramatiq, and Arq with one shared wire codec.

The problem

Celery 5.5+ added pydantic=True, but it only validates on the worker:

  • Callers must still model_dump() before .delay() — passing a model raises TypeError: Object of type X is not JSON serializable (celery#9442)
  • .get() returns a dict, not your model

Dramatiq's default JSON encoder fails on models, UUIDs, and datetimes (dramatiq#660).

Arq defaults to pickle with no Pydantic story (arq#497).

Producer                    Worker                      Client
────────                    ──────                      ──────
.delay(model)  ──X──>      pydantic=True validates     .get() → dict
(model_dump() required)     args on worker only

queuebridge fixes the producer side and client-side result decoding with a shared __qb__ tagged wire format.

Install

pip install queuebridge[celery]     # Celery + Kombu
pip install queuebridge[dramatiq]   # Dramatiq
pip install queuebridge[arq]        # Arq + msgpack
pip install queuebridge[all]        # everything

Quickstart

Celery

from celery import Celery
from queuebridge.celery import register_queuebridge, typed_result
from myapp.models import OrderCreate, OrderResult

app = Celery("orders", broker="redis://localhost:6379/0")
register_queuebridge(app)

@app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="processed")

# Enqueue with a model directly
ar = process_order.delay(OrderCreate(id=1, sku="ABC"))

# Get a model back (not a dict)
result = typed_result(ar, OrderResult).get(timeout=10)

Note: Celery cannot safely monkey-patch AsyncResult.get() globally. Use typed_result() for typed client results.

Dramatiq

import dramatiq
from pydantic import validate_call
from queuebridge.dramatiq import register_queuebridge
from myapp.models import OrderCreate

register_queuebridge()

@dramatiq.actor
@validate_call
def process(order: OrderCreate):
    print(order)

process.send(OrderCreate(id=1, sku="ABC"))

Arq

from arq.connections import RedisSettings
from pydantic import validate_call
from queuebridge.arq import get_serializer_pair, qb_task, typed_result
from myapp.models import OrderCreate, OrderResult

serialize, deserialize = get_serializer_pair()

@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="ok")

class WorkerSettings:
    functions = [process_order]
    redis_settings = RedisSettings()
    job_serializer = serialize
    job_deserializer = deserialize

Wire format

Non-JSON-native values are wrapped in a tagged envelope:

{
  "__qb__": {
    "t": "myapp.models.OrderCreate",
    "v": 1,
    "d": {"id": 1, "sku": "ABC"}
  }
}

Decode uses function type hints (TypeAdapter) when tags are absent — a plain dict + OrderCreate hint still validates.

Security

Deserialization resolves types by fully-qualified name (import_fqn). Only deserialize from brokers you trust. Module allowlisting is planned for v0.2.

Comparison

Solution Celery Dramatiq Arq Bidirectional .get()
Celery pydantic=True worker only no
Blog / msgpack hacks partial partial partial varies
queuebridge yes yes yes yes (typed_result)

Roadmap

  • allowed_modules security filter on register_queuebridge()
  • Optional pickle extra
  • Chord / chain signature support

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queuebridge-0.1.0.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

queuebridge-0.1.0-py3-none-any.whl (11.6 kB view details)

Uploaded Python 3

File details

Details for the file queuebridge-0.1.0.tar.gz.

File metadata

  • Download URL: queuebridge-0.1.0.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for queuebridge-0.1.0.tar.gz
Algorithm Hash digest
SHA256 aabef639ff05a5d2826fd59083ab6fc7125eed96e73cb122dd22abecbf473985
MD5 2bb47bf5a1f52eb6503a70bacea42df8
BLAKE2b-256 2a1c5326c3b77eba145fcbe05c4cc33ac499b202178ac1174e99afeb35efbba2

See more details on using hashes here.

Provenance

The following attestation bundles were made for queuebridge-0.1.0.tar.gz:

Publisher: publish.yml on false200/queuebridge

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file queuebridge-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: queuebridge-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 11.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for queuebridge-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 150095864b3fed0b9a58956a7e00d4f301eca883b64a4221cf0ccc0700f11600
MD5 bae815db05b55d82261e5c086af33ee7
BLAKE2b-256 313e839fd185d5fe9cfaf47b209786fd65931d0965098886230807da4c40373d

See more details on using hashes here.

Provenance

The following attestation bundles were made for queuebridge-0.1.0-py3-none-any.whl:

Publisher: publish.yml on false200/queuebridge

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page