Skip to main content

Easy to use redis queue

Project description

Thorgate :: Redis Queue

Redis queue, that can be easily used to store json-compatible dictionaries and is aimed to be race-condition proof.

Installation

To use the synchronous version:

python3 -m pip install tg-redis-queue[synchronous]

To use asynchronous version:

python3 -m pip install tg-redis-queue[asyncio]

Example usage

from tg_redis_queue.sync_redis_queue import RedisObjectQueue


class MyRedisObjectQueue(RedisObjectQueue):
    # Default expiry time is 60 seconds, set it to 5 minutes for example purposes
    # so that queue doesn't expire while we're playing around with it.
    #
    # It is possible to set this to None as well, then queue will never expire and 
    # you are responsible for cleaning it up with `.prune()`
    EXPIRY_TIME = 300
    
    def _get_redis_url(self):
        # Can alternatively use base RedisObjectQueue and provide 
        # redis_url to constructor, like 
        # RedisObjectQueue(name="test", redis_url="redis://localhost:6379")
        return "redis://localhost:6379"


# Add items to the queue
queue = MyRedisObjectQueue(name='example_que')
queue.add({'key': 1})
queue.add({'key': 2})
queue.add({'key': 3})
queue.add({'key': 4})
queue.add({'key': 5})

# Can be in separate thread or process
queue = MyRedisObjectQueue(name='example_que')
# Get items with offset 0, 1 and 2
items = queue.get_items(end=2)
print([item.data for item in items])
# [{'key': 1}, {'key': 2}, {'key': 3}]

print(queue.remove(items))
# 3 - number of items removed

# Can use pop as well
item = queue.pop()
print(item.data)
# {'key': 4}

print(queue.get_total_size())
# 1 - only {'key': 5} is left

# Can prune all the data
queue.prune()
print(queue.get_total_size())
# 0 - nothing left

It is possible to use it the queue with async redis (with use of aioredis package).

import asyncio

from tg_redis_queue.async_redis_que import AsyncRedisObjectQueue

async def enqueue_data():
    queue = await AsyncRedisObjectQueue.create(
        name="my_queue",
        redis_url="redis://localhost:6379",
    )
    
    await asyncio.gather(
        queue.add({"id": 1}),
        queue.add({"id": 2}),
        queue.add({"id": 3}),
        queue.add({"id": 4}),
        queue.add({"id": 5}),
    )
    
    await queue.cleanup_connection()
    
async def consume_queue_data():
    queue = await AsyncRedisObjectQueue.create(
        name="my_queue",
        redis_url="redis://localhost:6379",
    )
    
    print(await queue.pop())

    await queue.cleanup_connection()

loop = asyncio.get_event_loop()
loop.run_until_complete(enqueue_data())
# Queue now has 5 items

loop.run_until_complete(consume_queue_data())
# Prints first item in the queue

It is possible to manually set the key for queue items. Items with the same key can only exist in the queue once, and if an item is added to the queue again with the same key, item data will be updated but item score (position in the queue) will be preserved - this way you can update the item in the queue without resetting it's queue position:

from tg_redis_queue.sync_redis_queue import RedisObjectQueue
from tg_redis_queue import RedisQueItem

queue = RedisObjectQueue(name="my-queue", redis_url="redis://localhost:6379")

# Add item with explicit key to the queue
queue.add(
    RedisQueItem(
        key=f"item-1",
        data={"id": 1, "message": "test"},
    )
)

# Queue now: ["item-1"]

queue.add(
    RedisQueItem(
        key=f"item-2",
        data={"id": 2, "message": "test"},
    )
)
# Queue now: ["item-1", "item-2"]

queue.add(
    RedisQueItem(
        key=f"item-1",
        data={"id": 42, "message": "test again"},
    )
)
# Queue ordering is unchanged: ["item-1", "item-2"]

item = queue.pop()
# Data is updated
assert item.data == {"id": 42, "message": "test again"}

Authors

This package is developed and maintained by Thorgate as part of our effort to change the world with the power of technology.

See our other projects:

Contributing

To start development, clone the repository and run make setup. It expects you to have python3.8 and poetry installed.

You will need to set REDIS_URL environment variable to run the tests:

export REDIS_URL=redis://localhost:6379

The easiest way to run redis is to run it with Docker:

docker run --name my-redis-container -p 6379:6379 -d redis

Code-formatters are available to make the code more readable and uniform, as well as linters to make sure the code is good quality. See Makefile for details.

The following command will re-format the code

make black-format-all isort-fix

The following command will check the code with linters and tests

make quality coverage

For testing in different environments, tox is used. For convenience, tox is ran in gitlab pipeline.

Please make sure your commit passes all the checks before opening a merge request.

Please consider adding yourself to authors in pyptoject.toml if your contribution is beyond trivial changes.

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

1.1.3 - 2021-06-30

Changed

  • Reordered pyproject.toml so that poetry sections are on the top
  • Made package homepage to be git repository for easier access

Fixed

  • Typo in readme
  • Wrong dates in this changelog (we were lost in time, 2020->2021)
  • Remove MANIFEST.in since pyproject.toml made it obsolete
  • Added missing dataclasses dependency

1.1.2 - 2021-06-10

Added

  • More examples in readme on how to manually override the key for item

Fixed

  • Typo in readme in aioredis example
  • Links in changelog to show diffs
  • Examples providing not accurate data on how get_items upper limit works

1.1.1 - 2021-06-10

Fixed

  • Package description gone in pypi

1.1.0 - 2021-06-10

Added

  • Asynchronous redis queue implementation
  • Ability to override the serialization logic in a subclass (dump_data and load_data) to allow using some alternative to json
  • This changelog

Changed

  • Package no longer depends on redis directly (since asynchronous version uses aioredis), to install with proper dependencies use synchronous or asyncio extra requirements (see README)
  • Due to different implementations requiring different packages, is no longer possible to import directly from tg_redis_queue: from tg_redis_queue.sync_redis_queue import RedisObjectQueue instead of from tg_redis_queue import RedisObjectQueue

1.0.0 - 2021-05-28

Added

  • Synchronous redis queue implementation, extracted from existing non-library code
  • Tests, and pipeline configuration for running the tests in CI
  • Code quality checks and formatters (isort, black, prospector)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tg-redis-queue-1.1.3.tar.gz (12.6 kB view hashes)

Uploaded Source

Built Distribution

tg_redis_queue-1.1.3-py3-none-any.whl (11.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page