Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq
queuebridge
Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq. One shared wire codec: pass models on enqueue, get models back from results.
Celery 5.5+ pydantic=True only validates on the worker. Callers still model_dump() before .delay(), and .get() returns a dict. Dramatiq chokes on models and UUIDs. Arq defaults to pickle. queuebridge fixes all three with a thin codec + backend adapters.
Install
pip install queuebridge
Extras:
pip install queuebridge[celery] # Celery + Kombu
pip install queuebridge[dramatiq] # Dramatiq
pip install queuebridge[arq] # Arq + msgpack
pip install queuebridge[all] # all backends
Requires Python 3.10+ and Pydantic v2.
Documentation: https://queuebridge.readthedocs.io
Usage
Celery
from celery import Celery
from queuebridge.celery import register_queuebridge, typed_result
from myapp.models import OrderCreate, OrderResult
app = Celery("orders", broker="redis://localhost:6379/0")
register_queuebridge(app)
@app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="processed")
ar = process_order.delay(OrderCreate(id=1, sku="ABC"))
result = typed_result(ar, OrderResult).get(timeout=10)
Dramatiq
import dramatiq
from pydantic import validate_call
from queuebridge.dramatiq import register_queuebridge
from myapp.models import OrderCreate
register_queuebridge()
@dramatiq.actor
@validate_call
def process(order: OrderCreate):
    print(order)
process.send(OrderCreate(id=1, sku="ABC"))
Arq
from arq.connections import RedisSettings
from pydantic import validate_call
from queuebridge.arq import get_serializer_pair, qb_task, typed_result
from myapp.models import OrderCreate, OrderResult
serialize, deserialize = get_serializer_pair()
@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    return OrderResult(id=order.id, status="ok")

class WorkerSettings:
    functions = [process_order]
    redis_settings = RedisSettings()
    job_serializer = serialize
    job_deserializer = deserialize
API
encode(value, *, tag_models=True)
Recursively transform a Python value into a JSON-serializable structure.
value
Required
Type: Any
The value to encode: Pydantic models, nested containers, UUID, datetime, Decimal, Enum, etc.
tag_models
Type: boolean
Default: true
When true, BaseModel instances are wrapped in a __qb__ envelope with a fully-qualified type name. When false, models are dumped with model_dump(mode="json") only.
from queuebridge import encode, decode
from myapp.models import OrderCreate
wire = encode(OrderCreate(id=1, sku="ABC"))
restored = decode(wire, OrderCreate)
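To make the recursive transform concrete, here is a minimal standard-library sketch of the idea. It is not queuebridge's implementation: the function name `sketch_encode`, the helper `wrap`, and the tag names `"uuid"`, `"datetime"`, and `"decimal"` are assumptions made for illustration, and Pydantic models are left out so the block runs without extra dependencies.

```python
import json
from datetime import datetime
from decimal import Decimal
from typing import Any
from uuid import UUID

ENVELOPE_KEY = "__qb__"  # envelope key as shown in the wire format section


def wrap(tag: str, data: Any) -> dict:
    """Build a tagged envelope. The tag names passed in are hypothetical."""
    return {ENVELOPE_KEY: {"t": tag, "v": 1, "d": data}}


def sketch_encode(value: Any) -> Any:
    """Recursively turn a Python value into a JSON-serializable structure."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value  # JSON-native primitives pass through unchanged
    if isinstance(value, UUID):
        return wrap("uuid", str(value))
    if isinstance(value, datetime):
        return wrap("datetime", value.isoformat())
    if isinstance(value, Decimal):
        return wrap("decimal", str(value))
    if isinstance(value, dict):
        return {str(k): sketch_encode(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set, frozenset)):
        return [sketch_encode(v) for v in value]  # containers become JSON arrays
    raise TypeError(f"cannot encode value of type {type(value).__name__}")


wire = sketch_encode(
    {
        "id": UUID("12345678-1234-5678-1234-567812345678"),
        "total": Decimal("9.99"),
        "tags": ("a", "b"),
    }
)
json.dumps(wire)  # succeeds because every leaf is now JSON-native
```

In this sketch tuples and sets collapse to lists, so a decoder needs a type hint to restore the original container type.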
decode(value, hint=Any, *, strict=False)
Recursively decode a wire value back to Python using an optional type hint.
value
Required
Type: Any
Wire value: primitives, lists, dicts, or __qb__ envelopes.
hint
Type: Any
Default: Any
Type hint used for validation. TypeAdapter(hint).validate_python() is used when the hint is concrete.
strict
Type: boolean
Default: false
When true, raise QueuebridgeDecodeError if the value cannot be decoded.
decode_wire(value)
Recursively unwrap __qb__ envelopes without type hints. Used internally by Dramatiq's decoder.
Type: Any -> Any
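The hint-free unwrapping described here can be pictured with a small standard-library sketch. The name `sketch_decode_wire`, the `BUILTIN_DECODERS` table, and its tag names are assumptions for illustration only; the real function also resolves model envelopes, which this sketch does not attempt.

```python
from datetime import datetime
from decimal import Decimal
from typing import Any, Callable
from uuid import UUID

# Hypothetical tag-to-constructor table; queuebridge's actual tags may differ.
BUILTIN_DECODERS: dict[str, Callable[[Any], Any]] = {
    "uuid": UUID,
    "datetime": datetime.fromisoformat,
    "decimal": Decimal,
}


def sketch_decode_wire(value: Any) -> Any:
    """Recursively unwrap __qb__ envelopes without using type hints."""
    if isinstance(value, list):
        return [sketch_decode_wire(item) for item in value]
    if isinstance(value, dict):
        envelope = value.get("__qb__")
        if len(value) == 1 and isinstance(envelope, dict):
            decoder = BUILTIN_DECODERS.get(envelope.get("t"))
            if decoder is None:
                # Unknown tag: fall back to the decoded payload (a lenient choice).
                return sketch_decode_wire(envelope.get("d"))
            return decoder(envelope["d"])
        return {key: sketch_decode_wire(item) for key, item in value.items()}
    return value  # primitives pass through


wire = {
    "order_id": {"__qb__": {"t": "uuid", "v": 1, "d": "12345678-1234-5678-1234-567812345678"}},
    "prices": [{"__qb__": {"t": "decimal", "v": 1, "d": "1.50"}}, 3],
}
restored = sketch_decode_wire(wire)
```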
register_queuebridge(app, *, strict=False) (Celery)
Register the queuebridge-json Kombu serializer on a Celery app. Idempotent: safe to call twice.
app
Required
Type: celery.Celery
strict
Type: boolean
Default: false
Reserved for future strict decode behavior.
Sets task_serializer, result_serializer, and accept_content on the app.
typed_result(async_result, return_type) (Celery)
Wrap a Celery AsyncResult so .get() returns a Pydantic model instead of a dict.
async_result
Required
Type: celery.result.AsyncResult
return_type
Required
Type: type[T]
Returns TypedAsyncResult[T], which proxies .id, .state, .ready(), etc.
Celery cannot safely monkey-patch AsyncResult.get() globally. Use typed_result() on the client.
register_queuebridge(broker=None) (Dramatiq)
Install QueuebridgeEncoder via dramatiq.set_encoder(). Call once at process startup.
broker
Type: dramatiq.Broker | None
Default: None
If provided, also calls dramatiq.set_broker(broker).
get_serializer_pair() (Arq)
Returns (serialize, deserialize) callables for job_serializer / job_deserializer.
serialize, deserialize = get_serializer_pair()
Uses msgpack over queuebridge-encoded dicts. Set on both WorkerSettings and create_pool().
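The shape of such a pair is simple: one callable turns the job dict into bytes, the other reverses it. The sketch below uses the standard-library `json` module as a stand-in for msgpack so it runs without extra dependencies; `sketch_serializer_pair` and the sample job dict are hypothetical and do not reflect queuebridge's or Arq's internal payload layout.

```python
import json
from typing import Any, Callable

Serializer = Callable[[dict[str, Any]], bytes]
Deserializer = Callable[[bytes], dict[str, Any]]


def sketch_serializer_pair() -> tuple[Serializer, Deserializer]:
    """Return (serialize, deserialize) callables with a dict <-> bytes contract."""

    def serialize(job: dict[str, Any]) -> bytes:
        # The job dict is assumed to be already encoded into JSON-native values.
        return json.dumps(job, separators=(",", ":")).encode("utf-8")

    def deserialize(raw: bytes) -> dict[str, Any]:
        return json.loads(raw.decode("utf-8"))

    return serialize, deserialize


serialize, deserialize = sketch_serializer_pair()
job = {
    "function": "process_order",
    "args": [{"__qb__": {"t": "myapp.models.OrderCreate", "v": 1, "d": {"id": 1, "sku": "ABC"}}}],
    "kwargs": {},
}
round_tripped = deserialize(serialize(job))
```

The producer and the worker must use the same pair, otherwise one side cannot read what the other wrote. That is why the serializers are set in both places.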
qb_task(fn) (Arq)
Decorator that decodes wire args/kwargs using function type hints before your async task runs.
Apply outside @validate_call:
@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
    ...
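A hint-driven decoding decorator can be sketched with `inspect` and `typing.get_type_hints`. This is an illustration under assumptions, not the library's code: `sketch_task` is a hypothetical name, and a dataclass stands in for the Pydantic model so the block needs only the standard library.

```python
import asyncio
import functools
import inspect
from dataclasses import dataclass, is_dataclass
from typing import get_type_hints


@dataclass
class OrderCreate:  # dataclass stand-in for the Pydantic model
    id: int
    sku: str


def sketch_task(fn):
    """Decode dict arguments into their annotated types before calling fn."""
    signature = inspect.signature(fn)
    hints = get_type_hints(fn)

    @functools.wraps(fn)
    async def wrapper(ctx, *args, **kwargs):
        bound = signature.bind(ctx, *args, **kwargs)
        for name, value in list(bound.arguments.items()):
            hint = hints.get(name)
            # Only rebuild arguments that arrived as plain dicts off the wire.
            if is_dataclass(hint) and isinstance(value, dict):
                bound.arguments[name] = hint(**value)
        return await fn(*bound.args, **bound.kwargs)

    return wrapper


@sketch_task
async def process_order(ctx, order: OrderCreate) -> str:
    return f"{order.sku}:{order.id}"


result = asyncio.run(process_order({}, {"id": 1, "sku": "ABC"}))
```

Because the decoding wrapper is outermost, the inner function (and any validator wrapped around it) sees a typed object rather than a raw dict.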
typed_result(job, return_type) (Arq)
Decode the job.result() payload into a Pydantic model.
result = await typed_result(job, OrderResult)
Wire format
Non-JSON-native values use a tagged envelope:
{
  "__qb__": {
    "t": "myapp.models.OrderCreate",
    "v": 1,
    "d": {"id": 1, "sku": "ABC"}
  }
}
| Python type | Encode | Decode |
|---|---|---|
| BaseModel | envelope + model_dump(mode="json") | model_validate or FQN import |
| UUID, datetime, Decimal, Enum | tagged envelope | builtin dispatch |
| list, dict, set, tuple | recurse | recurse via hint |
| Primitives | pass through | pass through |
A plain dict + OrderCreate hint still validates. Tags are for ambiguity, not required when hints are known.
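The interplay between hints, envelopes, and strict mode can be sketched as follows. This is a simplified model rather than the library's logic: `sketch_decode` and `SketchDecodeError` are hypothetical names (the latter stands in for QueuebridgeDecodeError), and a dataclass replaces the Pydantic model, so field types are not actually validated here.

```python
from dataclasses import dataclass, is_dataclass
from typing import Any


class SketchDecodeError(ValueError):
    """Stand-in for QueuebridgeDecodeError in this sketch."""


@dataclass
class OrderCreate:  # dataclass stand-in for the Pydantic model
    id: int
    sku: str


def sketch_decode(value: Any, hint: Any = Any, *, strict: bool = False) -> Any:
    """Decode a wire value, preferring the caller's hint over the envelope tag."""
    # Unwrap a tagged envelope if present; with a known hint the tag is not needed.
    if isinstance(value, dict) and set(value) == {"__qb__"}:
        value = value["__qb__"]["d"]
    if hint is Any:
        return value
    try:
        if is_dataclass(hint) and isinstance(value, dict):
            return hint(**value)
        return hint(value)
    except (TypeError, ValueError) as exc:
        if strict:
            raise SketchDecodeError(f"cannot decode value as {hint!r}") from exc
        return value  # lenient mode hands back the undecoded value


from_plain = sketch_decode({"id": 1, "sku": "ABC"}, OrderCreate)
from_tagged = sketch_decode(
    {"__qb__": {"t": "myapp.models.OrderCreate", "v": 1, "d": {"id": 1, "sku": "ABC"}}},
    OrderCreate,
)
```

Both calls produce the same object, which is the point of the sentence above: the tag only matters when no hint is available to say what the payload should become.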
Why not Celery pydantic=True alone?
Producer                   Worker                     Client
--------                   ------                     ------
.delay(model)      FAIL>   pydantic=True validates    .get() -> dict
(model_dump() required)    args on worker only
- celery#9442: models not JSON-serializable on enqueue
- dramatiq#660: no Pydantic support
- arq#497: pickle default, Pydantic requested
Comparison
| Solution | Celery | Dramatiq | Arq | Typed .get() |
|---|---|---|---|---|
| Celery pydantic=True | worker only | n/a | n/a | no |
| Blog / msgpack hacks | partial | partial | partial | varies |
| queuebridge | yes | yes | yes | yes |
Security
Deserialization resolves types by fully-qualified name (import_fqn). Only deserialize from brokers you trust.
ALLOWED_MODULE_PREFIXES allowlisting is planned for v0.2.
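Until an allowlist ships, the idea can be approximated in application code. The sketch below is hypothetical: `is_allowed`, `guarded_import`, and the prefix tuple are illustrative names and values, not part of queuebridge 0.1, and the planned feature may behave differently.

```python
import importlib
from typing import Any

# Hypothetical allowlist; only modules under these prefixes may be resolved.
ALLOWED_MODULE_PREFIXES: tuple[str, ...] = ("myapp.models",)


def is_allowed(fqn: str, prefixes: tuple[str, ...] = ALLOWED_MODULE_PREFIXES) -> bool:
    """Return True if the module part of a dotted name falls under an allowed prefix."""
    module_name, _, attr = fqn.rpartition(".")
    if not module_name or not attr:
        return False
    # Match whole dotted segments so "myapp.models_evil" does not pass.
    return any(
        module_name == prefix or module_name.startswith(prefix + ".")
        for prefix in prefixes
    )


def guarded_import(fqn: str, prefixes: tuple[str, ...] = ALLOWED_MODULE_PREFIXES) -> Any:
    """Import the object named by fqn only after the allowlist check passes."""
    if not is_allowed(fqn, prefixes):
        raise PermissionError(f"type {fqn!r} is not in the allowlist")
    module_name, _, attr = fqn.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


decimal_type = guarded_import("decimal.Decimal", prefixes=("decimal",))
```

Checking the name before calling `importlib.import_module` matters because importing a module already executes its top-level code.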
Examples
| Path | Description |
|---|---|
| examples/celery_fastapi/ | FastAPI enqueue + typed result polling |
| examples/dramatiq_example/ | Dramatiq + validate_call |
| examples/arq_example/ | Arq worker with custom serializers |
| examples/smoke_test_complex.py | End-to-end smoke test (no Redis) |
| pypi_verify/run_complex.py | PyPI install verification script |
Related
- Celery Pydantic docs: worker-only validation
- Arq custom serializers: msgpack hook point
- Dramatiq encoders: set_encoder() extension point
License
MIT. See LICENSE.