Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers.

BabelQueue for Python

Polyglot Queues, Simplified. Read and write the canonical BabelQueue message envelope from Python — so your Python services (AI/ML, data processing, …) exchange messages with Laravel, Symfony, Go, .NET and Node over one strict JSON format, on the broker you already run.

This is the framework-agnostic Python core: the wire-envelope codec, contracts, and dead-letter helpers — zero runtime dependencies (standard library only). The full standard is documented at babelqueue.com.

Installation

pip install babelqueue

Requires Python >=3.9.

Usage

from babelqueue import EnvelopeCodec

# Produce — build the canonical envelope and publish the JSON to your broker.
envelope = EnvelopeCodec.make("urn:babel:orders:created", {"order_id": 1042})
body = EnvelopeCodec.encode(envelope)        # -> UTF-8 JSON string
# redis.rpush("queues:orders", body)  /  channel.basic_publish(body=body, ...)

# Consume — decode a message produced by ANY BabelQueue SDK.
incoming = EnvelopeCodec.decode(body)
urn      = incoming["job"]          # "urn:babel:orders:created"
data     = incoming["data"]         # {"order_id": 1042}
trace_id = incoming["trace_id"]     # correlate across services

The envelope is identical to every other SDK's:

{
  "job": "urn:babel:orders:created",
  "trace_id": "…",
  "data": { "order_id": 1042 },
  "meta": { "id": "…", "queue": "default", "lang": "python", "schema_version": 1, "created_at": 1749132727000 },
  "attempts": 0
}
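To make the shape above concrete, here is a minimal standard-library sketch that builds and round-trips a dictionary with the same fields. It is not the library's implementation: `build_envelope` is a hypothetical helper, and the UUID4 strings for `trace_id` and `meta.id` and the epoch-millisecond `created_at` are assumptions inferred from the sample values.

```python
import json
import time
import uuid


def build_envelope(urn, data, queue="default", trace_id=None):
    """Hypothetical helper: build a dict shaped like the envelope shown above."""
    return {
        "job": urn,
        # Assumption: a fresh UUID4 when no existing trace is being continued.
        "trace_id": trace_id or str(uuid.uuid4()),
        "data": data,
        "meta": {
            "id": str(uuid.uuid4()),                # assumption: UUID4 message id
            "queue": queue,
            "lang": "python",
            "schema_version": 1,
            "created_at": int(time.time() * 1000),  # assumption: epoch milliseconds
        },
        "attempts": 0,
    }


envelope = build_envelope("urn:babel:orders:created", {"order_id": 1042})
body = json.dumps(envelope, ensure_ascii=False)  # JSON text ready for a broker
decoded = json.loads(body)                       # what a consumer would parse
```

Because the envelope contains only JSON-native types, the decoded dictionary compares equal to the one that was encoded, which is what lets producers and consumers in different languages agree on it.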

Typed messages (optional)

from babelqueue import EnvelopeCodec, PolyglotMessage

class OrderCreated:                      # structurally a PolyglotMessage
    def __init__(self, order_id: int):
        self.order_id = order_id
    def get_babel_urn(self) -> str:
        return "urn:babel:orders:created"
    def to_payload(self) -> dict:
        return {"order_id": self.order_id}

envelope = EnvelopeCodec.from_message(OrderCreated(1042), queue="orders")

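To continue an existing trace, add a get_babel_trace_id(self) -> str | None method to the message class (see HasTraceId), or pass trace_id= to EnvelopeCodec.make. On Python 3.9, write the return annotation as Optional[str] or add from __future__ import annotations, because the str | None syntax is only evaluated successfully from Python 3.10.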
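The comment "structurally a PolyglotMessage" suggests structural (duck) typing: a class qualifies by having the right methods, not by inheriting from a base class. The sketch below illustrates that idea with `typing.Protocol` from the standard library. `UrnMessage` is a hypothetical stand-in, and whether the real PolyglotMessage is implemented this way is an assumption.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class UrnMessage(Protocol):
    """Hypothetical stand-in for a structural message contract."""

    def get_babel_urn(self) -> str: ...

    def to_payload(self) -> Dict[str, Any]: ...


class OrderCreated:  # no base class: it only has to provide the two methods
    def __init__(self, order_id: int) -> None:
        self.order_id = order_id

    def get_babel_urn(self) -> str:
        return "urn:babel:orders:created"

    def to_payload(self) -> Dict[str, Any]:
        return {"order_id": self.order_id}


class NotAMessage:
    pass


msg = OrderCreated(1042)
# isinstance() on a runtime-checkable protocol only checks that the methods exist.
is_message = isinstance(msg, UrnMessage)
is_not_message = isinstance(NotAMessage(), UrnMessage)
```

The practical benefit is that existing domain classes can be published without importing or subclassing anything from the messaging layer.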

Dead-letter

from babelqueue import dead_letter

dlq = dead_letter.annotate(envelope, "failed", "orders", attempts=3, error="boom")
# publish `EnvelopeCodec.encode(dlq)` to the "orders.dlq" queue

Runtime — produce & consume

For an end-to-end app, use BabelQueue with a broker. Broker clients come via extras:

pip install "babelqueue[redis]"   # redis://
pip install "babelqueue[amqp]"    # amqp:// (RabbitMQ)

from babelqueue import BabelQueue

app = BabelQueue("redis://localhost:6379/0", queue="orders")
# or: BabelQueue("amqp://guest:guest@localhost:5672/", queue="orders")

@app.handler("urn:babel:orders:created")
def on_order_created(data, meta):       # AI/ML, data processing, anything
    print("order", data["order_id"])

# producer (any service, any language) …
app.publish("urn:babel:orders:created", {"order_id": 1042})

# worker
app.run()                               # consume forever (Ctrl-C to stop)

  • Routing is by URN; the wire format is the canonical envelope, so this consumes messages produced by any BabelQueue SDK.
  • Handlers receive (data, meta), or (data, meta, message) to get the full envelope (incl. trace_id).
  • Retry & dead-letter: failures are retried up to max_attempts (each retry bumps the envelope's attempts); set dead_letter=True to quarantine exhausted messages on <queue>.dlq. The on_unknown_urn option takes one of fail, delete, release or dead_letter.
  • Transports: redis:// (reliable-queue pattern), amqp:// (RabbitMQ via pika, with the contract AMQP properties) and memory:// (in-process, great for tests/local). Bring your own by passing transport=....
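The routing and retry behaviour described in the list above can be sketched with in-process queues. This is an illustration only, not the BabelQueue runtime: `handler`, `process_one` and the `queues` dictionary are hypothetical names, and the exact boundary at which a message counts as exhausted (here, once `attempts` reaches `max_attempts`) is an assumption.

```python
from collections import deque

handlers = {}  # URN -> handler callable
queues = {"orders": deque(), "orders.dlq": deque()}


def handler(urn):
    """Register a function for one URN (mirrors the decorator style above)."""
    def register(fn):
        handlers[urn] = fn
        return fn
    return register


def process_one(queue, max_attempts=3, dead_letter=True):
    """Pop one envelope, dispatch by URN, then retry or dead-letter on failure."""
    message = queues[queue].popleft()
    try:
        # An unknown URN raises KeyError and is treated like any other
        # failure in this sketch.
        handlers[message["job"]](message["data"], message["meta"])
        return "done"
    except Exception:
        message["attempts"] += 1               # bump the envelope's attempts
        if message["attempts"] < max_attempts:
            queues[queue].append(message)      # put it back for another try
            return "retried"
        if dead_letter:
            queues[queue + ".dlq"].append(message)  # quarantine on <queue>.dlq
            return "dead-lettered"
        return "dropped"


@handler("urn:babel:orders:created")
def always_fails(data, meta):
    raise RuntimeError("boom")


queues["orders"].append({
    "job": "urn:babel:orders:created",
    "data": {"order_id": 1042},
    "meta": {"queue": "orders"},
    "attempts": 0,
})
outcomes = [process_one("orders") for _ in range(3)]
```

With a handler that always raises and `max_attempts=3`, the message is requeued twice and lands on `orders.dlq` on the third failure, carrying `attempts == 3` so a later inspection can see how often it was tried.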

Framework adapters — Celery & Django

Celery (pip install "babelqueue[celery]") — reuse your Celery app's broker for polyglot interop, and consume inbound messages as a Celery worker bootstep:

from babelqueue.celery import from_celery, install_worker

bq = from_celery(celery_app, queue="orders")    # runtime on Celery's broker
bq.publish("urn:babel:orders:created", {"order_id": 1042})

@bq.handler("urn:babel:orders:created")
def on_created(data, meta): ...

install_worker(celery_app, bq)                   # `celery worker` also drains URN messages

Django (pip install "babelqueue[django]") — add "babelqueue.django" to INSTALLED_APPS and configure a BABELQUEUE dict:

# settings.py
BABELQUEUE = {"broker_url": "redis://localhost:6379/0", "queue": "orders", "dead_letter": True}

from babelqueue.django import publish, get_app

publish("urn:babel:orders:created", {"order_id": 1042})   # in a view / signal

@get_app().handler("urn:babel:orders:created")            # register handlers at startup
def on_created(data, meta): ...

python manage.py babelqueue_worker --queue orders          # run the consumer

What's here

This package contains the zero-dependency core (codec, contracts and dead-letter helpers), the BabelQueue runtime (in-memory transport built in; Redis via [redis], RabbitMQ via [amqp]), and framework adapters for Celery ([celery]) and Django ([django]). Every layer speaks the same canonical envelope, so it interoperates with the PHP/Laravel, Symfony, Go, Node and .NET SDKs.

Testing

pip install -e ".[dev]"
pytest
# (or, dependency-free) python -m unittest discover -s tests

License

MIT © Muhammet Şafak. See LICENSE.
