Skip to main content

Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq

Project description

queuebridge

PyPI version Python CI License: MIT

Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq. One shared wire codec: pass models on enqueue, get models back from results.

Celery 5.5+ pydantic=True only validates on the worker. Callers still model_dump() before .delay(), and .get() returns a dict. Dramatiq chokes on models and UUIDs. Arq defaults to pickle. queuebridge fixes all three with a thin codec + backend adapters.

Install

pip install queuebridge

Extras:

pip install queuebridge[celery]     # Celery + Kombu
pip install queuebridge[dramatiq]   # Dramatiq
pip install queuebridge[arq]        # Arq + msgpack
pip install queuebridge[all]        # all backends

Requires Python 3.10+ and Pydantic v2.

Documentation: https://queuebridge.readthedocs.io

Usage

Celery

from celery import Celery
from queuebridge.celery import register_queuebridge, typed_result
from myapp.models import OrderCreate, OrderResult

app = Celery("orders", broker="redis://localhost:6379/0")
register_queuebridge(app)

@app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="processed")

ar = process_order.delay(OrderCreate(id=1, sku="ABC"))
result = typed_result(ar, OrderResult).get(timeout=10)

Dramatiq

import dramatiq
from pydantic import validate_call
from queuebridge.dramatiq import register_queuebridge
from myapp.models import OrderCreate

register_queuebridge()

@dramatiq.actor
@validate_call
def process(order: OrderCreate):
    print(order)

process.send(OrderCreate(id=1, sku="ABC"))

Arq

from arq.connections import RedisSettings
from pydantic import validate_call
from queuebridge.arq import get_serializer_pair, qb_task, typed_result
from myapp.models import OrderCreate, OrderResult

serialize, deserialize = get_serializer_pair()

@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="ok")

class WorkerSettings:
    functions = [process_order]
    redis_settings = RedisSettings()
    job_serializer = serialize
    job_deserializer = deserialize

API

encode(value, *, tag_models=True)

Recursively transform a Python value into a JSON-serializable structure.

value

Required
Type: Any

The value to encode: Pydantic models, nested containers, UUID, datetime, Decimal, Enum, etc.

tag_models

Type: boolean
Default: true

When true, BaseModel instances are wrapped in a __qb__ envelope with a fully-qualified type name. When false, models are dumped with model_dump(mode="json") only.

from queuebridge import encode, decode
from myapp.models import OrderCreate

wire = encode(OrderCreate(id=1, sku="ABC"))
restored = decode(wire, OrderCreate)

decode(value, hint=Any, *, strict=False)

Recursively decode a wire value back to Python using an optional type hint.

value

Required
Type: Any

Wire value: primitives, lists, dicts, or __qb__ envelopes.

hint

Type: Any
Default: Any

Type hint used for validation. TypeAdapter(hint).validate_python() is used when the hint is concrete.

strict

Type: boolean
Default: false

When true, raise QueuebridgeDecodeError if the value cannot be decoded.


decode_wire(value)

Recursively unwrap __qb__ envelopes without type hints. Used internally by Dramatiq's decoder.

Type: Any -> Any


register_queuebridge(app, *, strict=False) (Celery)

Register the queuebridge-json Kombu serializer on a Celery app. Idempotent: safe to call twice.

app

Required
Type: celery.Celery

strict

Type: boolean
Default: false

Reserved for future strict decode behavior.

Sets task_serializer, result_serializer, and accept_content on the app.


typed_result(async_result, return_type) (Celery)

Wrap a Celery AsyncResult so .get() returns a Pydantic model instead of a dict.

async_result

Required
Type: celery.result.AsyncResult

return_type

Required
Type: type[T]

Returns TypedAsyncResult[T], which proxies .id, .state, .ready(), etc.

Celery cannot safely monkey-patch AsyncResult.get() globally. Use typed_result() on the client.


register_queuebridge(broker=None) (Dramatiq)

Install QueuebridgeEncoder via dramatiq.set_encoder(). Call once at process startup.

broker

Type: dramatiq.Broker | None
Default: None

If provided, also calls dramatiq.set_broker(broker).


get_serializer_pair() (Arq)

Returns (serialize, deserialize) callables for job_serializer / job_deserializer.

serialize, deserialize = get_serializer_pair()

Uses msgpack over queuebridge-encoded dicts. Set on both WorkerSettings and create_pool().


qb_task(fn) (Arq)

Decorator that decodes wire args/kwargs using function type hints before your async task runs.

Apply outside @validate_call:

@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    ...

typed_result(job, return_type) (Arq)

result = await typed_result(job, OrderResult)

Decode the job.result() payload into a Pydantic model.

Wire format

Non-JSON-native values use a tagged envelope:

{
  "__qb__": {
    "t": "myapp.models.OrderCreate",
    "v": 1,
    "d": {"id": 1, "sku": "ABC"}
  }
}
Python type Encode Decode
BaseModel envelope + model_dump(mode="json") model_validate or FQN import
UUID, datetime, Decimal, Enum tagged envelope builtin dispatch
list, dict, set, tuple recurse recurse via hint
Primitives pass through pass through

A plain dict + OrderCreate hint still validates. Tags are for ambiguity, not required when hints are known.

Why not Celery pydantic=True alone?

Producer                    Worker                      Client
------                      ------                      ------
.delay(model)  FAIL>        pydantic=True validates     .get() -> dict
(model_dump() required)     args on worker only

Comparison

Solution Celery Dramatiq Arq Typed .get()
Celery pydantic=True worker only n/a n/a no
Blog / msgpack hacks partial partial partial varies
queuebridge yes yes yes yes

Security

Deserialization resolves types by fully-qualified name (import_fqn). Only deserialize from brokers you trust.

ALLOWED_MODULE_PREFIXES allowlisting is planned for v0.2.

Examples

Path Description
examples/celery_fastapi/ FastAPI enqueue + typed result polling
examples/dramatiq_example/ Dramatiq + validate_call
examples/arq_example/ Arq worker with custom serializers
examples/smoke_test_complex.py End-to-end smoke test (no Redis)
pypi_verify/run_complex.py PyPI install verification script

Related

License

MIT. See LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queuebridge-0.1.1.tar.gz (27.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

queuebridge-0.1.1-py3-none-any.whl (16.0 kB view details)

Uploaded Python 3

File details

Details for the file queuebridge-0.1.1.tar.gz.

File metadata

  • Download URL: queuebridge-0.1.1.tar.gz
  • Upload date:
  • Size: 27.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for queuebridge-0.1.1.tar.gz
Algorithm Hash digest
SHA256 6f11f95c10a796f9e1496037ddc3b22aef361918ff10c818f1979b9c806d58cc
MD5 b10f14dd83390a6f4db21a0089497153
BLAKE2b-256 c483ccd776d95db1be05bf6453a0126a2297e060ca9447eac5ee127fe30ab498

See more details on using hashes here.

Provenance

The following attestation bundles were made for queuebridge-0.1.1.tar.gz:

Publisher: publish.yml on false200/queuebridge

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file queuebridge-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: queuebridge-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 16.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for queuebridge-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 0de45a10685a7522e29f98baca52cfa570e9147429c393deaed53b72fc3f960a
MD5 aad0de5b258da3d5b752978bab599343
BLAKE2b-256 313148769ba60360b7af118dbd2d838988e3f7a78940d935a7c747edb533b851

See more details on using hashes here.

Provenance

The following attestation bundles were made for queuebridge-0.1.1-py3-none-any.whl:

Publisher: publish.yml on false200/queuebridge

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page