Skip to main content

The fastest open-source message broker for Redis.

Project description

chasquimq (Python)

Python bindings for ChasquiMQ — the fastest open-source message broker for Redis. The Rust engine pulls jobs; Python asyncio handlers process them.

Status: 1.0. abi3 wheels for Python 3.9+ on Linux (x86_64 + aarch64), macOS (x86_64 + aarch64), Windows (x86_64).

Install

pip install chasquimq

Quickstart

import asyncio
from chasquimq import Queue, Worker, Job, BackoffSpec, UnrecoverableError


async def send_email(job: Job) -> dict:
    to = job.data["to"]
    print(f"sending to {to} (attempt {job.attempts_made + 1})")
    if "@unrecoverable" in to:
        raise UnrecoverableError(f"hard bounce: {to}")
    return {"sent_at": time.time(), "to": to}


async def main() -> None:
    async with Queue("emails") as queue, \
               Worker("emails", send_email, store_results=True) as worker:

        # Plain enqueue.
        await queue.add("welcome", {"to": "ada@example.com"})

        # Stable jobId — second call with the same id is a no-op (idempotent).
        await queue.add_unique(
            "welcome", {"to": "alice@example.com"},
            job_id="welcome:alice",
        )

        # Per-job retry with exponential backoff.
        await queue.add(
            "welcome", {"to": "grace@flaky.example"},
            attempts=3,
            backoff=BackoffSpec.exponential(100, multiplier=2.0, max_ms=10_000),
        )

        # Delayed enqueue (in milliseconds; for `timedelta` use delay= instead).
        await queue.add("welcome", {"to": "ka@later.example"}, delay_ms=2_000)

        # Block on a single job's result, with timeout.
        job = await queue.add("welcome", {"to": "ada@example.com"})
        result = await job.wait_for_result(timeout=30.0)
        print(result)

        # Drain the worker.
        await worker.run()


asyncio.run(main())

What's in the box

Surface What it does
Queue Producer + queue inspection. add / add_bulk / add_unique / get_job_result / peek_dlq / replay_dlq / cancel_delayed / get_repeatable_jobs / remove_repeatable_by_key. Async context manager.
Worker Consumer pool. asyncio-first dispatch, opt-in result storage (store_results=True), graceful shutdown. Async context manager.
Job Frozen dataclass returned by Queue.add. Has id, name, data, attempts_made, wait_for_result(timeout=).
QueueEvents Asyncio iterator over the engine events stream. Cross-process pub/sub for completed / failed / dlq / retry-scheduled / delayed.
BackoffSpec Builders: .fixed(delay_ms) / .exponential(initial_ms, multiplier, max_ms, jitter_ms).
RepeatPattern Builders: .cron(expr, tz=) / .every(interval_ms). DST-aware via IANA tz names.
MissedFiresPolicy .skip() / .fire_once() / .fire_all(max_catchup) for cron catch-up after scheduler downtime.
UnrecoverableError Raise from your handler to bypass retries and route the job directly to DLQ.

TLS / rediss://

For TLS-fronted Redis (ElastiCache encryption-in-transit, or any non-cluster Redis with TLS), set tls=True on Queue / Worker / QueueEvents, or pass a rediss:// URL directly:

async with Queue("emails", redis_url="redis://my-cluster.cache.amazonaws.com:6379", tls=True) as queue:
    ...
# or:
async with Queue("emails", redis_url="rediss://my-cluster.cache.amazonaws.com:6379") as queue:
    ...

Trust roots come from the platform store via rustls-native-certs: keychain on macOS, the OS CA bundle on Linux (probed by openssl-probe), system store on Windows — so AWS Trust CA-signed endpoints work out of the box. For private CAs, point SSL_CERT_FILE at a PEM bundle before launching Python; that env var takes precedence over the platform store.

Rotating IAM tokens / credential_provider

For Redis deployments that use short-lived auth tokens — most notably AWS ElastiCache IAM auth, where tokens expire roughly every 15 minutes — pass an async credential_provider callback. The engine calls it before every AUTH / HELLO command (initial connect and every reconnect), so a long-lived Queue / Worker stays authenticated through token rotation without rebuilding.

from typing import Optional, Tuple

import aioboto3  # or your preferred async AWS SDK

from chasquimq import Queue, Worker


async def elasticache_credentials(
    host: Optional[str],
) -> Tuple[Optional[str], Optional[str]]:
    """Called by the engine before every AUTH/HELLO.

    ``host`` is the target server as ``"hostname:port"`` (or ``None`` when
    fred has no specific endpoint to report — e.g. cluster bootstrap).
    Returns ``(username, password)``; either side may be ``None``.
    """
    session = aioboto3.Session()
    async with session.client("elasticache") as ec:
        token = await ec.generate_iam_auth_token(...)
    return ("my-iam-user", token)


async with Queue(
    "emails",
    redis_url="rediss://my-cluster.cache.amazonaws.com:6379",
    credential_provider=elasticache_credentials,
) as queue, Worker(
    "emails",
    send_email,
    redis_url="rediss://my-cluster.cache.amazonaws.com:6379",
    credential_provider=elasticache_credentials,
) as worker:
    ...

Notes:

  • Construction is deferred when a credential_provider is supplied. The callback dispatches back to the asyncio loop that constructed the Queue / Worker, so the engine waits until the first awaited method (queue.add, worker.run, ...) to open the pool — that's the moment a running loop is guaranteed.
  • Auth errors trigger reconnect. The engine's default reconnect_on_auth_error = true means a token-fetch failure is retried on the next AUTH, with exponential backoff. Raise from your callback (or return stale credentials) and the next reconnect picks up a fresh token. A permanently broken provider will retry-loop inside fred until reconnect_max_attempts is exposed to the Python shim.
  • Same callback for both Queue and Worker. Pass the same async function to each — the native producer and consumer each capture their own asyncio-loop reference internally.

Power-user surface

The native engine handles ship from the same top-level package:

from chasquimq import Producer, Consumer, Scheduler

There is one user-facing Job — the high-level dataclass returned by Queue.add and passed to your Worker handler. The native binding's wire-format pyclass is internal-only (chasquimq._native._Job) and not re-exported (mirrors the Node shim).

Build from source

cd chasquimq-py
python -m venv .venv && source .venv/bin/activate
pip install maturin
maturin develop          # editable install
pytest tests/            # smoke + integration tests (requires Redis 8.6+)
maturin build --release  # wheels under target/wheels/

TODOs / known limitations

  • reconnect_max_attempts is not yet exposed to the Python shim. A permanently-misconfigured credential_provider will retry-loop inside fred indefinitely. The engine's ConnectionTuning::reconnect_max_attempts field needs a sibling keyword on Queue / Worker (Python) to cap retries — tracked for a follow-up slice.

See also

License

MIT — see LICENSE at the workspace root.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

chasquimq-1.3.0-cp39-abi3-win_amd64.whl (3.5 MB view details)

Uploaded CPython 3.9+Windows x86-64

chasquimq-1.3.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.6 MB view details)

Uploaded CPython 3.9+manylinux: glibc 2.17+ x86-64

chasquimq-1.3.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (3.5 MB view details)

Uploaded CPython 3.9+manylinux: glibc 2.17+ ARM64

chasquimq-1.3.0-cp39-abi3-macosx_11_0_arm64.whl (3.3 MB view details)

Uploaded CPython 3.9+macOS 11.0+ ARM64

chasquimq-1.3.0-cp39-abi3-macosx_10_12_x86_64.whl (3.4 MB view details)

Uploaded CPython 3.9+macOS 10.12+ x86-64

File details

Details for the file chasquimq-1.3.0-cp39-abi3-win_amd64.whl.

File metadata

  • Download URL: chasquimq-1.3.0-cp39-abi3-win_amd64.whl
  • Upload date:
  • Size: 3.5 MB
  • Tags: CPython 3.9+, Windows x86-64
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for chasquimq-1.3.0-cp39-abi3-win_amd64.whl
Algorithm Hash digest
SHA256 59437974fa62e86ac2759908dbdbf318a8221752df7aeb923c7531ac7d562898
MD5 b66eadbd9de4996780683f1c6e7edd95
BLAKE2b-256 7da2761fafb4b38a55f492490164ed1538036169b50e6113511a73139bae50bf

See more details on using hashes here.

Provenance

The following attestation bundles were made for chasquimq-1.3.0-cp39-abi3-win_amd64.whl:

Publisher: py-ci.yml on jotarios/chasquimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file chasquimq-1.3.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for chasquimq-1.3.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 e1e3062ce5cfa75e01b1f930a1268a5536f9747bf65e2eb8a4c4af7aa9d390c7
MD5 c321fa405fb23b6746a238b78548ba04
BLAKE2b-256 48e957a6a4e82b9d7eb586afd0b254b4985b5d62b87cb041a063c206296e29d9

See more details on using hashes here.

Provenance

The following attestation bundles were made for chasquimq-1.3.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl:

Publisher: py-ci.yml on jotarios/chasquimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file chasquimq-1.3.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for chasquimq-1.3.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 835273d6eec82ae501782274a7d09a5ecb499e99e9748e5da528f02b3983e8e7
MD5 9328c8967350fca063742c40dbc830c6
BLAKE2b-256 2b2af2ed91730aec3c55a297352bcbe995b4300e48bfb6fb0bd1bbd0175203e1

See more details on using hashes here.

Provenance

The following attestation bundles were made for chasquimq-1.3.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: py-ci.yml on jotarios/chasquimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file chasquimq-1.3.0-cp39-abi3-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for chasquimq-1.3.0-cp39-abi3-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 407b9253feaa966876ca2b89c377a45dad27b8bfa183b3823958cfca1b0a1118
MD5 0fa942fa51bb55340bee2f171a06aa08
BLAKE2b-256 0523e2d5626b595bacf1bf84732ece312b18bffd97aa697fdfdda6a3fbdb9e00

See more details on using hashes here.

Provenance

The following attestation bundles were made for chasquimq-1.3.0-cp39-abi3-macosx_11_0_arm64.whl:

Publisher: py-ci.yml on jotarios/chasquimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file chasquimq-1.3.0-cp39-abi3-macosx_10_12_x86_64.whl.

File metadata

File hashes

Hashes for chasquimq-1.3.0-cp39-abi3-macosx_10_12_x86_64.whl
Algorithm Hash digest
SHA256 725baff6af74af8effc604b2bc70fffa2bdecb6bbce672dee6a939cb4c2cdffa
MD5 65da1aad34a4c120aacc9184796609b0
BLAKE2b-256 ed300c697f4438d8d78e0d47accc7e6f34b9e2ef630d27bfae1b7144bfd0aa4c

See more details on using hashes here.

Provenance

The following attestation bundles were made for chasquimq-1.3.0-cp39-abi3-macosx_10_12_x86_64.whl:

Publisher: py-ci.yml on jotarios/chasquimq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page