brq

brq is a lightweight Python library for job queues built on Redis streams, with no central server: consumers organize themselves.

(Architecture diagram: Architecture.png)

Prerequisites

Redis >= 6.2, tested with the latest Redis 6 and 7 Docker images. Redis >= 7 is recommended, as it includes more inspection features.
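For local development, a compatible Redis can be started with Docker, for example:

docker run -d --name redis -p 6379:6379 redis:7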

Install

pip install brq

Features

See examples for runnable examples.

  • Defer jobs and automatically retry failed jobs
  • Dead-letter queue for unprocessable jobs, so they can be handled later
  • Multiple consumers in one consumer group
  • No scheduler needed; each consumer manages itself
  • Callback functions for processing job results and exceptions

Configuration

If you use BrqConfig (for example, via @task), you can configure brq with a .env file and environment variables. Environment variables use the prefix BRQ_.

For example, BRQ_REDIS_PORT=6379 python consumer.py specifies the Redis port.
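A .env file works the same way. Only BRQ_REDIS_PORT is documented above; the other keys below follow the same BRQ_ prefix convention and are illustrative assumptions:

# .env -- example values; keys other than BRQ_REDIS_PORT are assumed
BRQ_REDIS_HOST=localhost
BRQ_REDIS_PORT=6379
BRQ_REDIS_DB=0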

See configs for more details.

Echo job overview

Producer

import os

from brq.producer import Producer
from brq.configs import BrqConfig


async def main():
    config = BrqConfig()  # reads .env and BRQ_* environment variables
    async with config.open_redis_client() as async_redis_client:
        # Enqueue an "echo" job with a single argument
        await Producer(
            async_redis_client,
            redis_prefix=config.redis_key_prefix,
            redis_seperator=config.redis_key_seperator,
            max_message_len=config.producer_max_message_length,
        ).run_job("echo", ["hello"])


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Consumer

The only thing you need is @task. The target function can be sync or async; a sync function is automatically converted to an async one and run in a thread.

from brq import task


@task
def echo(message):
    print(f"Received message: {message}")


if __name__ == "__main__":
    # Run the task once, for local debug
    # echo("hello")

    # Run as a daemon
    echo.serve()

This is equivalent to the classic way shown below, which is more verbose but also more flexible.

import os

from brq.consumer import Consumer
from brq.daemon import Daemon
from brq.tools import get_redis_client, get_redis_url


async def echo(message):
    print(message)


async def main():
    # Build the Redis URL from environment variables (with local defaults)
    redis_url = get_redis_url(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", "6379")),
        db=int(os.getenv("REDIS_DB", "0")),
        cluster=os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"],
        tls=os.getenv("REDIS_TLS", "false") in ["True", "true", "1"],
        username=os.getenv("REDIS_USERNAME", ""),
        password=os.getenv("REDIS_PASSWORD", ""),
    )
    async with get_redis_client(redis_url) as async_redis_client:
        # Run a consumer daemon until interrupted
        daemon = Daemon(Consumer(async_redis_client, echo))
        await daemon.run_forever()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Technical details: deferred jobs

A deferred job can be scheduled with defer_until as a datetime, or with defer_hours + defer_minutes + defer_seconds, which are added to the current Redis timestamp. The unique flag controls whether jobs are deduplicated.

By default, unique=True: every Job is treated as distinct, even when its function_name, args and kwargs are exactly the same, so the same Job can be added to the deferred queue more than once. Jobs are told apart by the current Redis timestamp (Job.create_at) plus an additional uuid (Job.uid), much as Redis streams distinguish their entries.

If unique=False, an identical Job is added to the deferred queue only once; deferring a duplicate updates the existing job's defer time. In this case, put your own uuid in args (or kwargs) if you need to distinguish Jobs.
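Putting this together, deferring the echo job from the overview above might look like the sketch below. The defer_job method name and its exact signature are assumptions here; the defer_until/defer_seconds and unique parameters come from the description above.

import asyncio
from datetime import datetime, timedelta, timezone

from brq.configs import BrqConfig
from brq.producer import Producer


async def main():
    config = BrqConfig()
    async with config.open_redis_client() as async_redis_client:
        producer = Producer(async_redis_client)
        # Hypothetical: run "echo" 30 seconds from now (relative to Redis time)
        await producer.defer_job("echo", ["hello"], defer_seconds=30)
        # Hypothetical: defer until an absolute datetime; unique=False dedupes
        # identical jobs and updates the existing job's defer time instead
        await producer.defer_job(
            "echo",
            ["hello"],
            defer_until=datetime.now(timezone.utc) + timedelta(minutes=5),
            unique=False,
        )


if __name__ == "__main__":
    asyncio.run(main())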

Develop

Install pre-commit before committing:

pip install pre-commit
pre-commit install

Install the package locally:

pip install -e .[test]

Run unit tests before opening a PR:

pytest -v
