Sharded queue implementation

Sharded queue

Introduction

Imagine a job queue that operates at a very high request rate (rps) and must be distributed over multiple workers, while context-sensitive requests have to stay in the same thread and you need to control the processing priority within each thread. In other words, a sharded queue is a queue with sub-queues inside. Tasks are executed in FIFO order, and you define how to route them on a per-handler basis.

Installation

Install using pip

pip install sharded-queue

Getting started

There are some roles that you need to understand:

  • request: a simple message that should be delivered to a handler
  • handler: a request handler that performs the job
  • route: defines the internal queue that is used for request distribution
    • thread: a group of context-sensitive requests
    • priority: can be used to sort requests inside the thread
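
The roles above can be pictured with a small in-memory model. This is only a sketch using the standard library: the `Route` class below is a stand-in (its default values are assumptions), `Message` is a hypothetical request type, and nothing here reflects how the library stores data internally.

```python
from collections import defaultdict, deque
from typing import NamedTuple


class Route(NamedTuple):
    # Stand-in for the library's Route; the default values are assumptions.
    thread: int = 0
    priority: int = 0


class Message(NamedTuple):
    # Hypothetical request type used only for this illustration.
    chat_id: int
    text: str


def route(message: Message) -> Route:
    # Keep every message of one chat in the same thread (4 threads here).
    return Route(thread=message.chat_id % 4)


# One FIFO sub-queue per route: this is the "queue with sub-queues" idea.
sub_queues: dict[Route, deque[Message]] = defaultdict(deque)

for message in [
    Message(10, "hello"),
    Message(11, "hi"),
    Message(10, "how are you?"),
]:
    sub_queues[route(message)].append(message)

# Messages of chat 10 share a sub-queue and keep their arrival order.
chat_10 = [m.text for m in sub_queues[Route(thread=2)]]
print(chat_10)  # ['hello', 'how are you?']
```

Because both messages of chat 10 map to the same route, a single worker sees them in the order they were registered.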

Let's start with a simple notification task that is spread across 3 threads and uses no priorities. Notice that the handler methods are written in a batch style to reduce I/O latency per message.

from typing import NamedTuple

from sharded_queue import Handler, Queue, Route

class NotifyRequest(NamedTuple):
    '''
    In this example we have simple notify request containing user identifier
    In addition, the value is used to shard requests over worker threads
    '''
    user_id: int

class NotifyHandler(Handler):
    @classmethod
    async def route(cls, *requests: NotifyRequest) -> list[Route]:
        '''
        Spread requests by 3 threads that can be concurrently processed
        '''
        return [
            Route(thread=request.user_id % 3)
            for request in requests
        ]

    async def perform(self, *requests: NotifyRequest) -> None:
        '''
        Perform is called using configurable batch size
        This allows you to reduce io per single request
        '''
        # users = await UserRepository.find_all([r.user_id for r in requests])
        # await mailer.send_all([construct_message(user) for user in users])
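
To check which thread a given user lands in under the `user_id % 3` rule, the arithmetic can be reproduced without the library. The user ids 1 to 9 are chosen here purely for illustration.

```python
from collections import defaultdict

# Group user ids by the thread number that user_id % 3 would produce.
threads: dict[int, list[int]] = defaultdict(list)
for user_id in range(1, 10):  # users 1..9
    threads[user_id % 3].append(user_id)

for thread in sorted(threads):
    print(thread, threads[thread])
# 0 [3, 6, 9]
# 1 [1, 4, 7]
# 2 [2, 5, 8]
```

Note that user 3 goes to thread 0, not to a "third" thread numbered 3, because the remainder of 3 divided by 3 is 0.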

Usage example

Once a handler is defined, you can use the queue and worker API to manage and process tasks. The runtime components are:

  • lock: helps a worker bind to the queue
  • queue: is used to register requests
  • storage: a database containing queue data
  • worker: performs requests using a handler

from asyncio import gather
from notifications import NotifyHandler, NotifyRequest
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage


async def example():
    '''
    let's register notifications for the first 9 users
    '''
    queue: Queue = Queue(RuntimeStorage())
    await queue.register(NotifyHandler, *[NotifyRequest(n) for n in range(1, 10)])
    '''
    now all requests are waiting for workers on 3 notify handler tubes
    they were distributed using the handler's route method (user_id % 3)
    thread 1 contains notify requests for users 1, 4, 7
    thread 2 contains requests for 2, 5, 8 and the rest (3, 6, 9) go to thread 0
    '''
    futures = [
        Worker(RuntimeLock(), queue).loop(3),
        Worker(RuntimeLock(), queue).loop(3),
        Worker(RuntimeLock(), queue).loop(3),
    ]
    '''
    we've just created three coroutines that will process messages
    each worker will bind to a thread and process all of its messages
    '''
    await gather(*futures)
    '''
    now all emails have been sent in 3 threads
    '''
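
The binding behaviour described in the comments above can be modelled with plain asyncio. This is a conceptual sketch only: the `tubes`, `locked` and `worker` names are made up for this illustration, the batch size of 2 is arbitrary, and the real Worker and lock drivers may behave differently.

```python
import asyncio
from collections import deque

# Three in-memory tubes keyed by thread number, filled as user_id % 3 would.
tubes: dict[int, deque[int]] = {
    0: deque([3, 6, 9]),
    1: deque([1, 4, 7]),
    2: deque([2, 5, 8]),
}
locked: set[int] = set()                    # threads currently owned by a worker
processed: dict[str, list[list[int]]] = {}  # worker name -> batches it handled


async def worker(name: str, batch_size: int = 2) -> None:
    # Bind to the first non-empty tube that nobody else holds.
    for thread, tube in tubes.items():
        if thread in locked or not tube:
            continue
        locked.add(thread)
        try:
            while tube:
                # Take up to batch_size requests, oldest first.
                batch = [tube.popleft() for _ in range(min(batch_size, len(tube)))]
                processed.setdefault(name, []).append(batch)
                await asyncio.sleep(0)  # yield so other workers can run
        finally:
            locked.discard(thread)
        return


async def main() -> None:
    await asyncio.gather(*(worker(f"worker-{n}") for n in range(3)))


asyncio.run(main())
print(processed)
```

Each worker ends up owning exactly one thread, so requests that share a thread are never processed concurrently, while different threads proceed in parallel.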

Drivers

There are several implementations of components:

  • RedisLock: persists locks in Redis using the setnx API
  • RedisStorage: persists messages using lists and the lrange/lpop/rpush API
  • RuntimeLock: persists locks in memory (in-process distribution)
  • RuntimeStorage: persists messages in a dict (in-process distribution)

Handler lifecycle

As you may have noticed, routing is done by a class method, while perform is an instance method. When a worker starts processing requests, it can bootstrap and tear down the handler using the start and stop methods.

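from typing import NamedTuple, Self

from sharded_queue import Handler, Route
from sharded_queue.settings import settings

class ParseEventRequest(NamedTuple):
    '''
    Event identifier should be enough to get its contents from storage
    '''
    event: int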

class ParseEventHandler(Handler):
    @classmethod
    async def create(cls) -> Self:
        '''
        define your own handler and dependency factory
        '''
        return cls()

    @classmethod
    async def route(cls, *requests: ParseEventRequest) -> list[Route]:
        '''
        override default single thread tube
        '''
        return [
            Route(settings.default_thread, settings.default_priority)
            for request in requests
        ]

    async def start(self):
        '''
        run any code when the worker binds to the queue
        '''

    async def perform(self, *requests: ParseEventRequest):
        '''
        the handler
        '''

    async def handle(self, *requests: ParseEventRequest) -> None:
        '''
        process requests batch
        '''

    async def stop(self):
        '''
        run any code when queue is empty and worker stops processing thread
        '''
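
The order in which the lifecycle hooks fire can be illustrated with a stand-in handler. The sequence below (create, start, one perform call per batch, then stop) is an assumption based on the description above; `RecordingHandler` and `run_lifecycle` are hypothetical names and do not come from the library.

```python
import asyncio


class RecordingHandler:
    """Stand-in handler that records the order in which its hooks are called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    @classmethod
    async def create(cls) -> "RecordingHandler":
        return cls()

    async def start(self) -> None:
        self.calls.append("start")

    async def perform(self, *requests: int) -> None:
        self.calls.append(f"perform{requests}")

    async def stop(self) -> None:
        self.calls.append("stop")


async def run_lifecycle(batches: list[list[int]]) -> list[str]:
    # Assumed order: create, start, one perform call per batch, then stop.
    handler = await RecordingHandler.create()
    await handler.start()
    try:
        for batch in batches:
            await handler.perform(*batch)
    finally:
        await handler.stop()  # tear down even if a batch fails
    return handler.calls


calls = asyncio.run(run_lifecycle([[1, 2], [3]]))
print(calls)  # ['start', 'perform(1, 2)', 'perform(3,)', 'stop']
```

Expensive resources such as connections fit naturally into start and stop, since they are then shared by every batch a worker processes for that thread.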

Deduplication

There is an optional if_not_exists flag. If it is set, a request is registered only if it is not already present in the queue.

await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
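
The effect of the flag can be sketched with a plain deque. The `register` function and `Bucket` type below are hypothetical stand-ins written for this illustration; how the library detects duplicates in its storage is not shown in this document.

```python
from collections import deque
from typing import NamedTuple


class Bucket(NamedTuple):
    # Hypothetical request type mirroring the snippet above.
    bucket_id: int


def register(tube: deque, request: Bucket, if_not_exists: bool = False) -> bool:
    """Append request to tube; return False when it was skipped as a duplicate."""
    if if_not_exists and request in tube:
        return False
    tube.append(request)
    return True


tube: deque[Bucket] = deque()
print(register(tube, Bucket(7756527), if_not_exists=True))  # True
print(register(tube, Bucket(7756527), if_not_exists=True))  # False
print(len(tube))  # 1
```

The membership test here is a linear scan, which is fine for a sketch but would be a consideration for large tubes.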

Deferred handlers

You can use the built-in deferred request handler to delay a call.

await queue.register(Housekeep, Room(402), defer=5)  # numeric means seconds
await queue.register(Housekeep, Room(324), defer=timedelta(minutes=15))

Recurrent handlers

You can use the built-in recurrent request handler to register a request at regular intervals.

await queue.register(BalanceCheck, Company('basis'), recurrent=timedelta(hours=1))

Performance

Performance depends on many factors, so we can only measure the library's own overhead using in-memory storage. You can run the performance test on your hardware with pytest -s; with this option the test prints results for different cases. On an Intel i5-4670K running Ubuntu 23.04 with Python 3.11.4, the test gives about 200_000 rps for batch request registration with sharding and about 600_000 rps for request handling in concurrent mode.

Advanced queue configuration

You can configure the sharded queue using environment variables:

  • QUEUE_DEFAULT_PRIORITY = 0
    Default queue priority
  • QUEUE_DEFAULT_THREAD = 0
    Default queue thread
  • QUEUE_DEFERRED_RETRY_DELAY = 1
    Deferred tasks retry delay
  • QUEUE_LOCK_PREFIX = 'lock_'
    Lock key prefix
  • QUEUE_LOCK_TIMEOUT = 24 * 60 * 60
    Lock key ttl
  • QUEUE_RECURRENT_CHECK_INTERVAL = 30
    Recurrent interval check in seconds
  • QUEUE_RECURRENT_TASKS_LIMIT = 1024
    Recurrent tasks limit count
  • QUEUE_TUBE_PREFIX = 'tube_'
    Default queue prefix
  • QUEUE_WORKER_ACQUIRE_DELAY = 1
    Worker acquire delay in seconds on empty queues
  • QUEUE_WORKER_BATCH_SIZE = 128
    Worker batch processing size
  • QUEUE_WORKER_EMPTY_LIMIT = 16
    Worker empty queue attempt limit before queue rebind
  • QUEUE_WORKER_EMPTY_PAUSE = 0.1
    Worker pause in seconds on empty queue
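
The general pattern of reading a prefixed environment variable with a typed default can be sketched with the standard library. How the library itself loads these values is not shown in this document, so `env_setting` below is a hypothetical helper, not its actual mechanism.

```python
import os


def env_setting(name: str, default, cast=None):
    """Read QUEUE_<NAME> from the environment, falling back to default."""
    raw = os.environ.get(f"QUEUE_{name.upper()}")
    if raw is None:
        return default
    # Without an explicit cast, convert to the type of the default value.
    return (cast or type(default))(raw)


os.environ["QUEUE_WORKER_BATCH_SIZE"] = "64"   # normally set by the shell
os.environ.pop("QUEUE_WORKER_EMPTY_PAUSE", None)

batch_size = env_setting("worker_batch_size", 128)    # 64, taken from env
empty_pause = env_setting("worker_empty_pause", 0.1)  # 0.1, the default
print(batch_size, empty_pause)
```

In a real deployment the variables would be exported before the process starts, for example `QUEUE_WORKER_BATCH_SIZE=64` in the service environment.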

You can also import the settings object and change it manually:

import asyncio

from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.settings import settings

# override defaults before the worker starts
settings.worker_acquire_delay = 5
settings.worker_batch_size = 64


async def main():
    worker = Worker(RuntimeLock(), Queue(RuntimeStorage()))
    await worker.loop()

asyncio.run(main())
