Skip to main content

Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq

Project description

queuebridge

PyPI version Python CI License: MIT

Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq. One shared wire codec: pass models on enqueue, get models back from results.

Celery 5.5+ pydantic=True only validates on the worker. Callers still model_dump() before .delay(), and .get() returns a dict. Dramatiq chokes on models and UUIDs. Arq defaults to pickle. queuebridge fixes all three with a thin codec + backend adapters.

Install

pip install queuebridge

Extras:

pip install queuebridge[celery]     # Celery + Kombu
pip install queuebridge[dramatiq]   # Dramatiq
pip install queuebridge[arq]        # Arq + msgpack
pip install queuebridge[all]        # all backends

Requires Python 3.10+ and Pydantic v2.

Documentation: https://queuebridge.readthedocs.io

Usage

Celery

from celery import Celery
from queuebridge.celery import register_queuebridge, typed_result
from myapp.models import OrderCreate, OrderResult

app = Celery("orders", broker="redis://localhost:6379/0")
register_queuebridge(app)

@app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="processed")

ar = process_order.delay(OrderCreate(id=1, sku="ABC"))
result = typed_result(ar, OrderResult).get(timeout=10)

Dramatiq

import dramatiq
from pydantic import validate_call
from queuebridge.dramatiq import register_queuebridge
from myapp.models import OrderCreate

register_queuebridge()

@dramatiq.actor
@validate_call
def process(order: OrderCreate):
    print(order)

process.send(OrderCreate(id=1, sku="ABC"))

Arq

from arq.connections import RedisSettings
from pydantic import validate_call
from queuebridge.arq import get_serializer_pair, qb_task, typed_result
from myapp.models import OrderCreate, OrderResult

serialize, deserialize = get_serializer_pair()

@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="ok")

class WorkerSettings:
    functions = [process_order]
    redis_settings = RedisSettings()
    job_serializer = serialize
    job_deserializer = deserialize

API

encode(value, *, tag_models=True)

Recursively transform a Python value into a JSON-serializable structure.

value

Required
Type: Any

The value to encode: Pydantic models, nested containers, UUID, datetime, Decimal, Enum, etc.

tag_models

Type: boolean
Default: true

When true, BaseModel instances are wrapped in a __qb__ envelope with a fully-qualified type name. When false, models are dumped with model_dump(mode="json") only.

from queuebridge import encode, decode
from myapp.models import OrderCreate

wire = encode(OrderCreate(id=1, sku="ABC"))
restored = decode(wire, OrderCreate)

decode(value, hint=Any, *, strict=False)

Recursively decode a wire value back to Python using an optional type hint.

value

Required
Type: Any

Wire value: primitives, lists, dicts, or __qb__ envelopes.

hint

Type: Any
Default: Any

Type hint used for validation. TypeAdapter(hint).validate_python() is used when the hint is concrete.

strict

Type: boolean
Default: false

When true, raise QueuebridgeDecodeError if the value cannot be decoded.


decode_wire(value)

Recursively unwrap __qb__ envelopes without type hints. Used internally by Dramatiq's decoder.

Type: Any -> Any


register_queuebridge(app, *, strict=False) (Celery)

Register the queuebridge-json Kombu serializer on a Celery app. Idempotent: safe to call twice.

app

Required
Type: celery.Celery

strict

Type: boolean
Default: false

Reserved for future strict decode behavior.

Sets task_serializer, result_serializer, and accept_content on the app.


typed_result(async_result, return_type) (Celery)

Wrap a Celery AsyncResult so .get() returns a Pydantic model instead of a dict.

async_result

Required
Type: celery.result.AsyncResult

return_type

Required
Type: type[T]

Returns TypedAsyncResult[T], which proxies .id, .state, .ready(), etc.

Celery cannot safely monkey-patch AsyncResult.get() globally. Use typed_result() on the client.


register_queuebridge(broker=None) (Dramatiq)

Install QueuebridgeEncoder via dramatiq.set_encoder(). Call once at process startup.

broker

Type: dramatiq.Broker | None
Default: None

If provided, also calls dramatiq.set_broker(broker).


get_serializer_pair() (Arq)

Returns (serialize, deserialize) callables for job_serializer / job_deserializer.

serialize, deserialize = get_serializer_pair()

Uses msgpack over queuebridge-encoded dicts. Set on both WorkerSettings and create_pool().


qb_task(fn) (Arq)

Decorator that decodes wire args/kwargs using function type hints before your async task runs.

Apply outside @validate_call:

@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    ...

typed_result(job, return_type) (Arq)

result = await typed_result(job, OrderResult)

Decode the job.result() payload into a Pydantic model.

Wire format

Non-JSON-native values use a tagged envelope:

{
  "__qb__": {
    "t": "myapp.models.OrderCreate",
    "v": 1,
    "d": {"id": 1, "sku": "ABC"}
  }
}
Python type Encode Decode
BaseModel envelope + model_dump(mode="json") model_validate or FQN import
UUID, datetime, Decimal, Enum tagged envelope builtin dispatch
list, dict, set, tuple recurse recurse via hint
Primitives pass through pass through

A plain dict + OrderCreate hint still validates. Tags are for ambiguity, not required when hints are known.

Why not Celery pydantic=True alone?

Producer                    Worker                      Client
------                      ------                      ------
.delay(model)  FAIL>        pydantic=True validates     .get() -> dict
(model_dump() required)     args on worker only

Comparison

Solution Celery Dramatiq Arq Typed .get()
Celery pydantic=True worker only n/a n/a no
Blog / msgpack hacks partial partial partial varies
queuebridge yes yes yes yes

Security

Deserialization resolves types by fully-qualified name (import_fqn). Only deserialize from brokers you trust.

ALLOWED_MODULE_PREFIXES allowlisting is planned for v0.2.

Examples

Path Description
examples/celery_fastapi/ FastAPI enqueue + typed result polling
examples/dramatiq_example/ Dramatiq + validate_call
examples/arq_example/ Arq worker with custom serializers
examples/smoke_test_complex.py End-to-end smoke test (no Redis)
pypi_verify/run_complex.py PyPI install verification script

Related

License

MIT. See LICENSE.

Community

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

queuebridge-0.1.2.tar.gz (30.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

queuebridge-0.1.2-py3-none-any.whl (16.2 kB view details)

Uploaded Python 3

File details

Details for the file queuebridge-0.1.2.tar.gz.

File metadata

  • Download URL: queuebridge-0.1.2.tar.gz
  • Upload date:
  • Size: 30.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for queuebridge-0.1.2.tar.gz
Algorithm Hash digest
SHA256 b41bff99785cfeccce3976e927109d258321a13c7986fd5cb23a21e492683688
MD5 50c514184a5ddc27d77176adbe64208d
BLAKE2b-256 a3f8c86c44b279ddb782376ed1e81f26bbe73b99560170dfe8057883e75de9fa

See more details on using hashes here.

Provenance

The following attestation bundles were made for queuebridge-0.1.2.tar.gz:

Publisher: publish.yml on false200/queuebridge

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file queuebridge-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: queuebridge-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 16.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for queuebridge-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 913805b336c78114f82405c6cb5a9a7927164bda1094357f93e1112944f89e6b
MD5 dad58c25e4c163b1915405a95442d2a8
BLAKE2b-256 da3efd408c1efb630e7f7047076e7cd9f4c995d481f2292b57b9ea96a51c85ad

See more details on using hashes here.

Provenance

The following attestation bundles were made for queuebridge-0.1.2-py3-none-any.whl:

Publisher: publish.yml on false200/queuebridge

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page