Sharded queue
Introduction
Imagine your job queue operates at very high rps and needs to be distributed over multiple workers, but you also need to keep context-sensitive requests in the same thread and to manage request processing priority within a thread. In other words, a sharded queue is a queue with sub-queues inside. Tasks are executed in FIFO order, and you define how to route them on a per-handler basis.
Installation
Install using pip
pip install sharded-queue
Getting started
There are some roles that you need to understand:
- request: a simple message that should be delivered to a handler
- handler: the request handler that performs the job
- route: defines the internal queue that is used for request distribution
- thread: a group of context-sensitive requests
- priority: can be used to sort requests inside a thread (see the sketch after this list)
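For instance, a handler that needs urgent requests processed first within a thread could return routes carrying both a thread and a priority. This is only a sketch: UrgentNotifyRequest and its is_urgent flag are hypothetical, and it assumes Route accepts a priority argument (hinted at by the two-argument Route(0, 0) used later on this page) with lower values sorting first.

from typing import NamedTuple

from sharded_queue import Handler, Route


class UrgentNotifyRequest(NamedTuple):
    # hypothetical request used only for this sketch
    user_id: int
    is_urgent: bool


class UrgentNotifyHandler(Handler):
    @classmethod
    async def route(cls, *requests: UrgentNotifyRequest) -> list[Route]:
        # shard by user as in the example below, but let urgent requests
        # jump ahead inside their thread (assumes lower priority sorts first)
        return [
            Route(thread=request.user_id % 3, priority=0 if request.is_urgent else 1)
            for request in requests
        ]

    async def perform(self, *requests: UrgentNotifyRequest) -> None:
        '''deliver the notifications'''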
Let's start with a simple notification task that is sharded across 3 threads, with no priorities. Notice that the handler methods are written in a batch style to reduce IO latency per message.
from typing import NamedTuple, Optional

from sharded_queue import Handler, Queue, Route


class NotifyRequest(NamedTuple):
    '''
    In this example we have a simple notify request containing a user identifier
    In addition, the value is used to shard requests over worker threads
    '''
    user_id: int


class NotifyHandler(Handler):
    @classmethod
    async def route(cls, *requests: NotifyRequest) -> list[Route]:
        '''
        Spread requests over 3 threads that can be processed concurrently
        '''
        return [
            Route(thread=request.user_id % 3)
            for request in requests
        ]

    async def perform(self, *requests: NotifyRequest) -> None:
        '''
        Perform is called using a configurable batch size
        This allows you to reduce io per single request
        '''
        # users = await UserRepository.find_all([r.user_id for r in requests])
        # await mailer.send_all([construct_message(user) for user in users])

    def batch_size(self) -> Optional[int]:
        '''
        Optionally, you can override the global batch size per handler
        '''
        return 128
Usage example
Once a handler is described, you can use the queue and worker APIs to manage and process tasks. Let's describe the runtime components:
- lock: helps a worker bind to a queue
- queue: is used to register requests
- storage: a database containing the queue data
- worker: performs requests using a handler
from asyncio import gather

from notifications import NotifyHandler, NotifyRequest
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage


async def example():
    '''
    let's register notifications for the first 9 users
    '''
    queue: Queue = Queue(RuntimeStorage())
    await queue.register(NotifyHandler, *[NotifyRequest(n) for n in range(1, 10)])

    '''
    now all requests are waiting for workers on 3 notify handler tubes
    they were distributed using the route handler method
    the first tube contains notify requests for users 1, 4, 7
    the second tube contains requests for users 2, 5, 8 and the rest go to the third tube
    '''
    futures = [
        Worker(RuntimeLock(), queue).loop(3),
        Worker(RuntimeLock(), queue).loop(3),
        Worker(RuntimeLock(), queue).loop(3),
    ]

    '''
    we've just created three coroutines that will process the messages
    each worker will bind to a thread and process all of its messages
    '''
    await gather(*futures)

    '''
    now all emails have been sent in 3 threads
    '''
Drivers
There are several implementations of components:
- RedisLock: persists locks in redis using the setnx api
- RedisStorage: persists messages using lists and the lrange/lpop/rpush api
- RuntimeLock: persists locks in memory (in-process distribution)
- RuntimeStorage: persists messages in a dict (in-process distribution)
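If you want the queue to survive process restarts and be shared between processes, you can swap the runtime drivers for the Redis ones. The snippet below is only a sketch: it assumes RedisLock and RedisStorage are constructed from a redis.asyncio client, which is an assumption; check sharded_queue.drivers for the exact constructor signatures.

from redis.asyncio import Redis

from sharded_queue import Queue, Worker
from sharded_queue.drivers import RedisLock, RedisStorage


async def redis_example():
    # assumption: both drivers take a redis.asyncio client instance;
    # consult the driver source for the actual constructor arguments
    redis = Redis(host='localhost', port=6379)
    queue = Queue(RedisStorage(redis))
    worker = Worker(RedisLock(redis), queue)
    await worker.loop()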
Handler lifecycle
As you may have noticed, routing is done with a class method, while perform is an instance method. When a worker starts processing requests, it can bootstrap and tear down the handler using the start and stop methods.
from typing import NamedTuple, Self

from sharded_queue import Handler, Route


class ParseEventRequest(NamedTuple):
    '''
    The event identifier should be enough to get its contents from storage
    '''
    event: int


class ParseEventHandler(Handler):
    @classmethod
    async def create(cls) -> Self:
        '''
        define your own handler and dependency factory
        '''
        return cls()

    @classmethod
    async def route(cls, *requests: ParseEventRequest) -> list[Route]:
        '''
        override the default single thread tube
        '''
        return [Route(0, 0) for request in requests]

    async def start(self):
        '''
        run any code when the worker binds to the queue
        '''

    async def perform(self, *requests: ParseEventRequest):
        '''
        the handler itself
        '''

    async def handle(self, *requests: ParseEventRequest) -> None:
        '''
        process a batch of requests
        '''

    async def stop(self):
        '''
        run any code when the queue is empty and the worker stops processing the thread
        '''
Deduplication
There is an optional if_not_exists flag. If it is set, the request will be registered only if it is not already present in the queue.
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
Deferred handlers
You can use the built-in deferred request handler to defer a call:
from datetime import timedelta

await queue.register(Housekeep, Room(402), defer=5)  # a numeric value means seconds
await queue.register(Housekeep, Room(324), defer=timedelta(minutes=15))
Recurrent handlers
You can use the built-in recurrent request handler for regular request registration:
await queue.register(BalanceCheck, Company('basis'), recurrent=timedelta(hours=1))
Performance
Performance depends on many factors; we can only measure the clean library overhead with in-memory storages. You can run the performance test on your hardware with pytest -s; with this option the performance test prints results for different cases. The performance test on an Intel i5-4670K running Ubuntu 23.04 with Python 3.11.4 gives about 200_000 rps for batch request registration with sharding and about 600_000 rps for request handling in concurrent mode.
Advanced queue configuration
You can configure the sharded queue using environment variables:
- QUEUE_LOCK_PREFIX (default 'lock_'): lock key prefix
- QUEUE_LOCK_TIMEOUT (default 24 * 60 * 60): lock key ttl
- QUEUE_STORAGE_PREFIX (default 'tube_'): default queue prefix
- QUEUE_WORKER_ACQUIRE_DELAY (default 1): worker acquire delay in seconds on empty queues
- QUEUE_WORKER_BATCH_SIZE (default 128): worker batch processing size
- QUEUE_WORKER_DEFERRED_RETRY_DELAY (default 1): deferred tasks retry delay
- QUEUE_WORKER_EMPTY_LIMIT (default 16): worker empty queue attempt limit before queue rebind
- QUEUE_WORKER_EMPTY_PAUSE (default 0.1): worker pause in seconds on empty queue
- QUEUE_WORKER_RECURRENT_CHECK_INTERVAL (default 30): recurrent check interval in seconds
- QUEUE_WORKER_RECURRENT_TASKS_LIMIT (default 1024): recurrent tasks limit count
You can also change these settings at runtime:
from sharded_queue import Queue, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
worker = Worker(RuntimeLock(), Queue(RuntimeStorage()))
worker.lock.settings.timeout = 5 * 60
worker.settings.acquire_delay = 5
worker.settings.batch_size = 64
await worker.loop()