Good Kiwi Redis Library

Project description

good-redis

A fast-depends compatible Redis client, available in both sync and async versions.

It also adds custom "prioritized stream" Redis commands, a priority-queue-like structure described below.

Prioritized Stream Implementation in the Redis Client Library

The Prioritized Stream is a custom data structure implemented on top of Redis and backed by Lua scripts, so that its priority-ordered operations run atomically. Its key features and behaviors are outlined below:

Key Components

  1. Redis Keys (see the layout sketch after this list):

    • :ps:keys: Set containing all active prioritized stream names.
    • <name>:t: Hash table where each key corresponds to a specific item, and its value represents the stored data.
    • <name>:s: Sorted set where the score represents the priority of each key.
    • <name>:exc: Set of keys that are excluded from re-adding, typically after being processed and removed.
    • <name>:exp: Sorted set for tracking the expiration of keys.
  2. Lua Scripts:

    • psadd: Adds a key-value pair to the stream with a specified priority score. The item is added only if it’s not in the exclusion or expiration sets.
    • pspull: Retrieves a specified number of items from the stream in order of highest priority. The items can be purged (removed) or retained with their priority decreased, and they can also be gated (prevented from being re-added).
    • psdecrementall: Decreases the priority of all items in the stream by a specified amount.
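
The layout above can be inspected with any Redis client. The following is a minimal sketch (key names taken from the list above; the exact encoding used by the library may differ) for a stream named "task_queue", using plain redis-py:

import redis  # plain redis-py, used here only to peek at the raw keys

r = redis.Redis(decode_responses=True)

print(r.smembers(":ps:keys"))                              # all active stream names
print(r.hgetall("task_queue:t"))                           # item key -> stored value
print(r.zrange("task_queue:s", 0, -1, withscores=True))    # item key -> priority score
print(r.smembers("task_queue:exc"))                        # keys excluded from re-adding
print(r.zrange("task_queue:exp", 0, -1, withscores=True))  # item key -> expiration timestamp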

Stream Operations

  1. Add Items (psadd and psaddmany):

    • psadd(name, key, value, score, readd=False): Adds a single item to the stream.
      • If readd=True, the item is removed from the exclusion and expiration sets before being added.
    • psaddmany(name, items, readd=False): Adds multiple items to the stream in a single pipeline operation.
  2. Retrieve and Process Items (pspull):

    • pspull(name, count=1, purge=True, gate=False, expire_at=timedelta(minutes=1440)): Retrieves up to count items from the stream, ordered by priority.
      • purge: If True, removes the items from the stream after retrieval. If False, decreases their priority and retains them in the stream.
      • gate: If True, adds retrieved items to the exclusion set, preventing them from being re-added.
      • expire_at: Sets the expiration timestamp for the retrieved items (the default timedelta of 1440 minutes is one day).
  3. Manage Stream Items:

    • pskeys(name): Retrieves all keys from the stream's sorted set.
    • psvalues(name): Retrieves all values from the stream's hash table.
    • psdelete(name): Deletes the entire stream, removing associated keys, sorted sets, and exclusion sets.
    • psincrement(name, key, score): Increments the priority score of a specific key.
    • psdeletebelowthreshold(name, threshold, gate=False): Removes all items with a priority score below a specified threshold.
      • gate: If True, adds the removed items to the exclusion set.
    • pslen(name): Returns the total number of items in the stream.
    • psget(name, key): Retrieves the value associated with a specific key.
    • psdecrement_all(name, decrement): Decrements the priority score of all items in the stream by a specified value.
  4. Expiration Management:

    • psexpire(): Removes expired items from all active prioritized streams based on the current timestamp.

Usage Scenarios

  • Task Queues: The prioritized stream can be used to implement task queues where tasks with higher priorities are processed first.
  • Rate Limiting: Items can be gated and expired to prevent frequent re-processing.
  • Deferred Processing: Lower-priority items can be retained (for example, pulled with purge=False) and re-processed later once their priority rises again (for example, via psincrement).

This prioritized stream implementation allows for efficient management of time-sensitive and prioritized workloads in a Redis-backed environment, leveraging the power of Lua scripts for atomic and complex operations.

1. Adding Items to the Stream

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def add_items_to_stream(
    redis: Redis = RedisProvider(),
):
    # Add a single item to the stream
    redis.psadd(name="task_queue", key="task_1", value="Process data", score=10)

    # Add multiple items to the stream
    items = [
        ("task_2", "Send email", 8),
        ("task_3", "Generate report", 12),
    ]
    redis.psaddmany(name="task_queue", items=items)

    # Re-add an item that was previously excluded
    redis.psadd(name="task_queue", key="task_1", value="Process data", score=15, readd=True)

add_items_to_stream()

import asyncio

from fast_depends import inject
from good_redis import AsyncRedis, AsyncRedisProvider  # import path assumed

@inject
async def add_items_to_stream_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        # Add a single item to the stream
        await redis.psadd(name="task_queue", key="task_1", value="Process data", score=10)

        # Add multiple items to the stream
        items = [
            ("task_2", "Send email", 8),
            ("task_3", "Generate report", 12),
        ]
        await redis.psaddmany(name="task_queue", items=items)

        # Re-add an item that was previously excluded
        await redis.psadd(name="task_queue", key="task_1", value="Process data", score=15, readd=True)

asyncio.run(add_items_to_stream_async())

2. Retrieving and Processing Items from the Stream

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def process_items_from_stream(
    redis: Redis = RedisProvider(),
):
    # Pull one item with the highest priority, remove it from the stream after processing
    item = redis.pspull(name="task_queue", count=1, purge=True)
    print(item)

    # Pull two items without removing them, but decrement their priority
    items = redis.pspull(name="task_queue", count=2, purge=False)
    print(items)

process_items_from_stream()

import asyncio

from fast_depends import inject
from good_redis import AsyncRedis, AsyncRedisProvider  # import path assumed

@inject
async def process_items_from_stream_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        # Pull one item with the highest priority, remove it from the stream after processing
        item = await redis.pspull(name="task_queue", count=1, purge=True)
        print(item)

        # Pull two items without removing them, but decrement their priority
        items = await redis.pspull(name="task_queue", count=2, purge=False)
        print(items)

asyncio.run(process_items_from_stream_async())
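
pspull also accepts the gate and expire_at options described above. The following is a hedged sketch of using them together (the one-hour exclusion window is an arbitrary choice):

from datetime import timedelta

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def pull_and_gate(
    redis: Redis = RedisProvider(),
):
    # Pull one item, remove it from the stream, and gate it so it cannot
    # be re-added until the one-hour expiration window has passed
    item = redis.pspull(
        name="task_queue",
        count=1,
        purge=True,
        gate=True,
        expire_at=timedelta(hours=1),
    )
    print(item)

pull_and_gate()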

3. Incrementing the Priority of a Specific Item

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def increment_priority(
    redis: Redis = RedisProvider(),
):
    # Increment the priority score of a specific task
    redis.psincrement(name="task_queue", key="task_2", score=5)

increment_priority()

import asyncio

from fast_depends import inject
from good_redis import AsyncRedis, AsyncRedisProvider  # import path assumed

@inject
async def increment_priority_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        # Increment the priority score of a specific task
        await redis.psincrement(name="task_queue", key="task_2", score=5)

asyncio.run(increment_priority_async())

4. Deleting Items Below a Certain Priority Threshold

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def delete_below_threshold(
    redis: Redis = RedisProvider(),
):
    # Delete all tasks with a priority score below 10
    redis.psdeletebelowthreshold(name="task_queue", threshold=10)
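
    # (Sketch) Passing gate=True would also add the removed tasks to the
    # exclusion set, preventing them from being re-added later:
    # redis.psdeletebelowthreshold(name="task_queue", threshold=10, gate=True)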

delete_below_threshold()

import asyncio

from fast_depends import inject
from good_redis import AsyncRedis, AsyncRedisProvider  # import path assumed

@inject
async def delete_below_threshold_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        # Delete all tasks with a priority score below 10
        await redis.psdeletebelowthreshold(name="task_queue", threshold=10)

asyncio.run(delete_below_threshold_async())

5. Viewing All Keys and Values in the Stream

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def view_keys_and_values(
    redis: Redis = RedisProvider(),
):
    # Retrieve all keys in the sorted set (in order of priority)
    keys = redis.pskeys(name="task_queue")
    print(keys)

    # Retrieve all values in the hash table
    values = redis.psvalues(name="task_queue")
    print(values)

view_keys_and_values()

import asyncio

from fast_depends import inject
from good_redis import AsyncRedis, AsyncRedisProvider  # import path assumed

@inject
async def view_keys_and_values_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        # Retrieve all keys in the sorted set (in order of priority)
        keys = await redis.pskeys(name="task_queue")
        print(keys)

        # Retrieve all values in the hash table
        values = await redis.psvalues(name="task_queue")
        print(values)

asyncio.run(view_keys_and_values_async())
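
The stream can also be sized, read key-by-key, and torn down with pslen, psget, and psdelete, as described earlier. A brief sketch:

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def inspect_and_cleanup(
    redis: Redis = RedisProvider(),
):
    # Number of items currently in the stream
    print(redis.pslen(name="task_queue"))

    # Value stored for a single key
    print(redis.psget(name="task_queue", key="task_2"))

    # Remove the stream and all of its associated keys
    redis.psdelete(name="task_queue")

inspect_and_cleanup()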

6. Decrementing the Priority of All Items in the Stream

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def decrement_all_priorities(
    redis: Redis = RedisProvider(),
):
    # Decrease the priority of all items by 2
    redis.psdecrement_all(name="task_queue", decrement=2)

decrement_all_priorities()

import asyncio

from fast_depends import inject
from good_redis import AsyncRedis, AsyncRedisProvider  # import path assumed

@inject
async def decrement_all_priorities_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        # Decrease the priority of all items by 2
        await redis.psdecrement_all(name="task_queue", decrement=2)

asyncio.run(decrement_all_priorities_async())

7. Handling Expired Items

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def handle_expired_items(
    redis: Redis = RedisProvider(),
):
    # Remove all expired items from all prioritized streams
    redis.psexpire()

handle_expired_items()

import asyncio

from fast_depends import inject
from good_redis import AsyncRedis, AsyncRedisProvider  # import path assumed

@inject
async def handle_expired_items_async(
    rc: AsyncRedis = AsyncRedisProvider(),
):
    async with rc as redis:
        # Remove all expired items from all prioritized streams
        await redis.psexpire()

asyncio.run(handle_expired_items_async())
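
psexpire is an ordinary client call, so sweeping expired items requires invoking it periodically. A minimal sketch of such a sweep (the one-minute interval and the bare loop stand in for whatever scheduler you actually use):

import time

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

@inject
def expire_sweep(
    redis: Redis = RedisProvider(),
):
    redis.psexpire()

# Stand-in for a cron job or background task
while True:
    expire_sweep()
    time.sleep(60)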

These examples demonstrate how to utilize the prioritized stream functionality in a dependency injection style with fast-depends. This approach makes it easy to manage Redis clients and stream operations in both synchronous and asynchronous contexts.
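
Putting the pieces together, a minimal polling worker for the task-queue scenario could look like the sketch below (the processing logic and polling interval are placeholders, and the return value of pspull is assumed to be an iterable of pulled items):

import time

from fast_depends import inject
from good_redis import Redis, RedisProvider  # import path assumed

def handle(item):
    # Placeholder for real task processing
    print("processing", item)

@inject
def run_worker(
    redis: Redis = RedisProvider(),
):
    while True:
        # Take the highest-priority task and remove it from the stream
        items = redis.pspull(name="task_queue", count=1, purge=True)
        if not items:
            time.sleep(1)  # nothing to do; back off briefly
            continue
        for item in items:
            handle(item)

run_worker()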

Download files

Download the file for your platform.

Source Distribution

good_redis-0.1.5.tar.gz (7.1 kB)

Built Distribution

good_redis-0.1.5-py3-none-any.whl (7.9 kB)

File details

Details for the file good_redis-0.1.5.tar.gz.

File metadata

  • Download URL: good_redis-0.1.5.tar.gz
  • Upload date:
  • Size: 7.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.3

File hashes

Hashes for good_redis-0.1.5.tar.gz
  • SHA256: 3e3b1b85a83cbedcb423da91f445cd195cf2c4f51d17dff1b07189d70c768858
  • MD5: b0822e6742253177ef9ceb514d4769c6
  • BLAKE2b-256: 7130561d912f67f63b49340233352c390bd88848ec0368cc1d3c024273ad1fdd


File details

Details for the file good_redis-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: good_redis-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 7.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.3

File hashes

Hashes for good_redis-0.1.5-py3-none-any.whl
  • SHA256: b7d3a4ada6ad9f553bbd2c86f7daba737bed3d9e8ce814e8d769cd1537800b63
  • MD5: ea5a71021e603e4e4c5b79f2e8a4ab34
  • BLAKE2b-256: 4ffd02d7e79bf24b83133865d6cb628a73b8f5e3da913430a96c02461fa22e6d

