Oxalis

A distributed async task/job queue, like Celery for the asyncio world.

Features

  • Redis and AMQP (RabbitMQ etc.) support
  • Task timeout and concurrency limit support
  • Delayed task support (both Redis and RabbitMQ)
  • Cron task/job beater
  • Built-in coroutine pool with concurrency and time limits

Install

pip install oxalis

Example with Redis backend

Define task:

import os

from redis.asyncio.client import Redis
from oxalis.redis import Oxalis


oxalis = Oxalis(Redis(host=os.getenv("REDIS_HOST", "redis")))

@oxalis.register()
async def hello_task():
    print("Hello oxalis")

Run worker (consumer):

oxalis.run_worker_master()

python ex.py
INFO:oxalis:Registered Task: <Task(hello_task)>
INFO:oxalis:Run worker: <Oxalis(pid-101547)>...
INFO:oxalis:Run worker: <Oxalis(pid-101548)>...
INFO:oxalis:Run worker: <Oxalis(pid-101549)>...
INFO:oxalis:Run worker: <Oxalis(pid-101550)>...
INFO:oxalis:Run worker: <Oxalis(pid-101551)>...
INFO:oxalis:Run worker: <Oxalis(pid-101552)>...
INFO:oxalis:Run worker: <Oxalis(pid-101554)>...

Run client (producer):

import asyncio


async def main():
    await oxalis.connect()
    for i in range(10):
        await hello_task.delay()
        await hello_task.delay(_delay=1)  # delay execution by 1s


asyncio.run(main())

Run cron beater:

from oxalis.beater import Beater

beater = Beater(oxalis)

beater.register("*/1 * * * *", hello_task)
beater.run()

python exb.py
INFO:oxalis:Beat task: <Task(hello_task)> at <*/1 * * * *> ...

TaskCodec

The TaskCodec encodes and decodes task arguments. The default codec uses JSON.

Custom task codec:

import typing as tp

# assumption: Task is importable from oxalis.base alongside TaskCodec
from oxalis.base import Task, TaskCodec

class MyTaskCodec(TaskCodec):
    @classmethod
    def encode(
        cls,
        task: Task,
        task_args: tp.Sequence[tp.Any],
        task_kwargs: tp.Dict[str, tp.Any],
    ) -> bytes:
        ...

    @classmethod
    def decode(cls, content: bytes) -> TaskCodec.MESSAGE_TYPE:
        ...



oxalis = Oxalis(Redis(host=os.getenv("REDIS_HOST", "redis")), task_codec=MyTaskCodec())
...
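
To make the encode/decode contract concrete, here is a minimal standalone sketch of a JSON codec. It does not subclass TaskCodec and is not Oxalis's own implementation: the class name JsonCodecSketch, the use of a task name string instead of a Task object, and the three-element message layout are all assumptions made for illustration.

```python
import json
import typing as tp


class JsonCodecSketch:
    """Hypothetical stand-in codec: task name plus arguments as JSON bytes."""

    @classmethod
    def encode(
        cls,
        task_name: str,
        task_args: tp.Sequence[tp.Any],
        task_kwargs: tp.Dict[str, tp.Any],
    ) -> bytes:
        # A list keeps positional order; sort_keys makes the bytes deterministic.
        payload = [task_name, list(task_args), dict(task_kwargs)]
        return json.dumps(payload, sort_keys=True).encode("utf-8")

    @classmethod
    def decode(
        cls, content: bytes
    ) -> tp.Tuple[str, tp.List[tp.Any], tp.Dict[str, tp.Any]]:
        # Reverse of encode: bytes -> (name, positional args, keyword args).
        task_name, task_args, task_kwargs = json.loads(content.decode("utf-8"))
        return task_name, task_args, task_kwargs


raw = JsonCodecSketch.encode("hello_task", (1, "a"), {"retries": 2})
decoded = JsonCodecSketch.decode(raw)
```

Note that JSON turns tuples into lists on the round trip, which is one reason to swap in a custom codec when argument types must be preserved.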

Task pool

Oxalis uses a single coroutine pool with a concurrency limit and a timeout limit to run all tasks.

Custom pool:

import os

from redis.asyncio.client import Redis
from oxalis.redis import Oxalis
from oxalis.pool import Pool

oxalis = Oxalis(Redis(host=os.getenv("REDIS_HOST", "redis")), pool=Pool(concurrency=10, timeout=60))

  • For Redis tasks, consumption from the queue blocks until the pool is no longer fully loaded
  • For AMQP tasks, Oxalis uses AMQP's QoS to limit worker concurrency (the pool's concurrency is set to -1, which means it is not limited)
  • asyncio.TimeoutError is raised if a task times out
  • Every worker process has its own limited pool
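
The behaviour described in the list above (a cap on concurrently running coroutines plus a per-task timeout) can be sketched with the standard library alone. This is a rough illustration, not the code behind oxalis.pool.Pool: the PoolSketch class and its run method are hypothetical names, and the sketch assumes a semaphore for the concurrency cap and asyncio.wait_for for the timeout.

```python
import asyncio
import typing as tp


class PoolSketch:
    """Hypothetical pool: at most `concurrency` coroutines run at once."""

    def __init__(self, concurrency: int = 10, timeout: float = 60.0) -> None:
        self._slots = asyncio.Semaphore(concurrency)
        self._timeout = timeout

    async def run(self, coro: tp.Awaitable[tp.Any]) -> tp.Any:
        # Wait for a free slot, then enforce the time limit on the task itself.
        async with self._slots:
            return await asyncio.wait_for(coro, timeout=self._timeout)


async def demo() -> tp.Tuple[int, bool]:
    pool = PoolSketch(concurrency=2, timeout=0.2)
    running = 0
    peak = 0

    async def quick_task() -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # record the highest observed concurrency
        await asyncio.sleep(0.01)
        running -= 1

    await asyncio.gather(*(pool.run(quick_task()) for _ in range(6)))

    timed_out = False
    try:
        await pool.run(asyncio.sleep(2))  # longer than the pool timeout
    except asyncio.TimeoutError:
        timed_out = True
    return peak, timed_out


peak_running, timed_out = asyncio.run(demo())
```

Six tasks are submitted, but the semaphore lets only two run at the same time, and the slow task is cancelled with asyncio.TimeoutError once the limit expires.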

Set a timeout limit for a specific task:

@oxalis.register(queue=custom_queue, timeout=10)
async def custom_task():
    print("Hello oxalis")

Custom hook

Oxalis defines some hook APIs that subclasses can override:

class MyOxalis(Oxalis):
    def on_worker_init(self):
        # will be called before the worker starts
        pass

    def on_worker_close(self):
        # will be called when the worker closes
        pass

Some APIs can be overridden for custom usage, e.g.:

import sentry_sdk

class MyOxalis(Oxalis):
    async def exec_task(self, task: Task, *task_args, **task_kwargs):
        """
        capture exception to sentry
        """
        try:
            await super().exec_task(task, *task_args, **task_kwargs)
        except Exception as e:
            sentry_sdk.capture_exception(e)

Redis Backend Detail

Oxalis uses Redis lists and pub/sub as message queues.

Queue

Custom queue:

from oxalis.redis import Queue, PubsubQueue

custom_queue = Queue("custom")
bus_queue = PubsubQueue("bus")

Register task:

@oxalis.register(queue=custom_queue)
async def custom_task():
    print("Hello oxalis")

@oxalis.register(queue=bus_queue)
async def bus_task():
    print("Hello oxalis")

  • For the task producer, the task is sent to the specified queue when task.delay() is called
  • For the task consumer, Oxalis listens on those queues and receives tasks from them
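
The practical difference between the two queue types can be shown with an in-memory model. This sketch assumes, as the names suggest, that Queue is backed by a Redis list and PubsubQueue by Redis pub/sub. It does not talk to Redis, and the classes ListQueueSketch and PubsubSketch are hypothetical.

```python
from collections import deque
import typing as tp


class ListQueueSketch:
    """Models a list-backed queue: each message goes to exactly one consumer."""

    def __init__(self) -> None:
        self._items: tp.Deque[str] = deque()

    def push(self, message: str) -> None:
        self._items.append(message)

    def pop(self) -> tp.Optional[str]:
        # Whoever pops first gets the message; later consumers see nothing.
        return self._items.popleft() if self._items else None


class PubsubSketch:
    """Models pub/sub: every current subscriber receives its own copy."""

    def __init__(self) -> None:
        self._subscribers: tp.List[tp.List[str]] = []

    def subscribe(self) -> tp.List[str]:
        inbox: tp.List[str] = []
        self._subscribers.append(inbox)
        return inbox

    def publish(self, message: str) -> int:
        for inbox in self._subscribers:
            inbox.append(message)
        return len(self._subscribers)  # number of receivers


work = ListQueueSketch()
work.push("job-1")
worker_a_got = work.pop()
worker_b_got = work.pop()

bus = PubsubSketch()
inbox_a = bus.subscribe()
inbox_b = bus.subscribe()
receivers = bus.publish("event-1")
```

Under this model a list-backed queue suits work that must run once, while a pub/sub queue suits broadcast-style tasks that every listening worker should execute.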

Concurrency limit

Oxalis limits concurrency through the coroutine pool. To give one task a different concurrency limit, register it with its own pool:

@oxalis.register(pool=Pool(concurrency=1))
async def custom_task():
    print("Hello oxalis")

Delayed task

Supported via Redis sorted sets (zset).
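
A sorted set works as a delay queue when each member's score is the time at which it becomes due. With Redis this is commonly done with ZADD and ZRANGEBYSCORE, though the exact commands Oxalis issues are not stated here. The sketch below models the idea in memory with a heap; DelayedSetSketch and its methods are hypothetical names, and time is passed in explicitly so the behaviour is deterministic.

```python
import heapq
import typing as tp


class DelayedSetSketch:
    """In-memory stand-in for a sorted set keyed by due time."""

    def __init__(self) -> None:
        self._heap: tp.List[tp.Tuple[float, int, str]] = []
        self._seq = 0  # tie-breaker so equal scores pop in insertion order

    def add(self, message: str, now: float, delay: float) -> None:
        # The score is the absolute time at which the message becomes due.
        heapq.heappush(self._heap, (now + delay, self._seq, message))
        self._seq += 1

    def pop_due(self, now: float) -> tp.List[str]:
        # Remove and return every message whose score is <= now.
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


delayed = DelayedSetSketch()
delayed.add("send-email", now=100.0, delay=1.0)
delayed.add("cleanup", now=100.0, delay=5.0)

due_early = delayed.pop_due(now=100.5)
due_after_1s = delayed.pop_due(now=101.0)
due_after_6s = delayed.pop_due(now=106.0)
```

A worker polling in this style would move each due message onto the normal queue, so delayed tasks run through the same path as immediate ones.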

AMQP Backend Detail

Custom Queue and Exchange

Oxalis follows AMQP's model for defining exchanges, queues, and their bindings:

import asyncio
import logging
import time

from aio_pika import RobustConnection
from oxalis.amqp import Exchange, ExchangeType, Oxalis, Pool, Queue

e = Exchange("test")
q = Queue("test", durable=False)
e2 = Exchange("testfanout", type=ExchangeType.FANOUT)
q2 = Queue("testfanout", durable=False)


oxalis = Oxalis(RobustConnection("amqp://root:letmein@rabbitmq:5672/"))
oxalis.register_binding(q, e, "test")
oxalis.register_binding(q2, e2, "")
oxalis.register_queues([q, q2])


@oxalis.register(exchange=e, routing_key="test")
async def task1():
    await asyncio.sleep(1)
    print("hello oxalis")


@oxalis.register(exchange=e2)
async def task2():
    await asyncio.sleep(10)
    print("hello oxalis")

  • For the producer, oxalis.register defines which exchange (and routing key) a task's message is sent to
  • For the consumer, register_queues defines which queues Oxalis listens on
  • Task routing is defined by the bindings
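
How bindings route a message can be modelled without a broker. The sketch below mirrors the exchange, queue, and binding names from the example above, but BrokerSketch is a hypothetical in-memory class that covers only the standard AMQP direct and fanout semantics, not Oxalis or RabbitMQ internals.

```python
import typing as tp


class BrokerSketch:
    """Hypothetical broker model: direct and fanout exchanges only."""

    def __init__(self) -> None:
        # exchange name -> (exchange type, list of (queue name, binding key))
        self._exchanges: tp.Dict[
            str, tp.Tuple[str, tp.List[tp.Tuple[str, str]]]
        ] = {}
        self.queues: tp.Dict[str, tp.List[str]] = {}

    def declare_exchange(self, name: str, type_: str = "direct") -> None:
        self._exchanges[name] = (type_, [])

    def bind(self, queue: str, exchange: str, binding_key: str) -> None:
        self.queues.setdefault(queue, [])
        self._exchanges[exchange][1].append((queue, binding_key))

    def publish(self, exchange: str, routing_key: str, body: str) -> None:
        type_, bindings = self._exchanges[exchange]
        for queue, binding_key in bindings:
            # fanout ignores the routing key; direct needs an exact match
            if type_ == "fanout" or binding_key == routing_key:
                self.queues[queue].append(body)


broker = BrokerSketch()
broker.declare_exchange("test")
broker.declare_exchange("testfanout", type_="fanout")
broker.bind("test", "test", "test")
broker.bind("testfanout", "testfanout", "")

broker.publish("test", "test", "task1")           # matches the binding key
broker.publish("test", "other", "unrouted")       # no matching binding
broker.publish("testfanout", "anything", "task2")  # fanout: key is ignored
```

The message published with routing key "other" reaches no queue, which is the kind of silent drop to watch for when a task's routing_key and its binding disagree.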

Concurrency limit

Oxalis uses AMQP's QoS to limit worker concurrency (the task's ack_later should be true), so the coroutine pool's concurrency should not be limited.

Custom queue QOS:

oxalis = Oxalis(RobustConnection("amqp://root:letmein@rabbitmq:5672/"), default_queue=Queue("custom", consumer_prefetch_count=10))
...
fanout_queue = Queue("testfanout", durable=False, consumer_prefetch_count=3)
oxalis.register_queues([fanout_queue])
...
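
The reason a prefetch count acts as a concurrency limit is that the broker stops delivering once that many messages are unacknowledged. The sketch below simulates that window in memory. PrefetchChannelSketch is a hypothetical class, and the link to ack_later (acknowledging only after the task finishes, so unacked messages equal running tasks) is an interpretation rather than something the source spells out.

```python
from collections import deque
import typing as tp


class PrefetchChannelSketch:
    """Hypothetical channel: delivery stops when the unacked window is full."""

    def __init__(self, messages: tp.Iterable[str], prefetch_count: int) -> None:
        self._ready = deque(messages)
        self._prefetch_count = prefetch_count
        self.unacked: tp.List[str] = []

    def deliver(self) -> tp.List[str]:
        # Hand out messages until prefetch_count of them are unacknowledged.
        delivered = []
        while self._ready and len(self.unacked) < self._prefetch_count:
            message = self._ready.popleft()
            self.unacked.append(message)
            delivered.append(message)
        return delivered

    def ack(self, message: str) -> None:
        # Acknowledging frees one slot in the window.
        self.unacked.remove(message)


channel = PrefetchChannelSketch(["m1", "m2", "m3", "m4", "m5"], prefetch_count=3)
first_batch = channel.deliver()
stalled = channel.deliver()   # window is full, nothing new arrives
channel.ack("m1")             # task for m1 finished
after_ack = channel.deliver()
```

If messages were acknowledged on receipt instead, the window would never fill and the prefetch count would no longer bound the number of running tasks.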

Custom task behavior

Define how a task performs ack and reject:

# always ack, even if the task failed (raised an exception)
@oxalis.register(ack_always=True, reject=False)
async def task2():
    await asyncio.sleep(10)
    print("hello oxalis")

# reject with requeue when the task fails
@oxalis.register(reject_requeue=True)
async def task2():
    await asyncio.sleep(10)
    print("hello oxalis")

Delayed task

Supported via a RabbitMQ plugin.
