brq
Project description
This project is inspired by arq. Not intentionally dividing the community, I desperately needed a redis queue based on redis stream for work reasons and just decided to open source it.
You should also consider arq as more of a library: https://github.com/samuelcolvin/arq/issues/437
brq
Prerequisites
Redis >= 6.2, tested with latest redis 6/7 docker image. Recommended to use redis>=7, which includes more inspection features.
Install
pip install brq
Feature
See examples for running examples.
- Defer job and automatic retry error job
- Dead queue for unprocessable job, you can process it later
- Multiple consumers in one consumer group
- No scheduler needed, consumer handles itself
Configuration
If using BrqConfig
(for example, @task
), you can use a .env
file and environment variables to configure brq. The prefix of environment variables is BRQ_
.
For example,
BRQ_REDIS_PORT=6379 python consumer.py
for specifying redis port.
See configs for more details.
Echo job overview
Producer
import os
from brq.producer import Producer
from brq.configs import BrqConfig
async def main():
config = BrqConfig()
async with config.open_redis_client() as async_redis_client:
await Producer(
async_redis_client,
redis_prefix=config.redis_key_prefix,
redis_seperator=config.redis_key_seperator,
max_message_len=config.producer_max_message_length,
).run_job("echo", ["hello"])
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Consumer
The only thing you need is @task
, and the target function can be sync
or async
and sync
function will be converted to async
function and run in a thread automatically.
from brq import task
@task
def echo(message):
print(f"Received message: {message}")
if __name__ == "__main__":
# Run the task once, for local debug
# echo("hello")
# Run as a daemon
echo.serve()
This is the same as the following, the classic way...But more flexible.
import os
from brq.consumer import Consumer
from brq.daemon import Daemon
from brq.tools import get_redis_client, get_redis_url
async def echo(message):
print(message)
async def main():
redis_url = get_redis_url(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
db=int(os.getenv("REDIS_DB", 0)),
cluster=bool(os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"]),
tls=bool(os.getenv("REDIS_TLS", "false") in ["True", "true", "1"]),
username=os.getenv("REDIS_USERNAME", ""),
password=os.getenv("REDIS_PASSWORD", ""),
)
async with get_redis_client(redis_url) as async_redis_client:
daemon = Daemon(Consumer(async_redis_client, echo))
await daemon.run_forever()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Technical details: deferred jobs
We can use defer_until
as a datetime
or defer_hours
+defer_minutes
+defer_seconds
to calculate a timestamp based on current redis timestamp. And use unique
to set the job to be unique or not.
By default, unique=True
means Job
with the exactly same function_name
, args
and kwargs
will be unique, which allows the same Job
to add into the deferred queue more than once. In this case, we differentiate tasks by the current redis timestamp(Job.create_at
) and an additional uuid(Job.uid
), just like redis stream
did.
If unique=False
, the same Job
will be added into the deferred queue only once. Duplicates will update the job's defer time. In this case, you can use your own uuid in args
(or kwargs
) to differentiate Job
.
Develop
Install pre-commit before commit
pip install pre-commit
pre-commit install
Install package locally
pip install -e .[test]
Run unit-test before PR
pytest -v
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file brq-0.4.0.tar.gz
.
File metadata
- Download URL: brq-0.4.0.tar.gz
- Upload date:
- Size: 79.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 58ea7440d92d71dfb953140ee88f19dd19d843261a1fe396b3c60241592b7dff |
|
MD5 | f742c6c8f6a4d635fd7a184681c96d70 |
|
BLAKE2b-256 | 3af048437b11cb47f595cedc0806d273158d890f49689d814f62797f6ed2240c |
File details
Details for the file brq-0.4.0-py3-none-any.whl
.
File metadata
- Download URL: brq-0.4.0-py3-none-any.whl
- Upload date:
- Size: 21.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8789fd2abdca1e24de43e12b8b70868cc5681ae5f7c9232f865cb98586d01f2c |
|
MD5 | 5adceea164908a515cc6bdfd12698cb7 |
|
BLAKE2b-256 | 51cd655f5666940ed188f51f48808ba889f7dafc150867ea42c7b0b5254ebfaf |