pyxqueue
a multi-process task queue using Redis streams.
heavily inspired by: http://charlesleifer.com/blog/multi-process-task-queue-using-redis-streams/
install
pip install pyxqueue
Usage
background task
import time

import redis
from pyxqueue import TaskQueue

client = redis.Redis()
queue = TaskQueue(client, stream_key='background-task')


@queue.task
def sleep(n):
    print(f'going to sleep for {n} seconds')
    time.sleep(n)
    print(f'woke up after {n} seconds')


if __name__ == '__main__':
    import sys

    usage = 'Usage: python background-task.py (worker | test)'
    if len(sys.argv) != 2:
        print(usage)
    elif sys.argv[1] == 'worker':
        queue.run()
    elif sys.argv[1] == 'test':
        sleep(2)
    else:
        print(usage)
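To make the decorator pattern above easier to picture, here is a minimal in-memory sketch of how a `task` decorator can turn a function call into a queued message that a worker later executes. It is not pyxqueue's implementation: `MiniQueue`, `run_pending`, and the JSON message layout are hypothetical names chosen for illustration, and a plain `deque` stands in for the Redis stream.

```python
import json
from collections import deque


class MiniQueue:
    """Toy in-memory stand-in for a stream-backed task queue (hypothetical)."""

    def __init__(self):
        self._stream = deque()  # stands in for the Redis stream
        self._tasks = {}        # task name -> original function

    def task(self, fn):
        """Register fn and return a wrapper that enqueues instead of running."""
        self._tasks[fn.__name__] = fn

        def enqueue(*args, **kwargs):
            # Serialize the call so that another process could pick it up.
            message = json.dumps(
                {'task': fn.__name__, 'args': args, 'kwargs': kwargs}
            )
            self._stream.append(message)

        return enqueue

    def run_pending(self):
        """Worker side: drain the stream and execute each message in order."""
        results = []
        while self._stream:
            message = json.loads(self._stream.popleft())
            fn = self._tasks[message['task']]
            results.append(fn(*message['args'], **message['kwargs']))
        return results


queue_sketch = MiniQueue()


@queue_sketch.task
def add(a, b):
    return a + b


add(1, 2)  # only enqueues; nothing is computed yet
add(3, 4)
worker_results = queue_sketch.run_pending()  # the "worker" executes both calls
```

In this sketch the caller and the worker share one process; with a real stream the two halves would typically run in separate processes, as in the `worker` and `test` modes above.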
rpc
import redis
from pyxqueue import TaskQueue

client = redis.Redis()
queue = TaskQueue(client, stream_key='rpc')


@queue.task
def fib(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return b


if __name__ == '__main__':
    import sys

    usage = 'Usage: python rpc.py (worker | test)'
    if len(sys.argv) != 2:
        print(usage)
    elif sys.argv[1] == 'worker':
        queue.run()
    elif sys.argv[1] == 'test':
        fib_100k_result = fib(100_000)
        print(f'100kth fib number ends with: {str(fib_100k_result.get())[-6:]}')
    else:
        print(usage)
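The `.get()` call in the example above suggests a result-handle pattern: the caller receives a handle immediately and blocks only when it asks for the value. A rough sketch of that pattern, using a thread and the standard library in place of Redis, might look like the following. `ResultHandle`, `submit`, and `worker` are hypothetical names for illustration and are not part of pyxqueue.

```python
import queue as stdlib_queue
import threading


class ResultHandle:
    """Hypothetical handle handed back to the caller of a queued task."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None

    def set(self, value):
        # Called by the worker once the task has finished.
        self._value = value
        self._done.set()

    def get(self, timeout=None):
        # Block until the worker has stored a value, or time out.
        if not self._done.wait(timeout):
            raise TimeoutError('result not ready')
        return self._value


def fib(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return b


pending = stdlib_queue.Queue()  # stands in for the stream


def worker():
    # Execute queued calls until a None sentinel arrives.
    while True:
        item = pending.get()
        if item is None:
            break
        fn, args, handle = item
        handle.set(fn(*args))


def submit(fn, *args):
    # Enqueue the call and return a handle right away.
    handle = ResultHandle()
    pending.put((fn, args, handle))
    return handle


thread = threading.Thread(target=worker, daemon=True)
thread.start()

handle = submit(fib, 10)        # returns immediately
fib_10 = handle.get(timeout=5)  # blocks until the worker is done

pending.put(None)               # ask the worker to stop
thread.join(timeout=5)
```

Note that because `fib` returns `b` rather than `a`, `fib(10)` here evaluates to 89, which is the Fibonacci number at index 11 when counting from F(0) = 0.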
progress
import time

import redis
from pyxqueue import TaskQueue

client = redis.Redis()
queue = TaskQueue(client, stream_key='progress')


@queue.task
def long_work():
    for i in range(100):
        queue.update_task_progress(i)
        time.sleep(1)


if __name__ == '__main__':
    import sys

    usage = 'Usage: python progress.py (worker | test)'
    if len(sys.argv) != 2:
        print(usage)
    elif sys.argv[1] == 'worker':
        queue.run()
    elif sys.argv[1] == 'test':
        long_work()
    else:
        print(usage)
to get the progress of a task:
queue.get_task_progress(task_id)
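As a rough illustration of the update-then-poll flow, the sketch below keeps progress values in a dictionary keyed by task id. It is only an in-memory analogy under stated assumptions: `ProgressStore`, its methods, the `'task-1'` id, and the generator-based `long_work_sketch` are hypothetical and say nothing about how pyxqueue stores progress or assigns task ids.

```python
class ProgressStore:
    """Hypothetical in-memory progress table keyed by task id."""

    def __init__(self):
        self._progress = {}

    def update(self, task_id, value):
        # Worker side: record the latest progress value for a task.
        self._progress[task_id] = value

    def get(self, task_id):
        # Caller side: return the latest value, or None for an unknown task.
        return self._progress.get(task_id)


def long_work_sketch(store, task_id, steps):
    # A generator, so the caller can poll between steps without threads.
    for i in range(steps):
        store.update(task_id, i)
        yield


store = ProgressStore()
observed = []
for _ in long_work_sketch(store, 'task-1', steps=3):
    observed.append(store.get('task-1'))  # poll after each step
```

Polling an id that was never updated returns `None` in this sketch, which gives the caller a way to tell "no progress recorded" apart from a progress value of 0.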