Skip to main content

Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers.

Project description

BabelQueue for Python

CI PyPI Python License: MIT

Polyglot Queues, Simplified. Read and write the canonical BabelQueue message envelope from Python — so your Python services (AI/ML, data processing, …) exchange messages with Laravel, Symfony, Go, .NET and Node over one strict JSON format, on the broker you already run.

This is the framework-agnostic Python core: the wire-envelope codec, contracts, and dead-letter helpers — zero runtime dependencies (standard library only). The full standard is documented at babelqueue.com.

Installation

pip install babelqueue

Requires Python >=3.9.

Usage

from babelqueue import EnvelopeCodec

# Produce — build the canonical envelope and publish the JSON to your broker.
envelope = EnvelopeCodec.make("urn:babel:orders:created", {"order_id": 1042})
body = EnvelopeCodec.encode(envelope)        # -> UTF-8 JSON string
# redis.rpush("queues:orders", body)  /  channel.basic_publish(body=body, ...)

# Consume — decode a message produced by ANY BabelQueue SDK.
incoming = EnvelopeCodec.decode(body)
urn      = incoming["job"]          # "urn:babel:orders:created"
data     = incoming["data"]         # {"order_id": 1042}
trace_id = incoming["trace_id"]     # correlate across services

The envelope is identical to every other SDK's:

{
  "job": "urn:babel:orders:created",
  "trace_id": "…",
  "data": { "order_id": 1042 },
  "meta": { "id": "…", "queue": "default", "lang": "python", "schema_version": 1, "created_at": 1749132727000 },
  "attempts": 0
}

Typed messages (optional)

from babelqueue import EnvelopeCodec, PolyglotMessage

class OrderCreated:                      # structurally a PolyglotMessage
    def __init__(self, order_id: int):
        self.order_id = order_id
    def get_babel_urn(self) -> str:
        return "urn:babel:orders:created"
    def to_payload(self) -> dict:
        return {"order_id": self.order_id}

envelope = EnvelopeCodec.from_message(OrderCreated(1042), queue="orders")

Continue an existing trace by adding get_babel_trace_id(self) -> str | None (see HasTraceId), or pass trace_id= to EnvelopeCodec.make.

Dead-letter

from babelqueue import dead_letter

dlq = dead_letter.annotate(envelope, "failed", "orders", attempts=3, error="boom")
# publish `EnvelopeCodec.encode(dlq)` to the "orders.dlq" queue

Runtime — produce & consume

For an end-to-end app, use BabelQueue with a broker. Broker clients come via extras:

pip install "babelqueue[redis]"   # redis://
pip install "babelqueue[amqp]"    # amqp:// (RabbitMQ)
from babelqueue import BabelQueue

app = BabelQueue("redis://localhost:6379/0", queue="orders")
# or: BabelQueue("amqp://guest:guest@localhost:5672/", queue="orders")

@app.handler("urn:babel:orders:created")
def on_order_created(data, meta):       # AI/ML, data processing, anything
    print("order", data["order_id"])

# producer (any service, any language) …
app.publish("urn:babel:orders:created", {"order_id": 1042})

# worker
app.run()                               # consume forever (Ctrl-C to stop)
  • Routing is by URN; the wire format is the canonical envelope, so this consumes messages produced by any BabelQueue SDK.
  • Handlers receive (data, meta), or (data, meta, message) to get the full envelope (incl. trace_id).
  • Retry & dead-letter: failures are retried up to max_attempts (bumping the envelope's attempts); enable dead_letter=True to quarantine exhausted messages on <queue>.dlq. on_unknown_urn = fail | delete | release | dead_letter.
  • Transports: redis:// (reliable-queue pattern), amqp:// (RabbitMQ via pika, with the contract AMQP properties) and memory:// (in-process, great for tests/local). Bring your own by passing transport=....

Framework adapters — Celery & Django

Celery (pip install "babelqueue[celery]") — reuse your Celery app's broker for polyglot interop, and consume inbound messages as a Celery worker bootstep:

from babelqueue.celery import from_celery, install_worker

bq = from_celery(celery_app, queue="orders")    # runtime on Celery's broker
bq.publish("urn:babel:orders:created", {"order_id": 1042})

@bq.handler("urn:babel:orders:created")
def on_created(data, meta): ...

install_worker(celery_app, bq)                   # `celery worker` also drains URN messages

Django (pip install "babelqueue[django]") — add "babelqueue.django" to INSTALLED_APPS and configure a BABELQUEUE dict:

# settings.py
BABELQUEUE = {"broker_url": "redis://localhost:6379/0", "queue": "orders", "dead_letter": True}
from babelqueue.django import publish, get_app

publish("urn:babel:orders:created", {"order_id": 1042})   # in a view / signal

@get_app().handler("urn:babel:orders:created")            # register handlers at startup
def on_created(data, meta): ...
python manage.py babelqueue_worker --queue orders          # run the consumer

What's here

The codec/contracts/dead-letter (zero-dep core), the BabelQueue runtime (in-memory built in; Redis via [redis], RabbitMQ via [amqp]), and framework adapters for Celery ([celery]) and Django ([django]). Every layer speaks the one canonical envelope, so it interoperates with the PHP/Laravel, Symfony, Go, Node and .NET SDKs.

Testing

pip install -e ".[dev]"
pytest
# (or, dependency-free) python -m unittest discover -s tests

License

MIT © Muhammet Şafak. See LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

babelqueue-0.5.0.tar.gz (23.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

babelqueue-0.5.0-py3-none-any.whl (20.2 kB view details)

Uploaded Python 3

File details

Details for the file babelqueue-0.5.0.tar.gz.

File metadata

  • Download URL: babelqueue-0.5.0.tar.gz
  • Upload date:
  • Size: 23.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for babelqueue-0.5.0.tar.gz
Algorithm Hash digest
SHA256 e64dd52a57af41d6e706d9927a2c8c8c78dd2679771029998541aa8948dd3e6e
MD5 63c7397620cd86fefc6c1e6179021c91
BLAKE2b-256 c949288b2612400e221150f33b6caf3b035d659dce51d74ff22f367b7874eb9a

See more details on using hashes here.

Provenance

The following attestation bundles were made for babelqueue-0.5.0.tar.gz:

Publisher: release.yml on BabelQueue/babelqueue-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file babelqueue-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: babelqueue-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 20.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for babelqueue-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 fed8084340d6d32955d916f291bb82ec703add2c176e643feff54eee9b389f45
MD5 1f1b3c5eb6507df2c70beaa3247ae4ed
BLAKE2b-256 18c006b4c42965cd76448f7b34e177aede77dd80c2162509d5d7792b2c53568d

See more details on using hashes here.

Provenance

The following attestation bundles were made for babelqueue-0.5.0-py3-none-any.whl:

Publisher: release.yml on BabelQueue/babelqueue-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page