High-reliability asynchronous queue using MySQL (lock)
jasyncq
Asynchronous task queue using MySQL
You should know
- The dispatcher's fetch_scheduled_tasks and fetch_pending_tasks methods fetch scheduled jobs and, in the same transaction, update their status to WORK IN PROGRESS (WIP).
- Most tasks queued in jasyncq run exactly once via fetch_scheduled_tasks. However, a job can be lost if its worker shuts down while processing it. Such a job can be restored with fetch_pending_tasks, which checks how long a task has stayed WIP without being Completed (a completed task is a deleted row).
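The two points above can be illustrated with a small stand-alone sketch. It uses Python's built-in sqlite3 instead of MySQL so that it runs anywhere, and the table layout, the column names (status, progressed_at) and the helper names (make_queue, publish, fetch_scheduled, fetch_pending, complete) are assumptions made for illustration only. It is not jasyncq's actual schema or SQL.

```python
import sqlite3
import uuid

QUEUED, WIP = "QUEUED", "WORK IN PROGRESS"


def make_queue():
    # isolation_level=None: autocommit mode, so BEGIN/COMMIT are issued explicitly.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE tasks (uuid TEXT PRIMARY KEY, status TEXT, progressed_at REAL)"
    )
    return conn


def publish(conn, count):
    ids = [str(uuid.uuid4()) for _ in range(count)]
    conn.executemany(
        "INSERT INTO tasks VALUES (?, ?, NULL)", [(i, QUEUED) for i in ids]
    )
    return ids


def _claim(conn, where, params, limit, now):
    # Select candidate rows and mark them WIP inside ONE transaction, so a
    # second consumer cannot claim the same rows in between.
    # (A MySQL version would typically rely on row locks instead.)
    conn.execute("BEGIN IMMEDIATE")
    try:
        rows = conn.execute(
            f"SELECT uuid FROM tasks WHERE {where} LIMIT ?", (*params, limit)
        ).fetchall()
        ids = [row[0] for row in rows]
        conn.executemany(
            "UPDATE tasks SET status = ?, progressed_at = ? WHERE uuid = ?",
            [(WIP, now, i) for i in ids],
        )
        conn.execute("COMMIT")
        return ids
    except Exception:
        conn.execute("ROLLBACK")
        raise


def fetch_scheduled(conn, limit, now):
    # Fresh tasks: each one is handed out once.
    return _claim(conn, "status = ?", (QUEUED,), limit, now)


def fetch_pending(conn, limit, check_term_seconds, now):
    # Tasks stuck in WIP longer than check_term_seconds are handed out again.
    return _claim(
        conn,
        "status = ? AND progressed_at <= ?",
        (WIP, now - check_term_seconds),
        limit,
        now,
    )


def complete(conn, ids):
    # Completing a task deletes its row.
    conn.executemany("DELETE FROM tasks WHERE uuid = ?", [(i,) for i in ids])
```

In this model a task claimed by a worker that then dies stays in the WIP state, and it only becomes visible again to fetch_pending once check_term_seconds have passed since it was claimed.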
How to use
1. Create aiomysql connection pool
import asyncio
import logging
import aiomysql
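# Note: `await` must run inside a coroutine (or an asyncio REPL); these snippets omit the wrapper for brevity.
loop = asyncio.get_event_loop()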
pool = await aiomysql.create_pool(
host='127.0.0.1',
port=3306,
user='root',
db='test',
loop=loop,
autocommit=False,
)
2. Create the topic (table) with initialize() and inject the repository into the dispatcher
from jasyncq.dispatcher.tasks import TasksDispatcher
from jasyncq.repository.tasks import TaskRepository
repository = TaskRepository(pool=pool, topic_name='test_topic')
await repository.initialize()
dispatcher = TasksDispatcher(repository=repository)
3. Enjoy queue
- Publish tasks
await dispatcher.apply_tasks(
tasks=[...],  # list of jasyncq.dispatcher.model.task.TaskIn
)
- Consume tasks
scheduled_tasks = await dispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST', limit=10)
pending_tasks = await dispatcher.fetch_pending_tasks(
queue_name='QUEUE_TEST',
limit=10,
check_term_seconds=60,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks
4. Complete tasks
task_ids = [str(task.uuid) for task in tasks]
await dispatcher.complete_tasks(task_ids=task_ids)
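Steps 3 and 4 can be combined into one consumer pass. The following is a minimal sketch, not part of jasyncq: consume_once and handler are hypothetical names, and the only dispatcher calls it makes are the three shown above (fetch_scheduled_tasks, fetch_pending_tasks, complete_tasks) with the same keyword arguments. It assumes that leaving a failed task uncompleted is the desired behaviour, so that a later fetch_pending_tasks call can return it again.

```python
import logging

logger = logging.getLogger(__name__)


async def consume_once(dispatcher, queue_name, handler, limit=10, check_term_seconds=60):
    """Fetch one batch, run `handler` on each task, complete the successful ones."""
    scheduled_tasks = await dispatcher.fetch_scheduled_tasks(
        queue_name=queue_name,
        limit=limit,
    )
    pending_tasks = await dispatcher.fetch_pending_tasks(
        queue_name=queue_name,
        limit=limit,
        check_term_seconds=check_term_seconds,
    )

    done_ids = []
    for task in [*pending_tasks, *scheduled_tasks]:
        try:
            await handler(task)
        except Exception:
            # Do not complete the task: it stays in progress and can be
            # returned again by a later fetch_pending_tasks call.
            logger.exception("task %s failed", task.uuid)
            continue
        done_ids.append(str(task.uuid))

    if done_ids:
        await dispatcher.complete_tasks(task_ids=done_ids)
    return done_ids
```

Because a retried task may already have run partially, handlers used with a loop like this are safer when they are idempotent.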
Other features
Apply tasks with dependency
from jasyncq.dispatcher.model.task import TaskIn
queue_name = 'QUEUE_TEST'
genesis = TaskIn(task={}, queue_name=queue_name)
dependent = TaskIn(task={}, queue_name=queue_name, depend_on=genesis.uuid)
# The 'dependent' task can be fetched only after the 'genesis' task is completed
await dispatcher.apply_tasks(tasks=[genesis, dependent])
Apply delayed task (scheduled task)
import time
scheduled_at = time.time() + 60
task = TaskIn(task={}, queue_name=queue_name, scheduled_at=scheduled_at)
# 'task' can be fetched once 60 seconds have passed
await dispatcher.apply_tasks(tasks=[task])
Apply urgent task (priority)
normal = TaskIn(task={}, queue_name=queue_name)
urgent = TaskIn(task={}, queue_name=queue_name, is_urgent=True)
# The 'urgent' task may be fetched earlier than the 'normal' task if the queue is already full
await dispatcher.apply_tasks(tasks=[normal, urgent])
Fetch while ignoring dependencies
scheduled_tasks = await dispatcher.fetch_scheduled_tasks(
queue_name='QUEUE_TEST',
limit=10,
ignore_dependency=True,
)
pending_tasks = await dispatcher.fetch_pending_tasks(
queue_name='QUEUE_TEST',
limit=10,
check_term_seconds=60,
ignore_dependency=True,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks
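The options in this section (depend_on, scheduled_at, is_urgent, ignore_dependency) all affect which tasks a fetch returns. The sketch below is a simplified in-memory model of how they might combine. ModelTask and fetchable are hypothetical names, and the rules are assumptions read off the comments above rather than jasyncq's real query: a task is ready once its scheduled_at has passed and the task it depends on no longer has a row, and urgent tasks are taken first when more tasks are ready than limit allows.

```python
from dataclasses import dataclass, field
from typing import Optional
from uuid import uuid4


@dataclass
class ModelTask:
    # Hypothetical stand-in for a queued row; not jasyncq's TaskIn class.
    name: str
    depend_on: Optional[str] = None
    scheduled_at: float = 0.0
    is_urgent: bool = False
    uuid: str = field(default_factory=lambda: str(uuid4()))


def fetchable(rows, now, limit, ignore_dependency=False):
    # Completed tasks are deleted, so a dependency is satisfied once its
    # uuid is no longer among the remaining rows.
    remaining = {row.uuid for row in rows}
    ready = [
        row
        for row in rows
        if row.scheduled_at <= now
        and (
            ignore_dependency
            or row.depend_on is None
            or row.depend_on not in remaining
        )
    ]
    # Stable sort: urgent tasks first, otherwise insertion order is kept.
    ready.sort(key=lambda row: not row.is_urgent)
    return ready[:limit]
```

For example, with a genesis task, a task depending on it, a task delayed into the future and an urgent task all queued, this model returns the urgent task first, then genesis, and holds back the other two until their conditions are met.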
Example
- Consumer: /example/consumer.py
- Producer: /example/producer.py
Run example scripts
$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m example.producer
$ python3 -m example.consumer
Build
$ python3 setup.py sdist
$ python3 -m pip install ./dist/jasyncq-*
Deploy
$ twine upload ./dist/jasyncq-{version}.tar.gz
Test
$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m pip install pytest==6.2.3
$ pytest