pyxqueue

See the Chinese README for the Chinese documentation.

A multi-process task queue built on Redis Streams.

install

pyxqueue relies on features introduced in Python 3.7, so Python 3.7 or newer is required.

pip install pyxqueue
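
A script that depends on pyxqueue can fail fast on an older interpreter instead of breaking later at import time. The following is a minimal sketch and not part of pyxqueue itself; the helper name `check_python_version` is hypothetical, and the minimum version is the one stated above.

```python
import sys

MIN_VERSION = (3, 7)  # minimum version stated in this README


def check_python_version(version_info=None, minimum=MIN_VERSION):
    """Return True if the (major, minor) version satisfies the minimum.

    Hypothetical helper for illustration; pyxqueue does not ship it.
    """
    if version_info is None:
        version_info = sys.version_info
    return tuple(version_info[:2]) >= tuple(minimum)


if __name__ == '__main__':
    if not check_python_version():
        raise SystemExit('pyxqueue requires Python 3.7 or newer')
```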

Usage

background task

import time

import redis
from pyxqueue import TaskQueue

client = redis.Redis()
queue = TaskQueue(client, stream_key='background-task')


@queue.task
def sleep(n):
    print(f'going to sleep for {n} seconds')
    time.sleep(n)
    print(f'woke up after {n} seconds')


if __name__ == '__main__':
    import sys
    usage = 'Usage: python background-task.py (worker | test)'
    if len(sys.argv) != 2:
        print(usage)
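    elif sys.argv[1] == 'worker':
        # Start the worker processes and consume tasks from the stream.
        queue.run()
    elif sys.argv[1] == 'test':
        # Calling the decorated function enqueues the task for a worker.
        sleep(2)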
    else:
        print(usage)

rpc

import redis
from pyxqueue import TaskQueue

client = redis.Redis()
queue = TaskQueue(client, stream_key='rpc')


@queue.task
def fib(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a  # after n iterations, a holds the n-th Fibonacci number (F(0) = 0)


if __name__ == '__main__':
    import sys
    usage = 'Usage: python rpc.py (worker | test)'
    if len(sys.argv) != 2:
        print(usage)
    elif sys.argv[1] == 'worker':
        queue.run()
    elif sys.argv[1] == 'test':
        fib_100k_result = fib(100_000)
        print(f'100kth fib number ends with: {str(fib_100k_result.get())[-6:]}')
    else:
        print(usage)

task info

>>> task_id = b'1551943344215-0'
>>> queue.get_task(task_id)
{
    'task_id': b'1551943344215-0',
    'info': {
        'state': 2,
        'value': None,
        'worker': 'worker-1-66202570-40a9-11e9-bc87-00163e0eb975',
        'update_time': 1551943344
    },
    'data': {
        b'task': b'{"task_name": "queues.spider.download", "args": [75], "kwargs": {}}'
    }
}
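
A record shaped like the one above can be unpacked with the standard library alone. This is a sketch under two assumptions: the `data` field holds a JSON document under the `b'task'` key, as in the sample output, and the task ID was auto-generated by Redis, in which case the part before the dash is a Unix timestamp in milliseconds. `describe_task` and `SAMPLE_RECORD` are hypothetical names used for illustration, not part of pyxqueue.

```python
import json
from datetime import datetime, timezone

# Copy of the sample get_task() output shown above.
SAMPLE_RECORD = {
    'task_id': b'1551943344215-0',
    'info': {
        'state': 2,
        'value': None,
        'worker': 'worker-1-66202570-40a9-11e9-bc87-00163e0eb975',
        'update_time': 1551943344,
    },
    'data': {
        b'task': b'{"task_name": "queues.spider.download", "args": [75], "kwargs": {}}',
    },
}


def describe_task(record):
    """Flatten a task record into plain Python values (hypothetical helper)."""
    # Auto-generated stream IDs have the form b'<milliseconds>-<sequence>'.
    millis, _, sequence = record['task_id'].decode().partition('-')
    # The task payload is stored as JSON bytes under the b'task' key.
    payload = json.loads(record['data'][b'task'])
    info = record['info']
    return {
        'id_time': datetime.fromtimestamp(int(millis) / 1000, tz=timezone.utc),
        'sequence': int(sequence),
        'task_name': payload['task_name'],
        'args': payload['args'],
        'kwargs': payload['kwargs'],
        'state': info['state'],
        'worker': info['worker'],
        'updated_at': datetime.fromtimestamp(info['update_time'], tz=timezone.utc),
    }


if __name__ == '__main__':
    for key, value in describe_task(SAMPLE_RECORD).items():
        print(f'{key}: {value}')
```

The numeric `state` is passed through unchanged because its meaning is not documented here.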

progress

import time

import redis
from pyxqueue import TaskQueue

client = redis.Redis()
queue = TaskQueue(client, stream_key='progress')


@queue.task
def long_work():
    for i in range(100):
        queue.update_task_progress(i)
        time.sleep(1)


if __name__ == '__main__':
    import sys
    usage = 'Usage: python progress.py (worker | test)'
    if len(sys.argv) != 2:
        print(usage)
    elif sys.argv[1] == 'worker':
        queue.run()
    elif sys.argv[1] == 'test':
        long_work()
    else:
        print(usage)

notes

This package is heavily inspired by: http://charlesleifer.com/blog/multi-process-task-queue-using-redis-streams/
