
Sharded queue implementation

Project description

Sharded queue

Introduction

This library can be used to distribute your job queue using a sharding technique.
A handler consists of a perform method and a route method that defines your request routing.
A thread is a subqueue of a handler with strict FIFO order.
The order segment can be used to manage request priority inside a thread subqueue (see the sketch below).
All code is written using a batch approach to reduce I/O latency per message.
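
For example, a minimal sketch of a route that uses both segments. This assumes Route accepts an order argument alongside thread (suggested by the QUEUE_DEFAULT_ORDER setting below); ChargeRequest and ChargeHandler are illustrative names only:

from dataclasses import dataclass

from sharded_queue import Handler, RequestRoute, Route


@dataclass
class ChargeRequest:
    user_id: int
    urgent: bool


class ChargeHandler(Handler):
    @classmethod
    async def route(cls, *requests: ChargeRequest):
        # thread shards the queue; order is assumed to separate
        # priority segments inside each thread subqueue
        return [
            RequestRoute(
                request,
                Route(
                    thread=str(request.user_id % 3),
                    order='0' if request.urgent else '1',
                ),
            )
            for request in requests
        ]

    async def perform(self, *requests: ChargeRequest):
        ...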

Getting started

Install the library: pip install sharded-queue

Describe your handler.

from asyncio import gather
from dataclasses import dataclass

from repositories import UserRepository
from services import construct_message, mailer
from sharded_queue import Handler, Queue, RequestRoute, Route, Worker


@dataclass
class NotifyRequest:
    '''
    This is your single handler request
    '''
    user_id: int

class NotifyHandler(Handler):
    @classmethod
    async def route(cls, *requests: NotifyRequest):
        '''
        Spread requests by 3 threads that can be concurrently processed
        '''
        return [
            RequestRoute(request, Route(thread=str(request.user_id % 3)))
            for request in requests
        ]

    async def perform(self, *requests: NotifyRequest):
        '''
        Perform is called using configurable batch size
        This allows you to reduce io per single request
        '''
        users = await UserRepository.find_all([r.user_id for r in requests])

        await mailer.send_all([construct_message(user) for user in users])


async def main():
    queue = Queue()
    await queue.register(
        NotifyHandler,
        NotifyRequest(1),
        NotifyRequest(2),
        NotifyRequest(3),
        NotifyRequest(4),
        NotifyRequest(5),
        NotifyRequest(6),
        NotifyRequest(7),
        NotifyRequest(8),
        NotifyRequest(9),
    )

    # now all requests are waiting for workers on 3 notify handler tubes
    # the first tube contains notify requests for users 1, 4 and 7
    # the second tube contains requests for users 2, 5 and 8, and so on
    # they were distributed by the handler's route method

    worker = Worker(queue)
    # we can run a worker with a processed message limit
    # in this example we run three coroutines that process the messages
    # each worker binds to a free tube and processes its 3 messages
    # you can also run workers on a distributed system
    futures = [
        worker.loop(3),
        worker.loop(3),
        worker.loop(3),
    ]

    # once the futures are awaited, all emails have been sent
    await gather(*futures)
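
To run the example, execute the coroutine with the standard asyncio entrypoint:

from asyncio import run

run(main())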

Handler bootstrap

When a worker binds to a queue, it creates an async context that is used to run bootstrap and shutdown routines.

class BucketRequest:
    bucket: int

class SyncBucketHandler(Handler):
    async def start(self):
        '''
        perform any tasks before perform is called
        '''

    async def perform(self, *requests: BucketRequest):
        pass

    async def stop(self):
        '''
        perform any tasks after perform has been called
        '''
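
For instance, a sketch of a handler that opens one shared HTTP session on start and closes it on stop; aiohttp and the endpoint path are illustrative assumptions, not part of this library:

from dataclasses import dataclass

from aiohttp import ClientSession
from sharded_queue import Handler


@dataclass
class SyncBucketRequest:
    bucket: int


class HttpSyncBucketHandler(Handler):
    async def start(self):
        # open one session per worker binding instead of per request
        self.session = ClientSession(base_url='https://example.com')

    async def perform(self, *requests: SyncBucketRequest):
        # reuse the session for the whole batch of requests
        for request in requests:
            async with self.session.post(f'/buckets/{request.bucket}/sync') as response:
                response.raise_for_status()

    async def stop(self):
        # release the session when the worker unbinds from the queue
        await self.session.close()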

Queue configuration

You can configure the sharded queue using environment variables; an example follows the list below.

  • QUEUE_COORDINATOR_DELAY=1 Coordinator delay in seconds on empty queues
  • QUEUE_DEFAULT_ORDER='0' Default queue order
  • QUEUE_DEFAULT_THREAD='0' Default queue thread
  • QUEUE_WORKER_BATCH_SIZE=128 Worker batch processing size
  • QUEUE_WORKER_EMPTY_LIMIT=16 Worker empty queue attempt limit before queue rebind
  • QUEUE_WORKER_EMPTY_PAUSE=0.1 Worker pause in seconds on empty queue
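
For example, a sketch of overriding two of these values through the environment (assuming the variables are set before the queue settings are first read):

import os

# set before sharded_queue reads its configuration,
# e.g. in the shell or a process manager
os.environ['QUEUE_WORKER_BATCH_SIZE'] = '64'
os.environ['QUEUE_WORKER_EMPTY_PAUSE'] = '0.5'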

Or import and configure the settings object:

from sharded_queue import Queue, Worker, settings
settings.coordinator_delay = 5
settings.worker_batch_size = 64

worker = Worker(Queue())
await worker.loop()

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sharded_queue-0.0.1.tar.gz (6.0 kB, Source)

Built Distribution

sharded_queue-0.0.1-py3-none-any.whl (5.7 kB, Python 3)

File details

Details for the file sharded_queue-0.0.1.tar.gz.

File metadata

  • Download URL: sharded_queue-0.0.1.tar.gz
  • Upload date:
  • Size: 6.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.5

File hashes

Hashes for sharded_queue-0.0.1.tar.gz

  • SHA256: 8334fde265a49d6bdbe5d6a13ed5bd84a5f8551e5089f4d40fd1614b2e6043e7
  • MD5: 2c43e3200a85337b7a63aef9b0b5b5f1
  • BLAKE2b-256: edcb0f5ad30833083f2ea76f296dd6c18336d62f520c2cc8def91b2f953bd9b6


File details

Details for the file sharded_queue-0.0.1-py3-none-any.whl.

File hashes

Hashes for sharded_queue-0.0.1-py3-none-any.whl

  • SHA256: c43f28a43dbc7ad9deb92bcd2ed9a922781f8c62f95fa1772ff6f70464e9d3ba
  • MD5: f1a8c8a3aafdf5be45f7d25b427f2fe0
  • BLAKE2b-256: 19f0160b7095a2771dd2aef6a6f2fcba7590d0e6460b2ebc189cb284c795038c

