Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq
Project description
queuebridge
Pass Pydantic models to .delay() / .send() / enqueue_job() — get models back from results.
Bidirectional Pydantic typing for Celery, Dramatiq, and Arq with one shared wire codec.
The problem
Celery 5.5+ added pydantic=True, but it only validates on the worker:
- Callers must still
model_dump()before.delay()— passing a model raisesTypeError: Object of type X is not JSON serializable(celery#9442) .get()returns adict, not your model
Dramatiq's default JSON encoder fails on models, UUIDs, and datetimes (dramatiq#660).
Arq defaults to pickle with no Pydantic story (arq#497).
Producer Worker Client
──────── ────── ──────
.delay(model) ──X──> pydantic=True validates .get() → dict
(model_dump() required) args on worker only
queuebridge fixes the producer side and client-side result decoding with a shared __qb__ tagged wire format.
Install
pip install queuebridge[celery] # Celery + Kombu
pip install queuebridge[dramatiq] # Dramatiq
pip install queuebridge[arq] # Arq + msgpack
pip install queuebridge[all] # everything
Quickstart
Celery
from celery import Celery
from queuebridge.celery import register_queuebridge, typed_result
from myapp.models import OrderCreate, OrderResult
app = Celery("orders", broker="redis://localhost:6379/0")
register_queuebridge(app)
@app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
return OrderResult(id=order.id, status="processed")
# Enqueue with a model directly
ar = process_order.delay(OrderCreate(id=1, sku="ABC"))
# Get a model back (not a dict)
result = typed_result(ar, OrderResult).get(timeout=10)
Note: Celery cannot safely monkey-patch
AsyncResult.get()globally. Usetyped_result()for typed client results.
Dramatiq
import dramatiq
from pydantic import validate_call
from queuebridge.dramatiq import register_queuebridge
from myapp.models import OrderCreate
register_queuebridge()
@dramatiq.actor
@validate_call
def process(order: OrderCreate):
print(order)
process.send(OrderCreate(id=1, sku="ABC"))
Arq
from arq.connections import RedisSettings
from pydantic import validate_call
from queuebridge.arq import get_serializer_pair, qb_task, typed_result
from myapp.models import OrderCreate, OrderResult
serialize, deserialize = get_serializer_pair()
@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
return OrderResult(id=order.id, status="ok")
class WorkerSettings:
functions = [process_order]
redis_settings = RedisSettings()
job_serializer = serialize
job_deserializer = deserialize
Wire format
Non-JSON-native values are wrapped in a tagged envelope:
{
"__qb__": {
"t": "myapp.models.OrderCreate",
"v": 1,
"d": {"id": 1, "sku": "ABC"}
}
}
Decode uses function type hints (TypeAdapter) when tags are absent — a plain dict + OrderCreate hint still validates.
Security
Deserialization resolves types by fully-qualified name (import_fqn). Only deserialize from brokers you trust. Module allowlisting is planned for v0.2.
Comparison
| Solution | Celery | Dramatiq | Arq | Bidirectional .get() |
|---|---|---|---|---|
Celery pydantic=True |
worker only | — | — | no |
| Blog / msgpack hacks | partial | partial | partial | varies |
| queuebridge | yes | yes | yes | yes (typed_result) |
Roadmap
allowed_modulessecurity filter onregister_queuebridge()- Optional pickle extra
- Chord / chain signature support
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file queuebridge-0.1.0.tar.gz.
File metadata
- Download URL: queuebridge-0.1.0.tar.gz
- Upload date:
- Size: 13.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
aabef639ff05a5d2826fd59083ab6fc7125eed96e73cb122dd22abecbf473985
|
|
| MD5 |
2bb47bf5a1f52eb6503a70bacea42df8
|
|
| BLAKE2b-256 |
2a1c5326c3b77eba145fcbe05c4cc33ac499b202178ac1174e99afeb35efbba2
|
Provenance
The following attestation bundles were made for queuebridge-0.1.0.tar.gz:
Publisher:
publish.yml on false200/queuebridge
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
queuebridge-0.1.0.tar.gz -
Subject digest:
aabef639ff05a5d2826fd59083ab6fc7125eed96e73cb122dd22abecbf473985 - Sigstore transparency entry: 1951051197
- Sigstore integration time:
-
Permalink:
false200/queuebridge@81486481b688ef19fb574d37e86b5cdf17a50081 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/false200
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@81486481b688ef19fb574d37e86b5cdf17a50081 -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file queuebridge-0.1.0-py3-none-any.whl.
File metadata
- Download URL: queuebridge-0.1.0-py3-none-any.whl
- Upload date:
- Size: 11.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
150095864b3fed0b9a58956a7e00d4f301eca883b64a4221cf0ccc0700f11600
|
|
| MD5 |
bae815db05b55d82261e5c086af33ee7
|
|
| BLAKE2b-256 |
313e839fd185d5fe9cfaf47b209786fd65931d0965098886230807da4c40373d
|
Provenance
The following attestation bundles were made for queuebridge-0.1.0-py3-none-any.whl:
Publisher:
publish.yml on false200/queuebridge
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
queuebridge-0.1.0-py3-none-any.whl -
Subject digest:
150095864b3fed0b9a58956a7e00d4f301eca883b64a4221cf0ccc0700f11600 - Sigstore transparency entry: 1951051358
- Sigstore integration time:
-
Permalink:
false200/queuebridge@81486481b688ef19fb574d37e86b5cdf17a50081 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/false200
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@81486481b688ef19fb574d37e86b5cdf17a50081 -
Trigger Event:
workflow_dispatch
-
Statement type: