A distributed task queue built with asyncio and Redis, with a built-in web interface

ReArq

Introduction

ReArq is a distributed task queue built with asyncio and Redis. It is a rewrite of arq that adds improvements and includes a web interface.

You can try the online demo here.

Features

  • AsyncIO support, with easy FastAPI integration.
  • Support for delayed tasks, cron tasks, and async tasks.
  • Full-featured built-in web interface.
  • Built-in distributed task lock, so that only one instance of a given task runs at a time.
  • Other powerful features to be discovered.
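
The task lock idea can be illustrated with a small sketch. This is not ReArq's implementation: `InMemoryLockStore` and `run_exclusive` are hypothetical names, and the in-memory dict only stands in for a Redis "set if not exists" operation so that the example runs without a server.

```python
import asyncio


class InMemoryLockStore:
    """Hypothetical stand-in for Redis: acquire() mimics `SET key token NX`."""

    def __init__(self):
        self._held = {}

    def acquire(self, key, token):
        # Succeeds only when nobody holds the key (the NX condition).
        if key in self._held:
            return False
        self._held[key] = token
        return True

    def release(self, key, token):
        # Only the holder may release, so one worker cannot drop another's lock.
        if self._held.get(key) == token:
            del self._held[key]


async def run_exclusive(store, task_name, worker_id, ran):
    """Run the task body only if this worker wins the lock."""
    key = f"lock:{task_name}"
    if not store.acquire(key, worker_id):
        return False  # another worker is already running this task
    try:
        await asyncio.sleep(0.01)  # pretend to do the work
        ran.append(worker_id)
        return True
    finally:
        store.release(key, worker_id)


async def demo():
    store, ran = InMemoryLockStore(), []
    results = await asyncio.gather(
        *(run_exclusive(store, "add", f"worker-{i}", ran) for i in range(3))
    )
    return results, ran
```

When three workers try to run the same task concurrently, only the first one to acquire the lock executes it; the others skip the run.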

Screenshots

[Screenshots: dashboard, worker, task, job, result]

Requirements

  • Redis >= 5.0

Install

Use MySQL backend:

pip install rearq[mysql]

Use PostgreSQL backend:

pip install rearq[postgres]

Quick Start

Task Definition

# main.py
from rearq import ReArq
from tortoise import Tortoise

DB_URL = "mysql://root:123456@127.0.0.1:3306/rearq"

rearq = ReArq(db_url=DB_URL)


@rearq.on_shutdown
async def on_shutdown():
    # do cleanup work here, such as closing database connections
    print("shutdown")


@rearq.on_startup
async def on_startup():
    # do initialization work here
    print("startup")
    # Tortoise ORM must be initialized here
    await Tortoise.init(
        db_url=DB_URL,
        modules={"rearq": ["rearq.server.models"]},
    )


@rearq.task(queue="q1")
async def add(self, a, b):
    return a + b


@rearq.task(cron="*/5 * * * * * *")  # run the task every 5 seconds
async def timer(self):
    return "timer"

Run rearq worker

> rearq main:rearq worker -q q1 -q q2 # consume tasks from q1 and q2 at the same time
2021-03-29 09:54:50.464 | INFO     | rearq.worker:_main:95 - Start worker success with queue: rearq:queue:default
2021-03-29 09:54:50.465 | INFO     | rearq.worker:_main:96 - Registered tasks: add, sleep, timer_add
2021-03-29 09:54:50.465 | INFO     | rearq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.43M clients_connected=5 db_keys=6

Run rearq timer

If you have cron tasks or delayed tasks, you also need to run the timer:

> rearq main:rearq timer
2021-03-29 09:54:43.878 | INFO     | rearq.worker:_main:275 - Start timer success
2021-03-29 09:54:43.887 | INFO     | rearq.worker:_main:277 - Registered timer tasks: timer_add
2021-03-29 09:54:43.894 | INFO     | rearq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.25M clients_connected=2 db_keys=6

You can also run the timer together with the worker: rearq main:rearq worker -t.
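
To see why delayed tasks need a separate timer process, here is a minimal sketch of the general pattern, not ReArq's actual code. `DelayQueue`, `delay`, and `tick` are hypothetical names: delayed jobs wait in a structure ordered by due time, and something has to wake up periodically to move due jobs to where workers can pick them up.

```python
import heapq


class DelayQueue:
    """Hypothetical sketch: a timer loop promotes due jobs to the ready list."""

    def __init__(self):
        self._delayed = []  # min-heap of (due_time, job)
        self.ready = []     # jobs a worker could pick up now

    def delay(self, job, due_time):
        heapq.heappush(self._delayed, (due_time, job))

    def tick(self, now):
        # Called periodically by the timer: promote every job whose time has come.
        while self._delayed and self._delayed[0][0] <= now:
            _, job = heapq.heappop(self._delayed)
            self.ready.append(job)


queue = DelayQueue()
queue.delay("send_email", due_time=10)
queue.delay("cleanup", due_time=5)
queue.tick(now=7)  # only "cleanup" is due at this point
```

Without a process calling `tick`, the delayed jobs would never reach the workers, which is the role the timer command plays.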

Integration in FastAPI

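from fastapi import FastAPI
from tortoise import Tortoise

from main import add, rearq  # the task and ReArq instance defined in main.py

app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    # `settings` is your application's own configuration object (not shown)
    await Tortoise.init(
        db_url=settings.DB_URL,
        modules={"rearq": ["rearq.server.models"]},
    )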


@app.on_event("shutdown")
async def shutdown() -> None:
    await rearq.close()


# then run the task in a view
@app.get("/test")
async def test():
    job = await add.delay(args=(1, 2))
    # or
    job = await add.delay(kwargs={"a": 1, "b": 2})
    # or
    job = await add.delay(1, 2)
    # or
    job = await add.delay(a=1, b=2)
    result = await job.result(timeout=5)  # wait up to 5 seconds for the result
    print(result.result)
    return result
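
Waiting for a result with a timeout is a common asyncio pattern. The sketch below uses only the standard library and does not describe how ReArq behaves when the timeout expires; `slow_add` and `result_or_none` are hypothetical helpers that show one way a caller might handle a job that does not finish in time.

```python
import asyncio


async def slow_add(a, b, delay):
    # Stand-in for a job that takes `delay` seconds to finish.
    await asyncio.sleep(delay)
    return a + b


async def result_or_none(coro, timeout):
    """Wait up to `timeout` seconds; return None instead of raising on timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    fast = await result_or_none(slow_add(1, 2, delay=0.01), timeout=1)
    slow = await result_or_none(slow_add(1, 2, delay=1), timeout=0.01)
    return fast, slow
```

A view can then return a "still pending" response when the helper yields `None`, rather than holding the request open.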

Start web interface

> rearq main:rearq server
Usage: rearq server [OPTIONS]

  Start rest api server.

Options:
  --host TEXT         Listen host.  [default: 0.0.0.0]
  -p, --port INTEGER  Listen port.  [default: 8000]
  -h, --help          Show this message and exit.

After the server starts, visit http://127.0.0.1:8000/docs to see all APIs and http://127.0.0.1:8000 to see the web interface.

Other options are passed directly to uvicorn, such as --root-path.

rearq main:rearq server --host 0.0.0.0 --root-path /rearq

Mount as FastAPI sub app

You can also mount the ReArq server as a FastAPI sub-app.

from fastapi import FastAPI

from examples.tasks import rearq
from rearq.server.app import app as rearq_app

app = FastAPI()

app.mount("/rearq", rearq_app)
rearq_app.set_rearq(rearq)

Start worker inside app

You can also start the worker inside your app.

@app.on_event("startup")
async def startup():
    await rearq.init()
    await rearq_app.start_worker(with_timer=True, block=False)
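
Starting a worker without blocking the application generally means running it as a background task on the same event loop. The following is a standard-library sketch of that pattern, not ReArq's internals; `worker_loop` and the in-process `asyncio.Queue` are assumptions made for illustration.

```python
import asyncio


async def worker_loop(queue, results):
    """Consume jobs until cancelled on shutdown."""
    while True:
        a, b = await queue.get()
        results.append(a + b)
        queue.task_done()


async def main():
    queue, results = asyncio.Queue(), []
    # Non-blocking start: the worker runs as a background task.
    worker = asyncio.create_task(worker_loop(queue, results))
    await queue.put((1, 2))
    await queue.put((3, 4))
    await queue.join()  # wait until both jobs are processed
    worker.cancel()     # shutdown: stop the background worker
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return results
```

The application keeps serving requests while the worker task runs, and cancels it on shutdown.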

Thanks To

  • arq: fast job queuing and RPC in Python with asyncio and Redis.

License

This project is licensed under the Apache-2.0 License.
