Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers.

BabelQueue for Python

Polyglot Queues, Simplified. Read and write the canonical BabelQueue message envelope from Python — so your Python services (AI/ML, data processing, …) exchange messages with Laravel, Symfony, Go, .NET and Node over one strict JSON format, on the broker you already run.

This is the framework-agnostic Python core: the wire-envelope codec, contracts, and dead-letter helpers — zero runtime dependencies (standard library only). The full standard is documented at babelqueue.com.

Installation

pip install babelqueue

Requires Python >=3.9.

Usage

from babelqueue import EnvelopeCodec

# Produce — build the canonical envelope and publish the JSON to your broker.
envelope = EnvelopeCodec.make("urn:babel:orders:created", {"order_id": 1042})
body = EnvelopeCodec.encode(envelope)        # -> UTF-8 JSON string
# redis.rpush("queues:orders", body)  /  channel.basic_publish(body=body, ...)

# Consume — decode a message produced by ANY BabelQueue SDK.
incoming = EnvelopeCodec.decode(body)
urn      = incoming["job"]          # "urn:babel:orders:created"
data     = incoming["data"]         # {"order_id": 1042}
trace_id = incoming["trace_id"]     # correlate across services

The envelope is identical to every other SDK's:

{
  "job": "urn:babel:orders:created",
  "trace_id": "…",
  "data": { "order_id": 1042 },
  "meta": { "id": "…", "queue": "default", "lang": "python", "schema_version": 1, "created_at": 1749132727000 },
  "attempts": 0
}
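
A consumer that receives bodies from other languages may want to sanity-check the shape before dispatching. The following is a minimal, standard-library-only sketch and not the library's codec: build_envelope and check_envelope are hypothetical helpers, the type rules are inferred only from the example envelope above, and the UUID ids are a placeholder choice because the real id format is elided here. The actual standard may be stricter.

```python
import json
import time
import uuid

# Key names and value types inferred from the example envelope above (assumption).
REQUIRED_TOP = {"job": str, "trace_id": str, "data": dict, "meta": dict, "attempts": int}
REQUIRED_META = {"id": str, "queue": str, "lang": str, "schema_version": int, "created_at": int}


def build_envelope(urn, data, queue="default", trace_id=None):
    """Hypothetical helper: assemble a dict shaped like the example envelope."""
    return {
        "job": urn,
        "trace_id": trace_id or str(uuid.uuid4()),  # placeholder id format
        "data": data,
        "meta": {
            "id": str(uuid.uuid4()),                # placeholder id format
            "queue": queue,
            "lang": "python",
            "schema_version": 1,
            # The example value looks like epoch milliseconds (assumption).
            "created_at": int(time.time() * 1000),
        },
        "attempts": 0,
    }


def check_envelope(body):
    """Hypothetical helper: parse a JSON body and verify the expected keys and types."""
    envelope = json.loads(body)
    if not isinstance(envelope, dict):
        raise ValueError("envelope must be a JSON object")
    for key, expected in REQUIRED_TOP.items():
        if not isinstance(envelope.get(key), expected):
            raise ValueError(f"field {key!r} is missing or not {expected.__name__}")
    for key, expected in REQUIRED_META.items():
        if not isinstance(envelope["meta"].get(key), expected):
            raise ValueError(f"meta field {key!r} is missing or not {expected.__name__}")
    return envelope


body = json.dumps(build_envelope("urn:babel:orders:created", {"order_id": 1042}, queue="orders"))
decoded = check_envelope(body)
print(decoded["job"], decoded["data"]["order_id"])
```

In real code, prefer EnvelopeCodec.decode for parsing; a check like this is only useful as an extra guard at a trust boundary.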

Typed messages (optional)

from babelqueue import EnvelopeCodec, PolyglotMessage

class OrderCreated:                      # structurally a PolyglotMessage
    def __init__(self, order_id: int):
        self.order_id = order_id
    def get_babel_urn(self) -> str:
        return "urn:babel:orders:created"
    def to_payload(self) -> dict:
        return {"order_id": self.order_id}

envelope = EnvelopeCodec.from_message(OrderCreated(1042), queue="orders")

Continue an existing trace by adding get_babel_trace_id(self) -> str | None (see HasTraceId), or pass trace_id= to EnvelopeCodec.make.
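
"Structurally a PolyglotMessage" suggests duck typing: a class qualifies by having the right methods, without inheriting from anything. The sketch below illustrates that idea with typing.Protocol from the standard library. MessageLike and describe are hypothetical names for this illustration, and whether the library's PolyglotMessage is defined this way is an assumption.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class MessageLike(Protocol):
    """Hypothetical stand-in for a structural message contract."""

    def get_babel_urn(self) -> str: ...

    def to_payload(self) -> Dict[str, Any]: ...


class OrderCreated:  # no base class: it matches the protocol by shape alone
    def __init__(self, order_id: int) -> None:
        self.order_id = order_id

    def get_babel_urn(self) -> str:
        return "urn:babel:orders:created"

    def to_payload(self) -> Dict[str, Any]:
        return {"order_id": self.order_id}


def describe(message: MessageLike) -> Dict[str, Any]:
    """Read the URN and payload, plus a trace id if the optional method exists."""
    trace_getter = getattr(message, "get_babel_trace_id", None)
    return {
        "job": message.get_babel_urn(),
        "data": message.to_payload(),
        "trace_id": trace_getter() if callable(trace_getter) else None,
    }


msg = OrderCreated(1042)
# runtime_checkable isinstance() only verifies that the methods exist,
# not their signatures or return types.
print(isinstance(msg, MessageLike))
print(describe(msg))
```

The optional-method lookup mirrors how a trace id could be picked up only from messages that define get_babel_trace_id, leaving other messages unaffected.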

Dead-letter

from babelqueue import dead_letter

dlq = dead_letter.annotate(envelope, "failed", "orders", attempts=3, error="boom")
# publish `EnvelopeCodec.encode(dlq)` to the "orders.dlq" queue

Runtime — produce & consume

For an end-to-end app, use BabelQueue with a broker. Redis support comes via an extra:

pip install "babelqueue[redis]"

from babelqueue import BabelQueue

app = BabelQueue("redis://localhost:6379/0", queue="orders")

@app.handler("urn:babel:orders:created")
def on_order_created(data, meta):       # AI/ML, data processing, anything
    print("order", data["order_id"])

# producer (any service, any language) …
app.publish("urn:babel:orders:created", {"order_id": 1042})

# worker
app.run()                               # consume forever (Ctrl-C to stop)

  • Routing is by URN; the wire format is the canonical envelope, so this consumes messages produced by any BabelQueue SDK.
  • Handlers receive (data, meta), or (data, meta, message) to get the full envelope (incl. trace_id).
  • Retry and dead-letter: a failing message is retried up to max_attempts, with the envelope's attempts counter incremented on each failure. Set dead_letter=True to quarantine exhausted messages on <queue>.dlq. The on_unknown_urn option accepts fail, delete, release, or dead_letter.
  • Transports: redis:// (reliable-queue pattern) and memory:// (in-process, great for tests/local). Bring your own by passing transport=....
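
The routing, handler-arity, and retry rules in the list above can be sketched with plain in-process queues. This is an illustration of the described behavior, not the runtime's implementation: handler, call_handler, drain, and MAX_ATTEMPTS are hypothetical names, and treating max_attempts as the total number of tries is an assumption.

```python
import inspect
from collections import deque

handlers = {}                                   # URN -> handler function
queues = {"orders": deque(), "orders.dlq": deque()}
MAX_ATTEMPTS = 3                                # hypothetical limit for this sketch


def handler(urn):
    """Register a function as the handler for one URN."""
    def register(fn):
        handlers[urn] = fn
        return fn
    return register


def call_handler(fn, envelope):
    """Pass (data, meta), or (data, meta, message) if the handler takes three arguments."""
    args = [envelope["data"], envelope["meta"]]
    if len(inspect.signature(fn).parameters) >= 3:
        args.append(envelope)
    fn(*args)


def drain(queue):
    """Process until the queue is empty; retry failures, then dead-letter them."""
    pending = queues[queue]
    while pending:
        envelope = pending.popleft()
        try:
            # An unregistered URN raises KeyError and is treated as a failure here.
            call_handler(handlers[envelope["job"]], envelope)
        except Exception:
            envelope["attempts"] += 1           # bump the attempt counter
            if envelope["attempts"] < MAX_ATTEMPTS:
                pending.append(envelope)        # requeue for another try
            else:
                queues[queue + ".dlq"].append(envelope)  # quarantine


seen = []


@handler("urn:babel:orders:created")
def on_order_created(data, meta, message):
    seen.append(message["trace_id"])
    if data["order_id"] < 0:
        raise ValueError("invalid order id")


for trace_id, order_id in [("t-1", 1042), ("t-2", -1)]:
    queues["orders"].append({
        "job": "urn:babel:orders:created",
        "trace_id": trace_id,
        "data": {"order_id": order_id},
        "meta": {"queue": "orders"},
        "attempts": 0,
    })

drain("orders")
print(len(queues["orders.dlq"]), queues["orders.dlq"][0]["attempts"])
```

The valid order is handled once; the invalid one is tried three times and then lands on orders.dlq with its attempts counter intact, which is what lets an operator see how often it failed.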

RabbitMQ (pika) and Celery / Django adapters are the next iterations.

What's here

This package contains the zero-dependency core (codec, contracts, and dead-letter helpers) and the BabelQueue runtime described above, with the in-memory transport built in and Redis available via the [redis] extra. Celery and Django adapters for framework integration are planned.

Testing

pip install -e ".[dev]"
pytest
# or, dependency-free:
python -m unittest discover -s tests

License

MIT © Muhammet Şafak. See LICENSE.
