
TaskIQ-Redis

Taskiq-redis is a plugin for taskiq that adds a new broker and result backend based on redis.

Installation

To use this project you must have the core taskiq library installed:

pip install taskiq

This project can be installed using pip:

pip install taskiq-redis

Usage

Let's see the example with the redis broker and redis async result:

# broker.py
import asyncio

from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

result_backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

# Or you can use PubSubBroker if you need broadcasting
# Or ListQueueBroker if you don't want acknowledges
broker = RedisStreamBroker(
    url="redis://localhost:6379",
).with_result_backend(result_backend)


@broker.task
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(5.5)
    print("All problems are solved!")


async def main():
    task = await best_task_ever.kiq()
    print(await task.wait_result())


if __name__ == "__main__":
    asyncio.run(main())

Launch the workers:

taskiq worker broker:broker

Then run the main code:

python3 broker.py

Brokers

This package contains several broker implementations, covering three broker types:

  • PubSub broker
  • ListQueue broker
  • Stream broker

Each type is implemented for each Redis architecture:

  • Single node
  • Cluster
  • Sentinel

Here's a small breakdown of how they differ from each other.

PubSub

On older Redis versions, PUBSUB was the standard way to turn Redis into a queue. However, PUBSUB delivers every message to all subscribed consumers.

[!WARNING] This broker doesn't support acknowledgements. If a worker is suddenly killed during message processing, the message is lost.
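The fan-out behavior described above can be illustrated with a toy in-memory model (a sketch of the semantics, not the broker's actual implementation):

```python
from collections import deque


class FanOutBus:
    """Toy in-memory model of Redis PUBSUB fan-out semantics."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self):
        # Each subscriber gets its own inbox.
        inbox = deque()
        self.subscribers.append(inbox)
        return inbox

    def publish(self, message):
        # Every subscriber receives its own copy of the message.
        for inbox in self.subscribers:
            inbox.append(message)


bus = FanOutBus()
worker_a = bus.subscribe()
worker_b = bus.subscribe()
bus.publish("task-1")

# Unlike a queue, both workers see the same message.
print(list(worker_a))  # ['task-1']
print(list(worker_b))  # ['task-1']
```

This is why PubSub is suited to broadcasting: every connected worker executes every task, rather than tasks being load-balanced across workers.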

ListQueue

This broker stores messages in a list at a given key. New tasks are appended on the left side using LPUSH and consumed from the right side using BRPOP.

[!WARNING] This broker doesn't support acknowledgements. If a worker is suddenly killed during message processing, the message is lost.
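The LPUSH/BRPOP pattern yields FIFO ordering, which a plain deque can demonstrate (a standalone sketch of the queue semantics, not the broker's code):

```python
from collections import deque

queue = deque()

# LPUSH: new tasks are pushed onto the left of the list.
for task in ("task-1", "task-2", "task-3"):
    queue.appendleft(task)

# BRPOP: workers pop from the right, so tasks come out
# in the order they were enqueued.
consumed = [queue.pop() for _ in range(len(queue))]
print(consumed)  # ['task-1', 'task-2', 'task-3']
```

Note that BRPOP removes the message from the list immediately on delivery, which is exactly why a crash mid-processing loses it: Redis no longer holds a copy.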

Stream

Stream brokers use the Redis stream data type to store and fetch messages.

[!TIP] This broker supports acknowledgements and is therefore safe to use when data durability is required.
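Why acknowledgements make streams durable can be sketched with a toy model of stream delivery: delivered-but-unacknowledged entries stay pending and can be re-claimed after a crash (this mirrors the XREADGROUP/XACK/XAUTOCLAIM pattern in spirit; it is an illustration, not the library's implementation):

```python
class ToyStream:
    """Toy model of stream delivery with acknowledgements."""

    def __init__(self):
        self.next_id = 0
        self.backlog = []   # (id, message) not yet delivered
        self.pending = {}   # id -> message, delivered but not acked

    def add(self, message):
        self.next_id += 1
        self.backlog.append((self.next_id, message))

    def read(self):
        # Like XREADGROUP: deliver an entry and track it as pending.
        entry_id, message = self.backlog.pop(0)
        self.pending[entry_id] = message
        return entry_id, message

    def ack(self, entry_id):
        # Like XACK: the entry is done and will never be re-delivered.
        del self.pending[entry_id]

    def reclaim(self):
        # Like XAUTOCLAIM: hand un-acked entries back out after a crash.
        stale = list(self.pending.items())
        self.pending.clear()
        self.backlog = stale + self.backlog


stream = ToyStream()
stream.add("task-1")
entry_id, msg = stream.read()

# The worker dies before acking; the entry is still pending, not lost.
stream.reclaim()
entry_id2, msg2 = stream.read()
print(msg2)  # task-1
```

Contrast this with ListQueue above, where BRPOP removes the message on delivery and nothing remains to re-claim.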

RedisAsyncResultBackend configuration

RedisAsyncResultBackend parameters:

  • redis_url - URL to Redis.
  • keep_results - flag to keep results in Redis after reading instead of removing them.
  • result_ex_time - expiry time in seconds (not set by default).
  • result_px_time - expiry time in milliseconds (not set by default).
  • Any other keyword arguments are passed to redis.asyncio.BlockingConnectionPool. Notably, you can use timeout to set a custom timeout in seconds for reconnects (or set it to None to retry reconnects indefinitely).

[!WARNING] It is highly recommended to set an expiry time on RedisAsyncResultBackend. To enable expiration, set either result_ex_time or result_px_time.

# First variant
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_ex_time=1000,
)

# Second variant
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_px_time=1000000,
)

Schedule sources

You can use this package to add dynamic schedule sources. They are used to store schedules for taskiq scheduler.

The advantage of using the schedule sources from this package over the default LabelBased source is that you can add schedules to them dynamically.

We have two types of schedule sources:

  • RedisScheduleSource
  • ListRedisScheduleSource

RedisScheduleSource

This source is super simple. It stores each schedule under the key {prefix}:{schedule_id}. When the scheduler requests schedules, it retrieves all values from Redis whose keys start with the given prefix.

This is very inefficient and should not be used for high volumes of schedules: with 1000 schedules, the scheduler makes at least 20 requests to retrieve them (SCAN and MGET are used to minimize the number of calls).
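The 20-request figure follows from paging: keys are discovered in SCAN pages and values fetched in MGET batches. A rough lower bound, assuming an illustrative page/batch size of 100 (not necessarily the library's actual setting):

```python
import math


def round_trips(schedule_count: int, batch_size: int = 100) -> int:
    """Rough lower bound on Redis calls: SCAN pages plus MGET batches.

    batch_size=100 is an assumed illustrative value, not taken from
    the library's configuration.
    """
    scan_pages = math.ceil(schedule_count / batch_size)
    mget_calls = math.ceil(schedule_count / batch_size)
    return scan_pages + mget_calls


print(round_trips(1000))  # 20
print(round_trips(100))   # 2
```

The cost grows linearly with the total number of schedules, regardless of how many are actually due to run, which is the problem ListRedisScheduleSource addresses.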

ListRedisScheduleSource

This source holds values in lists.

  • For cron tasks it uses key {prefix}:cron.
  • For timed schedules it uses the key {prefix}:time:{time}, where {time} is the time at which the schedules should run.

The main advantage of this approach is that we only fetch the tasks that need to run at a given time and do not make any excessive calls to Redis.
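The key scheme above can be sketched as plain string construction (the exact time format in the real keys is an assumption for illustration; only the {prefix}:cron and {prefix}:time:{time} shapes come from the text):

```python
from datetime import datetime, timezone


def cron_key(prefix: str) -> str:
    # All cron schedules live in a single list.
    return f"{prefix}:cron"


def time_key(prefix: str, when: datetime) -> str:
    # Timed schedules are bucketed by the moment they should fire,
    # so the scheduler can fetch exactly one bucket per tick.
    return f"{prefix}:time:{when.isoformat()}"


print(cron_key("schedules"))
# schedules:cron
print(time_key("schedules", datetime(2024, 1, 1, tzinfo=timezone.utc)))
# schedules:time:2024-01-01T00:00:00+00:00
```

Because each lookup targets one known key instead of scanning a prefix, the number of Redis calls per tick stays constant no matter how many schedules exist.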

Migration from one source to another

To migrate from RedisScheduleSource to ListRedisScheduleSource, you can define the latter like this:

# broker.py
import asyncio
import datetime

from taskiq import TaskiqScheduler

from taskiq_redis import ListRedisScheduleSource, RedisStreamBroker
from taskiq_redis.schedule_source import RedisScheduleSource

broker = RedisStreamBroker(url="redis://localhost:6379")

old_source = RedisScheduleSource("redis://localhost/1", prefix="prefix1")
array_source = ListRedisScheduleSource(
    "redis://localhost/1",
    prefix="prefix2",
    # To migrate schedules from an old source.
).with_migrate_from(
    old_source,
    # To delete schedules from an old source.
    delete_schedules=True,
)
scheduler = TaskiqScheduler(broker, [array_source])

During startup, the scheduler will try to migrate schedules from the old source to the new one. Be sure to specify different prefixes for the two sources to avoid any kind of collision between them.

Download files


Source Distribution

taskiq_redis-1.0.9.tar.gz (15.8 kB)


Built Distribution

taskiq_redis-1.0.9-py3-none-any.whl (20.1 kB)


File details

Details for the file taskiq_redis-1.0.9.tar.gz.

File metadata

  • Download URL: taskiq_redis-1.0.9.tar.gz
  • Size: 15.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.11.12 Linux/6.11.0-1015-azure

File hashes

Hashes for taskiq_redis-1.0.9.tar.gz:

  • SHA256: d25a5ef1c8a50dab680bf2433b6c0a811909adc100a9d986cc9ca8cf13b11300
  • MD5: b7b2e05126c420adfc27243dfca7be62
  • BLAKE2b-256: ab46fa492736f5b90587e73113022093dfab3a2ae16fddcfeea79b2012f1f598


File details

Details for the file taskiq_redis-1.0.9-py3-none-any.whl.

File metadata

  • Download URL: taskiq_redis-1.0.9-py3-none-any.whl
  • Size: 20.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.11.12 Linux/6.11.0-1015-azure

File hashes

Hashes for taskiq_redis-1.0.9-py3-none-any.whl:

  • SHA256: a27a4940cfb79fabbe99dba557423ba53728f14517e1554394c9643df2972f3c
  • MD5: 3cafc6f118c94dadcb673a0c8e06834b
  • BLAKE2b-256: 39286311e2f8c9eef70d80f711be86887fba5e151817553231819a24bcd02fd9

