a simple but robust task queue
delayed
Delayed is a simple but robust task queue inspired by rq.
Features
- Robust: every enqueued task runs exactly once, even if a worker is killed at any time.
- Clean: finished tasks (including failed ones) take up no space in your Redis.
- Distributed: as many workers as needed can run at the same time without further configuration.
- Portable: its Go and Python versions can call each other.
Requirements
- Python 3.7 or later, tested on CPython 3.7 - 3.12. Versions before 1.0 have also been tested on CPython 2.7, PyPy and PyPy3.
- A Unix-like system (with Unix signals) is required to stop the workers gracefully; tested on Ubuntu 22.04 and macOS Monterey 12.
- Redis 2.6.0 or later (with Lua scripts).
Getting started
- Run a Redis server:

      $ redis-server
- Install delayed:

      $ pip install delayed
- Create a task queue:

      import redis
      from delayed.queue import Queue

      conn = redis.Redis()  # connects to localhost:6379 by default
      queue = Queue(name='default', conn=conn)
- Enqueue tasks:
  - Four ways to enqueue Python tasks:
    - Define a task function and enqueue it:

          from delayed.delay import delayed

          delayed = delayed(queue)
          i = 0

          @delayed
          def delayed_add(a, b):
              return a + b

          @delayed(retry=3)
          def retry_div(x):
              # raises ZeroDivisionError on the first call, so it needs a retry
              global i
              i += 1
              return x / (i - 1)

          delayed_add.delay(1, 2)  # enqueue delayed_add
          delayed_add.delay(1, b=2)  # same as above
          delayed_add(1, 2)  # call it immediately

          retry_div.delay(1)  # enqueue retry_div
    - Directly enqueue a function:

          from delayed.delay import delayed

          delayed = delayed(queue)

          def add(a, b):
              return a + b

          delayed(add).delay(1, 2)
          delayed(add).delay(1, b=2)  # same as above

          delayed(retry=3)(add).delay(1, b=2)
          delayed(add, retry=3).delay(1, b=2)  # same as above
    - Create a task and enqueue it:

          from delayed.task import PyTask

          def add(a, b):
              return a + b

          task = PyTask(func=add, args=(1,), kwargs={'b': 2}, retry=1)
          queue.enqueue(task)
    - Enqueue a predefined task function without importing it (the fastest and lightest way):

          from delayed.task import PyTask

          # 'test:add' is the function path in module_path:func_name form
          task = PyTask(func='test:add', args=(1,), kwargs={'b': 2}, retry=1)
          queue.enqueue(task)
  - Enqueue Go tasks:

        from delayed.task import GoTask

        task = GoTask(func_path='syscall.Kill', args=(0, 1))
        queue.enqueue(task)

        # the variadic argument needs to be a list or tuple
        task = GoTask(func_path='fmt.Printf', args=('%d %s\n', [1, 'test']))
        queue.enqueue(task)

        # if the variadic argument is the only argument, it's not required to wrap it with a list or tuple
        task = GoTask('fmt.Println', (1, 'test'))
        queue.enqueue(task)
- Run a task worker (or more) in a separate process:

      import redis
      from delayed.queue import Queue
      from delayed.worker import Worker

      conn = redis.Redis()
      queue = Queue(name='default', conn=conn)
      worker = Worker(queue=queue)
      worker.run()
- Run a task sweeper in a separate process to recover lost tasks (mainly caused by a worker being killed):

      import redis
      from delayed.queue import Queue
      from delayed.sweeper import Sweeper

      conn = redis.Redis()
      queue = Queue(name='default', conn=conn)
      sweeper = Sweeper(queues=[queue])
      sweeper.run()
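The string form `func='test:add'` used in the last Python enqueue variant above names a function as `module_path:func_name`, which is what lets the caller enqueue it without importing it. As a rough illustration of how such a path can be turned back into a callable, the sketch below uses `importlib`. `resolve_func_path` is a hypothetical helper written for this example; it is not part of delayed's API, and the library may resolve paths differently.

```python
import importlib


def resolve_func_path(func_path):
    """Resolve a 'module_path:func_name' string to the callable it names.

    Hypothetical helper for illustration only, not delayed's own code.
    """
    module_path, sep, func_name = func_path.partition(':')
    if not sep or not module_path or not func_name:
        raise ValueError(f'expected "module_path:func_name", got {func_path!r}')
    module = importlib.import_module(module_path)
    return getattr(module, func_name)


# A standard-library function stands in for 'test:add' here.
sqrt = resolve_func_path('math:sqrt')
print(sqrt(16))  # prints 4.0
```

Because only the string travels with the task, the module it names has to be importable in the worker's environment.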
Examples
See examples.
```bash
$ redis-server &
$ pip install delayed
$ python -m examples.sweeper &
$ python -m examples.worker &
$ python -m examples.caller
```
QA
- Q: What's the limitation on a task function?
  A: A Python task function should be defined at module level (except in the `__main__` module). Its `args` and `kwargs` should be serializable by MessagePack. After deserialization, the types of the `args` and `kwargs` passed to the task function might have changed (tuple -> list), so the function should handle this change.
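The tuple -> list change mentioned in this answer matters most when a task function relies on the concrete type of an argument, for example by using it as a dict key. The sketch below shows one way to write a task function that tolerates both forms. `cached_area` and `AREA_CACHE` are hypothetical names made up for this example.

```python
AREA_CACHE = {}


def cached_area(size):
    """Hypothetical task function: return width * height, memoized by size."""
    # `size` may arrive as a list even if a tuple was enqueued. Lists are
    # unhashable, so convert to a tuple before using it as a dict key.
    key = tuple(size)
    if key not in AREA_CACHE:
        width, height = key
        AREA_CACHE[key] = width * height
    return AREA_CACHE[key]


print(cached_area((2, 3)))  # called directly with a tuple, prints 6
print(cached_area([2, 3]))  # called with a list, as after deserialization, prints 6
```

Normalizing at the top of the function keeps the rest of the body independent of how the arguments were serialized.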
- Q: What's the `name` param of a queue?
  A: It's the key used to store the tasks of the queue. A queue with name "default" will use these keys:
  - `default`: list, enqueued tasks.
  - `default_noti`: list, the same length as enqueued tasks.
  - `default_processing`: hash, the processing task of workers.
- Q: What are lost tasks?
  A: A task might get lost in two situations:
  - a worker popped a task notification, then got killed before dequeuing the task.
  - a worker dequeued a task, then got killed before releasing the task.
- Q: How to recover lost tasks?
  A: Run a sweeper. It does two things:
  - it keeps the task notification list the same length as the task queue.
  - it checks the processing list; if a worker is dead, it moves that worker's processing task back to the task queue.
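The two sweeper duties can be sketched against an in-memory model of the three queue keys. This is an illustrative model under stated assumptions, not delayed's implementation: the real sweeper works on Redis, and how it decides that a worker is dead is not described here, so the `alive_workers` set and the `sweep` function below are hypothetical.

```python
def sweep(tasks, notifications, processing, alive_workers):
    """Run one sweep over an in-memory model of a queue.

    tasks:         list of enqueued tasks (models the `default` list)
    notifications: list that should be as long as `tasks` (`default_noti`)
    processing:    dict of worker id -> task in progress (`default_processing`)
    alive_workers: set of worker ids assumed to be alive (an assumption of
                   this sketch; the liveness check itself is not modeled)
    """
    # Move the tasks held by dead workers back to the task queue.
    for worker_id in list(processing):
        if worker_id not in alive_workers:
            tasks.append(processing.pop(worker_id))
    # Top up the notifications so there is one per enqueued task. Doing this
    # second also covers the tasks that were just moved back.
    missing = len(tasks) - len(notifications)
    notifications.extend([1] * missing)


tasks = ['task-a']
notifications = []  # a worker popped the notification, then died
processing = {'worker-1': 'task-b', 'worker-2': 'task-c'}  # worker-1 died mid-task
sweep(tasks, notifications, processing, alive_workers={'worker-2'})
print(tasks)          # ['task-a', 'task-b']
print(notifications)  # [1, 1]
print(processing)     # {'worker-2': 'task-c'}
```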
- Q: How to turn on the debug logs?
  A: Add a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest way is to call `delayed.logger.setup_logger()`:

      from delayed.logger import setup_logger

      setup_logger()
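For the manual route, a handler can be attached with the standard `logging` module. The sketch below assumes `delayed.logger.logger` behaves like a regular `logging.Logger`; `add_debug_handler` is a hypothetical helper, and a stand-in logger is used so the example runs without the library installed.

```python
import logging


def add_debug_handler(logger, stream=None):
    """Attach a DEBUG-level stream handler to `logger` and return the handler."""
    handler = logging.StreamHandler(stream)  # stream=None means sys.stderr
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s'))
    logger.addHandler(handler)
    # The logger itself must also let DEBUG records through.
    logger.setLevel(logging.DEBUG)
    return handler


# Stand-in logger; with the library installed, pass delayed.logger.logger instead.
demo_logger = logging.getLogger('demo')
add_debug_handler(demo_logger)
demo_logger.debug('debug logging is on')
```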
Release notes
- 1.2:
  - Adds `retry` param to functions wrapped by `delayed.delay()`.
  - Adds `retry` param to `Task()`.
  - Adds `release` param to `Queue.enqueue()`.
  - The `Worker` won't retry a failed task infinitely by default now. You can set `retry=-1` to `Task()` instead. (BREAKING CHANGE)
- 1.1:
  - Adds `log_level` param to `delayed.logger.setup_logger()`.
  - Prevents different online workers from having the same id.
- 1.0:
  - Python 2.7 is not supported anymore. (BREAKING CHANGE)
  - Supports Go, adds `GoTask`.
  - Uses MessagePack instead of pickle to serialize / deserialize tasks. (BREAKING CHANGE)
  - Removes `ForkedWorker` and `PreforkedWorker`. You can use `Worker` instead. (BREAKING CHANGE)
  - Changes params of `Queue()`, removes `default_timeout`, `requeue_timeout` and `busy_len`, adds `dequeue_timeout` and `keep_alive_timeout`. (BREAKING CHANGE)
  - Renames `Task` to `PyTask`. (BREAKING CHANGE)
  - Removes these properties of `PyTask`: `id`, `func_path`, `args` and `kwargs`. (BREAKING CHANGE)
  - Removes these params of `PyTask()`: `id`, `timeout`, `prior` and `error_handler_path`. (BREAKING CHANGE)
  - Removes `PyTask.create()`. You can use `PyTask()` instead. (BREAKING CHANGE)
  - Renames the `func_path` param of `PyTask()` to `func`; it accepts both `callable` and `str`. (BREAKING CHANGE)
  - Removes `delayed.delay()`. Removes params of `delayed.delayed()`. (BREAKING CHANGE)
- 0.11:
  - Sleeps random time when a `Worker` fails to pop a `task` before retrying.
- 0.10:
  - The `Sweeper` can handle multiple queues now. Its `queue` param has been changed to `queues`. (BREAKING CHANGE)
  - Changes the separator between `module_path` and `func_name` from `.` to `:`. (BREAKING CHANGE)
- 0.9:
  - Adds `prior` and `error_handler` params to `delayed.delayed()`, removes its `timeout()` method. (BREAKING CHANGE)
  - Adds examples.
- 0.8:
  - The `Task` struct has been changed, it's not compatible with older versions. (BREAKING CHANGE)
    - Removes `module_name` and `func_name` from `Task`, adds `func_path` instead.
    - Adds `error_handler_path` to `Task`.
  - Removes `success_handler` and `error_handler` from `Worker`. (BREAKING CHANGE)
- 0.7:
  - Implements prior task.
- 0.6:
  - Adds `dequeued_len()` and `index` to `Queue`.
- 0.5:
  - Adds `delayed.task.set_pickle_protocol_version()`.
- 0.4:
  - Refactors and fixes bugs.
- 0.3:
  - Changes param `second` to `timeout` for `delayed.delayed()`. (BREAKING CHANGE)
  - Adds debug log.
- 0.2:
  - Adds `timeout()` to `delayed.delayed()`.
- 0.1:
  - Initial version.