Redis integration for taskiq

TaskIQ-Redis

Taskiq-redis is a plugin for taskiq that adds a new broker and result backend based on redis.

Installation

To use this project, you must have the core taskiq library installed:

pip install taskiq

This project can be installed using pip:

pip install taskiq-redis

Usage

Here is an example that uses the redis list queue broker together with the redis async result backend:

# broker.py
import asyncio

from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

# Or you can use PubSubBroker if you need broadcasting
broker = ListQueueBroker(
    url="redis://localhost:6379",
    result_backend=redis_async_result,
)


@broker.task
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(5.5)
    print("All problems are solved!")


async def main():
    task = await best_task_ever.kiq()
    print(await task.wait_result())


if __name__ == "__main__":
    asyncio.run(main())

Launch the workers:

taskiq worker broker:broker

Then run the main code:

python3 broker.py

PubSubBroker and ListQueueBroker configuration

We have two brokers with similar interfaces but different delivery logic. The PubSubBroker uses redis' pub/sub mechanism. It is powerful, but it executes every task on all workers, because pub/sub broadcasts each message to all subscribers.

If you want your messages to be processed only once, please use ListQueueBroker. It uses redis' LPUSH and BRPOP commands to deal with messages.
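The difference in delivery semantics can be illustrated with a toy in-memory model. This is only a sketch: ToyPubSub, ToyListQueue and simulate are hypothetical names invented for the illustration, they do not talk to redis, and the round-robin worker order is an assumption made to keep the demo deterministic (real workers block on BRPOP and whichever one is waiting receives the message).

```python
from collections import deque
from typing import Dict, List, Optional, Tuple


class ToyPubSub:
    """Stand-in for a pub/sub channel: every subscriber sees every message."""

    def __init__(self) -> None:
        self.inboxes: Dict[str, List[str]] = {}

    def subscribe(self, worker: str) -> None:
        self.inboxes[worker] = []

    def publish(self, message: str) -> None:
        # Broadcast: a copy of the message lands in every inbox.
        for inbox in self.inboxes.values():
            inbox.append(message)


class ToyListQueue:
    """Stand-in for a list used as a queue: each message is popped once."""

    def __init__(self) -> None:
        self.items = deque()

    def push(self, message: str) -> None:
        self.items.appendleft(message)  # analogous to LPUSH

    def pop(self) -> Optional[str]:
        # Analogous to BRPOP, but returns None instead of blocking.
        return self.items.pop() if self.items else None


def simulate(
    workers: List[str], messages: List[str]
) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    """Send the same messages through both models and record who handled what."""
    pubsub = ToyPubSub()
    queue = ToyListQueue()
    for worker in workers:
        pubsub.subscribe(worker)
    for message in messages:
        pubsub.publish(message)
        queue.push(message)

    # Workers take turns popping until the queue is empty (assumed order).
    handled: Dict[str, List[str]] = {worker: [] for worker in workers}
    turn = 0
    while True:
        message = queue.pop()
        if message is None:
            break
        handled[workers[turn % len(workers)]].append(message)
        turn += 1
    return pubsub.inboxes, handled


broadcast, queued = simulate(["worker-1", "worker-2"], ["a", "b", "c"])
print("pub/sub:", broadcast)
print("list queue:", queued)
```

In the pub/sub model both workers end up with all three messages, while in the list queue model each message is handled by exactly one worker.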

Broker parameters:

  • url - url to redis.
  • task_id_generator - custom task_id generator.
  • result_backend - custom result backend.
  • queue_name - name of the pub/sub channel (PubSubBroker) or of the list (ListQueueBroker) in redis.
  • max_connection_pool_size - maximum number of connections in pool.
  • Any other keyword arguments are passed to redis.asyncio.BlockingConnectionPool. Notably, you can use timeout to set custom timeout in seconds for reconnects (or set it to None to try reconnects indefinitely).
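As a rough sketch of how these parameters fit together, the fragment below configures both brokers. The queue names, pool size and timeout values are illustrative assumptions, not recommended settings, and the fragment requires taskiq-redis to be installed.

```python
from taskiq_redis import ListQueueBroker, PubSubBroker, RedisAsyncResultBackend

result_backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

# Each task is delivered to a single worker.
queue_broker = ListQueueBroker(
    url="redis://localhost:6379",
    result_backend=result_backend,
    queue_name="my_tasks",        # illustrative name
    max_connection_pool_size=10,  # illustrative value
    timeout=5,                    # forwarded to BlockingConnectionPool
)

# Each task is delivered to every subscribed worker.
broadcast_broker = PubSubBroker(
    url="redis://localhost:6379",
    queue_name="my_broadcasts",   # illustrative name
    timeout=None,                 # keep trying to reconnect indefinitely
)
```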

RedisAsyncResultBackend configuration

RedisAsyncResultBackend parameters:

  • redis_url - url to redis.
  • keep_results - flag to not remove results from Redis after reading.
  • result_ex_time - expire time in seconds (by default - not specified)
  • result_px_time - expire time in milliseconds (by default - not specified)
  • Any other keyword arguments are passed to redis.asyncio.BlockingConnectionPool. Notably, you can use timeout to set custom timeout in seconds for reconnects (or set it to None to try reconnects indefinitely).

IMPORTANT: It is highly recommended to set an expire time in RedisAsyncResultBackend. To add expiration, set either result_ex_time or result_px_time.

from taskiq_redis import RedisAsyncResultBackend

# First variant: expire time in seconds
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_ex_time=1000,
)

# Second variant: expire time in milliseconds
redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_px_time=1000000,
)
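The two variants above describe the same lifetime, since 1000 seconds equals 1000000 milliseconds. A small helper can make that unit conversion explicit. This is a sketch only: expiry_kwargs is a hypothetical function written for this illustration and is not part of taskiq-redis; it just builds the keyword argument described above.

```python
from datetime import timedelta
from typing import Dict


def expiry_kwargs(ttl: timedelta, use_milliseconds: bool = False) -> Dict[str, int]:
    """Build exactly one of result_ex_time / result_px_time from a timedelta.

    Hypothetical helper, not part of taskiq-redis.
    """
    if use_milliseconds:
        key, unit = "result_px_time", timedelta(milliseconds=1)
    else:
        key, unit = "result_ex_time", timedelta(seconds=1)

    value = ttl // unit  # integer number of whole units
    if value < 1:
        raise ValueError("ttl is shorter than one unit of the chosen resolution")
    return {key: value}


ttl = timedelta(seconds=1000)
print(expiry_kwargs(ttl))
print(expiry_kwargs(ttl, use_milliseconds=True))
```

The resulting dictionary could then be unpacked into the backend constructor, for example RedisAsyncResultBackend(redis_url="redis://localhost:6379", **expiry_kwargs(ttl)).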

Download files

Source Distribution

taskiq_redis-1.0.0.tar.gz (9.7 kB view details)

Uploaded Source

Built Distribution

taskiq_redis-1.0.0-py3-none-any.whl (11.9 kB view details)

Uploaded Python 3

File details

Details for the file taskiq_redis-1.0.0.tar.gz.

File metadata

  • Download URL: taskiq_redis-1.0.0.tar.gz
  • Upload date:
  • Size: 9.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.9 Linux/6.5.0-1021-azure

File hashes

Hashes for taskiq_redis-1.0.0.tar.gz
Algorithm    Hash digest
SHA256       45b2ac312a61725b4a5bdc5595f160d83986eac21eb5bc080ede5e6272d87d61
MD5          6a1d2ad43a1e6d87ec2e7d87a283c931
BLAKE2b-256  1f397c066c604f55d5dc5f632bd24d265a72cbed278740c12d7aa91f4015b735

File details

Details for the file taskiq_redis-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: taskiq_redis-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 11.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.9 Linux/6.5.0-1021-azure

File hashes

Hashes for taskiq_redis-1.0.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       867949594e63402bdb8378fcc9f1e4e1a18360cb7d3da30fe06d8c33cb74822f
MD5          80dc4cecf63296dd04d6057b3accbe8b
BLAKE2b-256  965df101e0277122e9c064716ce2707cca191c337a362fbec7029759d824f8ac
