A distributed task queue built with asyncio and Redis, with a built-in web interface

narq

Introduction

narq is a distributed task queue built with asyncio and Redis. It is based on ReArq, which is itself a rewrite of arq.

Motivations

This project is an independent fork of ReArq because its goals are fundamentally different. narq is intended to be a production-grade task queue that is simple to reason about.

Features

  • AsyncIO support, easy integration with FastAPI.
  • Delayed tasks, cron tasks and async task support.
  • Full-featured built-in web interface.
  • Built-in distributed task lock to ensure that a given task runs one at a time.
  • Other powerful features to be discovered.
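
The distributed task lock listed above can be pictured as a key that only one worker manages to set at a time. The sketch below is a minimal in-memory illustration of that idea, not narq's implementation: `FakeStore`, `set_if_absent`, and `run_exclusive` are hypothetical names, and a real deployment would presumably rely on an atomic Redis operation such as `SET key value NX PX ttl` instead of a Python dict.

```python
import asyncio
import time


class FakeStore:
    """Hypothetical in-memory stand-in for a store with 'set if absent' plus TTL."""

    def __init__(self):
        self._data = {}  # key -> (owner, expires_at)

    def set_if_absent(self, key, owner, ttl):
        now = time.monotonic()
        current = self._data.get(key)
        if current is not None and current[1] > now:
            return False  # lock is held and has not expired yet
        self._data[key] = (owner, now + ttl)
        return True

    def release(self, key, owner):
        # Only the owner that acquired the lock may release it.
        current = self._data.get(key)
        if current is not None and current[0] == owner:
            del self._data[key]


async def run_exclusive(store, task_name, owner, coro_fn):
    """Run coro_fn only if this owner acquires the lock for task_name."""
    key = f"lock:{task_name}"
    if not store.set_if_absent(key, owner, ttl=30.0):
        return None  # another worker holds the lock, so skip this run
    try:
        return await coro_fn()
    finally:
        store.release(key, owner)


async def demo():
    store = FakeStore()
    running = 0
    max_running = 0

    async def job():
        nonlocal running, max_running
        running += 1
        max_running = max(max_running, running)
        await asyncio.sleep(0.01)  # simulate work while holding the lock
        running -= 1
        return "done"

    # Two "workers" try to run the same task concurrently.
    results = await asyncio.gather(
        run_exclusive(store, "add", "worker-1", job),
        run_exclusive(store, "add", "worker-2", job),
    )
    return results, max_running


results, max_running = asyncio.run(demo())
```

In this sketch the second worker simply skips the task while the lock is held; whether narq skips, waits, or retries in that situation is not stated here.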

Web Interface

dashboard

Requirements

  • Redis >= 5.0

Quick Start

Task Definition

# main.py
from narq import Narq

narq = Narq(db_url='mysql://root:123456@127.0.0.1:3306/narq')


@narq.on_shutdown
async def on_shutdown():
    # you can do some cleanup work here, such as closing the database connection
    print("shutdown")


@narq.on_startup
async def on_startup():
    # you can do some initialization work here
    print("startup")


@narq.task(queue="q1")
async def add(self, a, b):
    return a + b


@narq.task(cron="*/5 * * * * * *")  # run the task every 5 seconds
async def timer(self):
    return "timer"

Run narq worker

> narq main:narq worker -q q1 -q q2 # consume tasks from q1 and q2 at the same time
2021-03-29 09:54:50.464 | INFO     | narq.worker:_main:95 - Started worker successfully on queue: narq:queue:default
2021-03-29 09:54:50.465 | INFO     | narq.worker:_main:96 - Registered tasks: add, sleep, timer_add
2021-03-29 09:54:50.465 | INFO     | narq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.43M clients_connected=5 db_keys=6

Run narq timer

If you have cron (timed) tasks or delayed tasks, you must also run the timer:

> narq main:narq timer
2021-03-29 09:54:43.878 | INFO     | narq.worker:_main:275 - Start timer successfully
2021-03-29 09:54:43.887 | INFO     | narq.worker:_main:277 - Registered timer tasks: timer_add
2021-03-29 09:54:43.894 | INFO     | narq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.25M clients_connected=2 db_keys=6

You can also run the timer together with the worker using narq main:narq worker -t.
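
To see why delayed and cron tasks need a separate timer, it can help to picture a process that keeps pending jobs ordered by their due time and moves them onto the normal queue once they are due. The following is a minimal sketch of that general pattern using only the standard library. It is an assumption about how such a timer could work, not a description of narq's internals, and `DelayedQueue`, `schedule`, and `tick` are hypothetical names.

```python
import heapq


class DelayedQueue:
    """Hypothetical schedule: jobs wait in a heap until their run_at time."""

    def __init__(self):
        self._heap = []  # entries are (run_at, seq, job_name)
        self._seq = 0    # tie-breaker so equal run_at values keep insertion order
        self.ready = []  # jobs a worker could pick up now

    def schedule(self, job_name, run_at):
        heapq.heappush(self._heap, (run_at, self._seq, job_name))
        self._seq += 1

    def tick(self, now):
        """Move every job whose run_at has passed onto the ready list."""
        moved = []
        while self._heap and self._heap[0][0] <= now:
            _, _, job_name = heapq.heappop(self._heap)
            self.ready.append(job_name)
            moved.append(job_name)
        return moved


dq = DelayedQueue()
dq.schedule("timer", run_at=5.0)
dq.schedule("add", run_at=2.0)
dq.schedule("timer", run_at=10.0)

first = dq.tick(now=1.0)   # nothing is due yet
second = dq.tick(now=5.0)  # "add" (due at 2.0) and the first "timer" (due at 5.0)
```

Without something calling `tick` periodically, the scheduled jobs would never reach the workers, which is the role the timer command plays in the setup described above.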

Integration with FastAPI

from fastapi import FastAPI

from main import add, narq  # the task and Narq instance defined in main.py above

app = FastAPI()


@app.on_event("shutdown")
async def shutdown() -> None:
    await narq.close()


# then enqueue the task from a view
@app.get("/test")
async def test():
    job = await add.delay(args=(1, 2))
    # or
    job = await add.delay(kwargs={"a": 1, "b": 2})
    # or
    job = await add.delay(1, 2)
    # or
    job = await add.delay(a=1, b=2)
    result = await job.result(timeout=5)  # wait up to 5 seconds for the result
    print(result.result)
    return result
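
The four `delay` call forms above are meant to be equivalent. As a rough illustration of how a wrapper could accept all of them, the sketch below normalizes each form into one `(args, kwargs)` pair. `normalize_call` is a hypothetical helper written for this example and is not part of narq's API; the plain `add` function stands in for the task body.

```python
def normalize_call(*call_args, **call_kwargs):
    """Return (args, kwargs) for a task call given any of the four forms."""
    # Explicit form: delay(args=(1, 2)) or delay(kwargs={"a": 1, "b": 2})
    explicit = (
        not call_args
        and bool(call_kwargs)
        and set(call_kwargs) <= {"args", "kwargs"}
    )
    if explicit:
        args = tuple(call_kwargs.get("args") or ())
        kwargs = dict(call_kwargs.get("kwargs") or {})
        return args, kwargs
    # Direct form: delay(1, 2) or delay(a=1, b=2)
    return tuple(call_args), dict(call_kwargs)


def add(a, b):
    # Stand-in for the task body defined in main.py
    return a + b


forms = [
    normalize_call(args=(1, 2)),
    normalize_call(kwargs={"a": 1, "b": 2}),
    normalize_call(1, 2),
    normalize_call(a=1, b=2),
]
totals = [add(*args, **kwargs) for args, kwargs in forms]
```

One consequence of this particular design is that a task with a parameter literally named `args` or `kwargs` would be ambiguous, so such names are best avoided in task signatures under this assumption.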

Start web interface

> narq main:narq server
Usage: narq server [OPTIONS]

  Start rest api server.

Options:
  --host TEXT         Listen host.  [default: 0.0.0.0]
  -p, --port INTEGER  Listen port.  [default: 8000]
  -h, --help          Show this message and exit.

After starting the server, check https://127.0.0.1:8000/docs to see all endpoints and https://127.0.0.1:8000 to use the web interface.

Any other options are passed directly to uvicorn, for example --root-path.

narq main:narq server --host 0.0.0.0 --root-path /narq

Mount as FastAPI sub app

If you have an existing FastAPI service, to simplify your deployment you might want to mount the narq server as a FastAPI sub app.

from fastapi import FastAPI

from examples.tasks import narq
from narq.server.app import app as narq_app  # alias avoids shadowing the Narq instance

app = FastAPI()

app.mount("/narq", narq_app)
narq_app.set_narq(narq)

Start worker inside app

You can also start the worker inside your app.

@app.on_event("startup")
async def startup():
    await narq.init()
    await narq.start_worker(with_timer=True, block=False)
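
The `block=False` argument suggests that the worker runs in the background so the startup hook can return. The sketch below shows that general asyncio pattern with a background task. It is only an illustration under that assumption: `worker_loop` and this `start_worker` are hypothetical stand-ins and do not reproduce narq's worker.

```python
import asyncio


async def worker_loop(queue, results):
    """Hypothetical worker: consume (a, b) jobs forever and record a + b."""
    while True:
        a, b = await queue.get()
        results.append(a + b)
        queue.task_done()


async def start_worker(queue, results, block=True):
    """Start the worker; with block=False, return immediately with the task."""
    task = asyncio.create_task(worker_loop(queue, results))
    if block:
        await task  # would keep the caller here until the worker stops
    return task


async def main():
    queue = asyncio.Queue()
    results = []
    # Returns right away, so the rest of "startup" can continue.
    worker = await start_worker(queue, results, block=False)
    await queue.put((1, 2))
    await queue.put((3, 4))
    await queue.join()  # wait until both jobs have been processed
    worker.cancel()     # stop the background worker on "shutdown"
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return results


results = asyncio.run(main())
```

With `block=True` the startup hook would never return while the worker is running, which is why a non-blocking start is the natural fit inside a web application.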

Thanks To

  • arq, fast job queuing and RPC in Python with asyncio and Redis.
  • ReArq, an improved arq rewrite with an API and web interface.

License

This project is licensed under the Apache-2.0 License.
