A distributed task queue built with asyncio and redis, with built-in web interface
Project description
Introduction
narq is a distributed task queue with asyncio and redis, which is built upon ReArq (itself a rewrite of arq)
Motivations
This project is an independent fork of ReArq because it is fundamentally different in its goals. Narq is intended as a simple to reason about production-grade task queue.
Features
- AsyncIO support, easy integration with FastAPI.
- Delayed tasks, cron tasks and async task support.
- Full-featured built-in web interface.
- Built-in distributed task lock to ensure a given task is ran one at a time.
- Other powerful features to be discovered.
Web Interface
Requirements
- Redis >= 5.0
Quick Start
Task Definition
# main.py
from narq import Narq
narq = Narq(db_url='mysql://root:123456@127.0.0.1:3306/narq')
@narq.on_shutdown
async def on_shutdown():
# you can do some clean up work here like close db and so on...
print("shutdown")
@narq.on_startup
async def on_startup():
# you can do some initialization work here
print("startup")
@narq.task(queue="q1")
async def add(self, a, b):
return a + b
@narq.task(cron="*/5 * * * * * *") # run task per 5 seconds
async def timer(self):
return "timer"
Run narq worker
> narq main:narq worker -q q1 -q q2 # consume tasks from q1 and q2 as the same time
2021-03-29 09:54:50.464 | INFO | narq.worker:_main:95 - Started worker successfully on queue: narq:queue:default
2021-03-29 09:54:50.465 | INFO | narq.worker:_main:96 - Registered tasks: add, sleep, timer_add
2021-03-29 09:54:50.465 | INFO | narq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.43M clients_connected=5 db_keys=6
Run narq timer
If you have timing task or delay task, you should run another command also:
> narq main:narq timer
2021-03-29 09:54:43.878 | INFO | narq.worker:_main:275 - Start timer successfully
2021-03-29 09:54:43.887 | INFO | narq.worker:_main:277 - Registered timer tasks: timer_add
2021-03-29 09:54:43.894 | INFO | narq.worker:log_redis_info:86 - redis_version=6.2.1 mem_usage=1.25M clients_connected=2 db_keys=6
Also, you can run timer with worker together by narq main:narq worker -t
.
Integration with FastAPI
from fastapi import FastAPI
app = FastAPI()
@app.on_event("shutdown")
async def shutdown() -> None:
await narq.close()
# then run task in view
@app.get("/test")
async def test():
job = await add.delay(args=(1, 2))
# or
job = await add.delay(kwargs={"a": 1, "b": 2})
# or
job = await add.delay(1, 2)
# or
job = await add.delay(a=1, b=2)
result = await job.result(timeout=5) # wait result for 5 seconds
print(result.result)
return result
Start web interface
> narq main:narq server
Usage: narq server [OPTIONS]
Start rest api server.
Options:
--host TEXT Listen host. [default: 0.0.0.0]
-p, --port INTEGER Listen port. [default: 8000]
-h, --help Show this message and exit..
After starting the server, check https://127.0.0.1:8000/docs to see all endpoints and https://127.0.0.1:8000 to use the web interface.
Other options will be passed into uvicorn
directly, such as --root-path
etc.
narq main:narq server --host 0.0.0.0 --root-path /narq
Mount as FastAPI sub app
If you have an existing FastAPI service, to simplify your deployment you might want to mount the narq server as a FastAPI sub app.
from fastapi import FastAPI
from examples.tasks import narq
from narq.server.app import app as narq
app = FastAPI()
app.mount("/narq", narq_app)
narq_app.set_narq(narq)
Start worker inside app
You can also start worker inside your app.
@app.on_event("startup")
async def startup():
await narq.init()
await narq.start_worker(with_timer=True, block=False)
ThanksTo
- arq, Fast job queuing and RPC in python with asyncio and redis.
- ReArq, Improved arq rewrite with an API + web interface
License
This project is licensed under the Apache-2.0 License.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file narq-0.2.1.tar.gz
.
File metadata
- Download URL: narq-0.2.1.tar.gz
- Upload date:
- Size: 33.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c08321e8396d33902f556256c901850757af6bb7ec7b5dcea081c02b7e0ad11a |
|
MD5 | 3dd6d4716bdf18bd8f734fd4905bac56 |
|
BLAKE2b-256 | 57bdcf7f1994fdb266976948915cde1282b6e9270d8dc54ecca39ff64fecacdf |
File details
Details for the file narq-0.2.1-py3-none-any.whl
.
File metadata
- Download URL: narq-0.2.1-py3-none-any.whl
- Upload date:
- Size: 49.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7177619650bbc9e3b549f2fd362e6d42abdbc6bb06b8dee783a89cb8e1c29737 |
|
MD5 | e352f9f551c1ddf86fbd7e739fe127e0 |
|
BLAKE2b-256 | 6b4939c375ea8a07caa516e2ef4e7b6432119ff53937cf6f00be2a1adbab1e2c |