
brq



This project is inspired by arq. It is not an attempt to split the community; I needed a Redis queue built on Redis Streams for work and decided to open source it.

You should also note that arq positions itself more as a library: https://github.com/samuelcolvin/arq/issues/437

Architecture diagram: Architecture.png

Prerequisites

Redis >= 6.2, tested against the latest Redis 6 and 7 Docker images. Redis >= 7 is recommended, as it provides more inspection features.
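
If you want to confirm that the server you point at meets this requirement, a quick check along these lines works (a sketch using redis-py, which brq builds on; the host and port are assumptions):

import asyncio

from redis.asyncio import Redis  # redis-py, already a dependency of brq


async def main():
    # Host and port are placeholders; point this at your own Redis instance.
    async with Redis(host="localhost", port=6379) as client:
        info = await client.info("server")
        print("Redis server version:", info["redis_version"])


if __name__ == "__main__":
    asyncio.run(main())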

Install

pip install brq

Features

See the examples directory for runnable examples.

  • Defer jobs and automatically retry failed jobs
  • Dead-letter queue for unprocessable jobs, so you can handle them later
  • Multiple consumers in one consumer group (see the sketch after this list)
  • No scheduler needed; each consumer handles scheduling itself
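
As a rough illustration of multiple consumers in one consumer group, the sketch below simply starts several copies of the consumer process from the Echo job overview below; because they read the same stream as one group, Redis spreads jobs across them. The consumer module name and the worker count are hypothetical.

import asyncio
import multiprocessing

from consumer import main  # hypothetical module holding the consumer script shown below


def run_consumer():
    # Each worker process runs its own event loop and brq Daemon.
    asyncio.run(main())


if __name__ == "__main__":
    workers = [multiprocessing.Process(target=run_consumer) for _ in range(3)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()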

Echo job overview

Producer

import os

from brq.producer import Producer
from brq.tools import get_redis_client, get_redis_url


async def main():
    # Build the Redis connection URL from environment variables.
    redis_url = get_redis_url(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        db=int(os.getenv("REDIS_DB", 0)),
        cluster=os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"],
        tls=os.getenv("REDIS_TLS", "false") in ["True", "true", "1"],
        username=os.getenv("REDIS_USERNAME", ""),
        password=os.getenv("REDIS_PASSWORD", ""),
    )
    async with get_redis_client(redis_url) as async_redis_client:
        # Enqueue an "echo" job with a single positional argument.
        await Producer(async_redis_client).run_job("echo", ["hello"])


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Consumer

import os

from brq.consumer import Consumer
from brq.daemon import Daemon
from brq.tools import get_redis_client, get_redis_url


async def echo(message):
    # Job handler: called by the consumer with the job's arguments.
    print(message)


async def main():
    # Build the Redis connection URL from environment variables.
    redis_url = get_redis_url(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", 6379)),
        db=int(os.getenv("REDIS_DB", 0)),
        cluster=os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"],
        tls=os.getenv("REDIS_TLS", "false") in ["True", "true", "1"],
        username=os.getenv("REDIS_USERNAME", ""),
        password=os.getenv("REDIS_PASSWORD", ""),
    )
    async with get_redis_client(redis_url) as async_redis_client:
        # Register `echo` as a job handler and consume jobs until interrupted.
        daemon = Daemon(Consumer(async_redis_client, echo))
        await daemon.run_forever()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Technical details: deferred jobs

You can pass defer_until as a datetime, or use defer_hours + defer_minutes + defer_seconds to compute a target timestamp relative to the current Redis server time. The unique flag controls whether the job is treated as unique.
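
A sketch of what this looks like on the producer side, assuming defer_until and the defer_hours/defer_minutes/defer_seconds offsets are keyword arguments of Producer.run_job (async_redis_client is obtained as in the producer example above):

from datetime import datetime, timedelta, timezone

from brq.producer import Producer


async def defer_examples(async_redis_client):
    producer = Producer(async_redis_client)

    # Defer to an absolute point in time...
    await producer.run_job(
        "echo",
        ["hello"],
        defer_until=datetime.now(timezone.utc) + timedelta(minutes=5),
    )

    # ...or by a relative offset, resolved against the current Redis server time.
    await producer.run_job("echo", ["hello"], defer_hours=0, defer_minutes=5, defer_seconds=0)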

By default, unique=True: every Job is treated as distinct even when it has exactly the same function_name, args and kwargs, so the same logical job can be added to the deferred queue more than once. Jobs are differentiated by the current Redis timestamp (Job.create_at) plus an additional UUID (Job.uid), much like Redis stream entry IDs.

If unique=False, a Job with the same function_name, args and kwargs is added to the deferred queue only once; enqueueing a duplicate just updates the existing job's defer time. In this mode, include your own UUID in args (or kwargs) if you need to differentiate jobs.
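
For example (again a sketch under the same assumptions about run_job's keyword arguments), with unique=False a duplicate enqueue does not add a second job; it only moves the defer time:

from brq.producer import Producer


async def dedupe_example(async_redis_client):
    producer = Producer(async_redis_client)

    # First call adds the job, deferred by 60 seconds.
    await producer.run_job("echo", ["hello"], defer_seconds=60, unique=False)

    # Same function_name, args and kwargs: no new job is added; the existing
    # job's defer time is pushed back to 120 seconds from now.
    await producer.run_job("echo", ["hello"], defer_seconds=120, unique=False)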

Develop

Install pre-commit before committing

pip install pre-commit
pre-commit install

Install the package locally

pip install -e .[test]

Run the unit tests before opening a PR

pytest -v

