a multi-processes task queue using redis streams.
Project description
pyxqueue
点击中文版 README进入中文文档。
a multi-processes task queue using redis streams.
install
pyxqueue uses some feature of python 3.7, so you should use python 3.7 or newer.
pip install pyxqueue
Usage
background task
import time
import redis
from pyxqueue import TaskQueue
client = redis.Redis()
queue = TaskQueue(client, stream_key='background-task')
@queue.task
def sleep(n):
print(f'going to sleep for {n} seconds')
time.sleep(n)
print(f'woke up after {n} seconds')
if __name__ == '__main__':
import sys
usage = 'Usage: python background-task.py (worker | test)'
if len(sys.argv) != 2:
print(usage)
elif sys.argv[1] == 'worker':
queue.run()
elif sys.argv[1] == 'test':
sleep(2)
else:
print(usage)
rpc
import redis
from pyxqueue import TaskQueue
client = redis.Redis()
queue = TaskQueue(client, stream_key='rpc')
@queue.task
def fib(n):
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return b
if __name__ == '__main__':
import sys
usage = 'Usage: python rpc.py (worker | test)'
if len(sys.argv) != 2:
print(usage)
elif sys.argv[1] == 'worker':
queue.run()
elif sys.argv[1] == 'test':
fib_100k_result = fib(100_000)
print(f'100kth fib number starts ends with: {str(fib_100k_result.get())[-6:]}')
else:
print(usage)
task info
>>> task_id = b'1551943344215-0'
>>> queue.get_task(task_id)
{
'task_id': b'1551943344215-0',
'info': {
'state': 2,
'value': None,
'worker': 'worker-1-66202570-40a9-11e9-bc87-00163e0eb975',
'update_time': 1551943344
},
'data': {
b'task': b'{"task_name": "queues.spider.download", "args": [75], "kwargs": {}}'
}
}
progress
import time
import redis
from pyxqueue import TaskQueue
client = redis.Redis()
queue = TaskQueue(client, stream_key='progress')
@queue.task
def long_work():
for i in range(100):
queue.update_task_progress(i)
time.sleep(1)
if __name__ == '__main__':
import sys
usage = 'Usage: python progress.py (worker | test)'
if len(sys.argv) != 2:
print(usage)
elif sys.argv[1] == 'worker':
queue.run()
elif sys.argv[1] == 'test':
long_work()
else:
print(usage)
notes
this package is heavily inspired from: http://charlesleifer.com/blog/multi-process-task-queue-using-redis-streams/
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyxqueue-0.1.1.tar.gz.
File metadata
- Download URL: pyxqueue-0.1.1.tar.gz
- Upload date:
- Size: 6.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1b460f8f15095d91d8595fd4d3c5ab021d8bb7da82a9b49af378ff2e396a3103
|
|
| MD5 |
523a512cf1721ee6cee2a3d75e7e6f46
|
|
| BLAKE2b-256 |
616ecbf612d23726083023be782c15e54bcec3bb85fd121d0233de09c01ffad4
|
File details
Details for the file pyxqueue-0.1.1-py3-none-any.whl.
File metadata
- Download URL: pyxqueue-0.1.1-py3-none-any.whl
- Upload date:
- Size: 6.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.2 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
03f7fdf45c32bbf0076be943ac468f14cc12f7422c0c273ba68697c3dc68cf1d
|
|
| MD5 |
7abed547f55b677c5282b96f0e04c0d1
|
|
| BLAKE2b-256 |
e64fc2bb499780353ca781fab51c4169b8dc51d5e5649d79e8fc403b74d7ba96
|