A distributed task queue built with asyncio and Redis, with a built-in web interface

narq


Introduction

narq is a distributed task queue built with asyncio and Redis, based on ReArq (itself a rewrite of arq).

Motivations

This project is an independent fork of ReArq with fundamentally different goals: narq is intended to be a production-grade task queue that is simple to reason about.

Features

  • AsyncIO support, easy integration with FastAPI.
  • Delayed tasks, cron tasks and async task support.
  • Full-featured built-in web interface.
  • Built-in distributed task lock to ensure that only one instance of a given task runs at a time.
  • Other powerful features to be discovered.

Web Interface

(dashboard screenshot)

Requirements

  • Redis >= 5.0

Quick Start

Task Definition

# main.py
from narq import Narq

narq = Narq(db_url='mysql://root:123456@127.0.0.1:3306/narq')


@narq.on_shutdown
async def on_shutdown():
    # do any cleanup work here, e.g. close database connections
    print("shutdown")


@narq.on_startup
async def on_startup():
    # you can do some initialization work here
    print("startup")


@narq.task(queue="q1")
async def add(self, a, b):
    return a + b


@narq.task(cron="*/5 * * * * * *")  # run task every 5 seconds
async def timer(self):
    return "timer"

Run narq worker

> narq main:narq worker -q q1 -q q2 # consume tasks from q1 and q2 at the same time
2021-03-29 09:54:50.464 | INFO     | narq.worker:_main:95 - Started worker successfully on queue: narq:queue:default
2021-03-29 09:54:50.465 | INFO     | narq.worker:_main:96 - Registered tasks: add, sleep, timer_add
2021-03-29 09:54:50.465 | INFO     | narq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.43M clients_connected=5 db_keys=6

Run narq timer

If you have cron tasks or delayed tasks, you also need to run the timer process:

> narq main:narq timer
2021-03-29 09:54:43.878 | INFO     | narq.worker:_main:275 - Start timer successfully
2021-03-29 09:54:43.887 | INFO     | narq.worker:_main:277 - Registered timer tasks: timer_add
2021-03-29 09:54:43.894 | INFO     | narq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.25M clients_connected=2 db_keys=6

Alternatively, you can run the timer together with the worker using narq main:narq worker -t.

Integration with FastAPI

from fastapi import FastAPI

from main import add, narq  # the Narq instance and task defined in main.py above

app = FastAPI()


@app.on_event("shutdown")
async def shutdown() -> None:
    await narq.close()


# then enqueue the task in a view
@app.get("/test")
async def test():
    job = await add.delay(args=(1, 2))
    # or
    job = await add.delay(kwargs={"a": 1, "b": 2})
    # or
    job = await add.delay(1, 2)
    # or
    job = await add.delay(a=1, b=2)
    result = await job.result(timeout=5)  # wait up to 5 seconds for the result
    print(result.result)
    return result
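
Tasks can also be enqueued outside of a web framework. Below is a minimal sketch of a standalone producer script; it reuses only the calls shown elsewhere in this README and assumes (as in the "Start worker inside app" example further down) that narq.init() must be awaited before enqueuing and narq.close() when you are done:

# producer.py - a minimal sketch, not part of the narq examples
import asyncio

from main import add, narq  # the Narq instance and task defined in main.py above


async def produce():
    await narq.init()  # connect to redis before enqueuing (assumed required, see startup example below)
    try:
        job = await add.delay(1, 2)  # enqueue the add task on queue q1
        result = await job.result(timeout=5)  # wait up to 5 seconds for the worker
        print(result.result)  # -> 3
    finally:
        await narq.close()  # release connections


if __name__ == "__main__":
    asyncio.run(produce())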

Start web interface

> narq main:narq server
Usage: narq server [OPTIONS]

  Start rest api server.

Options:
  --host TEXT         Listen host.  [default: 0.0.0.0]
  -p, --port INTEGER  Listen port.  [default: 8000]
  -h, --help          Show this message and exit.

After starting the server, open http://127.0.0.1:8000/docs to see all API endpoints and http://127.0.0.1:8000 to use the web interface.

Any other options are passed directly to uvicorn, e.g. --root-path:

narq main:narq server --host 0.0.0.0 --root-path /narq

Mount as FastAPI sub app

If you have an existing FastAPI service, you can simplify your deployment by mounting the narq server as a FastAPI sub app.

from fastapi import FastAPI

from examples.tasks import narq
from narq.server.app import app as narq_app

app = FastAPI()

app.mount("/narq", narq_app)
narq_app.set_narq(narq)

Start worker inside app

You can also start the worker inside your app:

@app.on_event("startup")
async def startup():
    await narq.init()
    await narq.start_worker(with_timer=True, block=False)  # run worker and timer in the background
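
Putting the pieces together, a single process can serve your own routes, the narq dashboard, the worker and the timer. The following is only a sketch that combines the snippets above; it assumes your tasks live in main.py as in the Quick Start:

from fastapi import FastAPI

from main import narq  # the Narq instance defined in main.py above
from narq.server.app import app as narq_app  # built-in web interface

app = FastAPI()

app.mount("/narq", narq_app)  # dashboard served under /narq
narq_app.set_narq(narq)


@app.on_event("startup")
async def startup():
    await narq.init()
    await narq.start_worker(with_timer=True, block=False)  # worker and timer in the background


@app.on_event("shutdown")
async def shutdown():
    await narq.close()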

Thanks To

  • arq, fast job queuing and RPC in Python with asyncio and Redis.
  • ReArq, an improved rewrite of arq with an API and a web interface.

License

This project is licensed under the Apache-2.0 License.

