Easy to use redis queue
Thorgate :: Redis Queue
A Redis queue that makes it easy to store JSON-compatible dictionaries and aims to be race-condition proof.
Installation
To use the synchronous version:
python3 -m pip install tg-redis-queue[synchronous]
To use the asynchronous version:
python3 -m pip install tg-redis-queue[asyncio]
Example usage
from tg_redis_queue.sync_redis_queue import RedisObjectQueue
class MyRedisObjectQueue(RedisObjectQueue):
    def _get_redis_url(self):
        # Can alternatively use base RedisObjectQueue and provide
        # redis_url to constructor, like
        # RedisObjectQueue(name="test", redis_url="redis://localhost:6379")
        return "redis://localhost:6379"
# Add items to the queue
queue = MyRedisObjectQueue(name='example_que')
queue.add({'key': 1})
queue.add({'key': 2})
queue.add({'key': 3})
queue.add({'key': 4})
queue.add({'key': 5})
# Can be in separate thread or process
queue = MyRedisObjectQueue(name='example_que')
items = queue.get_items(end=3)
print([item.data for item in items])
# [{'key': 1}, {'key': 2}, {'key': 3}]
print(queue.remove(items))
# 3 - number of items removed
# Can use pop as well
item = queue.pop()
print(item.data)
# {'key': 4}
print(queue.get_total_size())
# 1 - only {'key': 5} is left
# Can prune all the data
queue.prune()
print(queue.get_total_size())
# 0 - nothing left
The queue can also be used with async Redis (via the aioredis package).
import asyncio
from tg_redis_queue.async_redis_que import AsyncRedisObjectQueue
async def enqueue_data():
    queue = await AsyncRedisObjectQueue.create(
        name="my_queue",
        redis_url="redis://localhost:6379",
    )
    # Add several items concurrently
    await asyncio.gather(
        queue.add({"id": 1}),
        queue.add({"id": 2}),
        queue.add({"id": 3}),
        queue.add({"id": 4}),
        queue.add({"id": 5}),
    )
    await queue.cleanup_connection()
async def consume_queue_data():
    queue = await AsyncRedisObjectQueue.create(
        name="my_queue",
        redis_url="redis://localhost:6379",
    )
    print(await queue.pop())
    await queue.cleanup_connection()
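The two coroutines above are only defined, not run. The following is a minimal sketch of one way to drive them from a single entry point, producer first and consumer second. The helper run_steps and the two fake_* coroutines are hypothetical names introduced here so the sketch runs without Redis; they are not part of tg_redis_queue.

```python
import asyncio
from typing import Awaitable, Callable, List

Step = Callable[[], Awaitable[None]]


async def run_steps(steps: List[Step]) -> None:
    """Await each coroutine function in order (producer before consumer)."""
    for step in steps:
        await step()


# Hypothetical stand-ins so this sketch runs without Redis. In real use,
# pass enqueue_data and consume_queue_data from the example above instead.
log: List[str] = []


async def fake_enqueue() -> None:
    log.append("enqueued")


async def fake_consume() -> None:
    log.append("consumed")


asyncio.run(run_steps([fake_enqueue, fake_consume]))
```

With the real coroutines, the call would presumably look like asyncio.run(run_steps([enqueue_data, consume_queue_data])).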
Authors
This package is developed and maintained by Thorgate as part of our effort to change the world with the power of technology.
See our other projects:
Contributing
To start development, clone the repository and run make setup. It expects you to have Python 3.8 and Poetry installed.
You will need to set the REDIS_URL environment variable to run the tests:
export REDIS_URL=redis://localhost:6379
The easiest way to run redis is to run it with Docker:
docker run --name my-redis-container -p 6379:6379 -d redis
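As an illustration of how code might pick up the REDIS_URL variable exported above, here is a small sketch. The function name get_redis_url, the fallback to redis://localhost:6379, and the scheme check are assumptions made for this example; the project's own tests may read the variable differently.

```python
import os
from urllib.parse import urlsplit

# Assumed fallback; it mirrors the URL used in the export command above.
DEFAULT_REDIS_URL = "redis://localhost:6379"


def get_redis_url(env=None):
    """Return REDIS_URL from the given mapping (os.environ by default)."""
    env = os.environ if env is None else env
    url = env.get("REDIS_URL", DEFAULT_REDIS_URL)
    # Reject obviously wrong values early, e.g. an http:// URL.
    scheme = urlsplit(url).scheme
    if scheme not in ("redis", "rediss", "unix"):
        raise ValueError(f"Unsupported REDIS_URL scheme: {scheme!r}")
    return url
```

Passing a plain dictionary instead of os.environ keeps the helper easy to exercise without touching the real environment.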
Code formatters are available to make the code more readable and uniform, as well as linters to make sure the code is of good quality. See Makefile for details.
The following command will re-format the code:
make black-format-all isort-fix
The following command will check the code with linters and tests:
make quality coverage
For testing in different environments, tox is used. For convenience, tox is run in the GitLab pipeline.
Please make sure your commit passes all the checks before opening a merge request.
Please consider adding yourself to authors in pyproject.toml if your contribution is beyond trivial changes.
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
1.1.1 - 2020-06-10
Fixed
- Package description missing on PyPI
1.1.0 - 2020-06-10
Added
- Asynchronous redis queue implementation
- Ability to override the serialization logic in a subclass (dump_data and load_data) to allow using some alternative to json
- This changelog
Changed
- Package no longer depends on redis directly (since the asynchronous version uses aioredis); to install with proper dependencies, use the synchronous or asyncio extra requirements (see README)
- Due to different implementations requiring different packages, it is no longer possible to import directly from tg_redis_queue: use from tg_redis_queue.sync_redis_queue import RedisObjectQueue instead of from tg_redis_queue import RedisObjectQueue
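To illustrate the dump_data and load_data hooks mentioned in this release, here is a rough sketch of a pair that stores zlib-compressed JSON instead of plain JSON. The method signatures (one argument in, a string out, and the reverse) are an assumption and should be checked against the base class. CompressedJsonMixin is a hypothetical name that deliberately does not inherit from the library, so the sketch runs on its own.

```python
import base64
import json
import zlib


class CompressedJsonMixin:
    """Hypothetical mixin; the dump_data/load_data signatures are assumed."""

    def dump_data(self, data):
        # JSON-encode, compress, then base64 so the result is an ASCII string.
        raw = json.dumps(data, sort_keys=True).encode("utf-8")
        return base64.b64encode(zlib.compress(raw)).decode("ascii")

    def load_data(self, payload):
        # Accept bytes as well, since a Redis client may return either type.
        if isinstance(payload, bytes):
            payload = payload.decode("ascii")
        raw = zlib.decompress(base64.b64decode(payload))
        return json.loads(raw.decode("utf-8"))
```

In real use, the same two methods would presumably be defined on a subclass of RedisObjectQueue or AsyncRedisObjectQueue, provided the assumed signatures match.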
1.0.0 - 2020-05-28
Added
- Synchronous redis queue implementation, extracted from existing non-library code
- Tests, and pipeline configuration for running the tests in CI
- Code quality checks and formatters (isort, black, prospector)