High-reliability asynchronous task queue using MySQL (lock-based)

jasyncq

Asynchronous task queue using MySQL

You should know

  • The dispatcher's fetch_scheduled_tasks and fetch_pending_tasks methods fetch scheduled jobs and, in the same transaction, mark their status as WORK IN PROGRESS
  • Most tasks queued in jasyncq run exactly once via fetch_scheduled_tasks. However, a job can disappear if its worker shuts down mid-run; such jobs can be recovered with fetch_pending_tasks, whose check_term_seconds controls how long a task may stay WORK IN PROGRESS without being completed (deleted) before it is handed out again (see the sketch after this list)
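
A minimal sketch of that recovery path, using the dispatcher API introduced in the next section (the queue name, limits, and timings are placeholders):

# Worker A fetches tasks (they are marked WORK IN PROGRESS in the same
# transaction) but crashes before complete_tasks() deletes the rows.
tasks = await dispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST', limit=10)

# Worker B, running later, picks up tasks that have stayed WORK IN PROGRESS
# longer than check_term_seconds without being completed.
recovered = await dispatcher.fetch_pending_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    check_term_seconds=60,
)
await dispatcher.complete_tasks(task_ids=[str(task.uuid) for task in recovered])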

How to use

1. Create an aiomysql connection pool

import asyncio
import logging

import aiomysql

loop = asyncio.get_event_loop()

pool = await aiomysql.create_pool(
    host='127.0.0.1',
    port=3306,
    user='root',
    db='test',
    loop=loop,
    autocommit=False,
)
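
The snippets in this guide use top-level await for brevity; as a plain script they need to run inside a coroutine. A minimal sketch of such a wrapper (the main() function and the cleanup calls are illustrative, not part of jasyncq):

import asyncio

import aiomysql

async def main():
    # Create the pool inside the running event loop.
    pool = await aiomysql.create_pool(
        host='127.0.0.1',
        port=3306,
        user='root',
        db='test',
        autocommit=False,
    )
    # ...steps 2-4 below go here...
    pool.close()
    await pool.wait_closed()

asyncio.run(main())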

2. Create the topic (table) via initialize and inject the repository into a dispatcher

from jasyncq.dispatcher.tasks import TasksDispatcher
from jasyncq.repository.tasks import TaskRepository

repository = TaskRepository(pool=pool, topic_name='test_topic')
await repository.initialize()
dispatcher = TasksDispatcher(repository=repository)

3. Enjoy the queue

  • Publish tasks
await dispatcher.apply_tasks(
    tasks=[...list of jasyncq.dispatcher.model.task.TaskIn...],
)
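
The task list above is elided; a small, hypothetical example of building it (the payload contents and queue name are placeholders):

from jasyncq.dispatcher.model.task import TaskIn

tasks = [
    TaskIn(task={'action': 'send_email', 'to': 'user@example.com'}, queue_name='QUEUE_TEST'),
    TaskIn(task={'action': 'resize_image', 'image_id': 42}, queue_name='QUEUE_TEST'),
]
await dispatcher.apply_tasks(tasks=tasks)
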
  • Consume tasks
scheduled_tasks = await dispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST', limit=10)
pending_tasks = await dispatcher.fetch_pending_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    check_term_seconds=60,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks

4. Complete tasks

task_ids = [str(task.uuid) for task in tasks]
await dispatcher.complete_tasks(task_ids=task_ids)
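
Putting steps 3 and 4 together, a minimal sketch of a polling worker (consume_forever, run_job, and the one-second poll interval are hypothetical, not part of jasyncq):

import asyncio

async def consume_forever(dispatcher):
    while True:
        scheduled_tasks = await dispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST', limit=10)
        pending_tasks = await dispatcher.fetch_pending_tasks(
            queue_name='QUEUE_TEST',
            limit=10,
            check_term_seconds=60,
        )
        tasks = [*pending_tasks, *scheduled_tasks]
        for task in tasks:
            await run_job(task)  # your own handler for a fetched task
        await dispatcher.complete_tasks(task_ids=[str(task.uuid) for task in tasks])
        await asyncio.sleep(1)  # back off briefly between polls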

Other features

Apply tasks with dependencies

genesis = TaskIn(task={}, queue_name=queue_name)
dependent = TaskIn(task={}, queue_name=queue_name, depend_on=genesis.uuid)
# the 'dependent' task may be fetched only after the 'genesis' task is completed
await dispatcher.apply_tasks(tasks=[genesis, dependent])

Apply a delayed task (scheduled task)

import time

scheduled_at = time.time() + 60
task = TaskIn(task={}, queue_name=queue_name, scheduled_at=scheduled_at)
# 'task' may be fetched starting 60 seconds from now
await dispatcher.apply_tasks(tasks=[task])

Apply an urgent task (priority)

normal = TaskIn(task={}, queue_name=queue_name)
urgent = TaskIn(task={}, queue_name=queue_name, is_urgent=True)
# the 'urgent' task may be fetched before the 'normal' task when the queue already holds a backlog
await dispatcher.apply_tasks(tasks=[normal, urgent])

Fetch tasks while ignoring dependencies

scheduled_tasks = await dispatcher.fetch_scheduled_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    ignore_dependency=True,
)
pending_tasks = await dispatcher.fetch_pending_tasks(
    queue_name='QUEUE_TEST',
    limit=10,
    check_term_seconds=60,
    ignore_dependency=True,
)
tasks = [*pending_tasks, *scheduled_tasks]
# ...RUN JOBS WITH tasks

Example

  • Consumer: /example/consumer.py
  • Producer: /example/producer.py

Run example scripts

$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m example.producer
$ python3 -m example.consumer

Build

$ python3 setup.py sdist
$ python3 -m pip install ./dist/jasyncq-*

Deploy

$ twine upload ./dist/jasyncq-{version}.tar.gz

Test

$ docker run --name test_db -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -d mysql:8.0.17
$ docker exec -it test_db bash -c 'mysql -u root -e "create database test;"'
$ python3 -m pip install pytest==6.2.3
$ pytest
