No project description provided
Project description
rsq
Really Simple Queue is a really simple queue. It allows to push work in an async queue. Workers can consume that queue, and submit results.
Requirements
- Mongo >= 3
Usage
Python
from traxix.rsq import RSQ, Const
# Init
rsq = RSQ(mongo_url="mongodb://localhost:27017", db_name="hello_word")
# Create tasks
rsq.push() # simple task
rsq.push(random_key="random_value", foo="bar")
# pretty print
rsq.print_tasks()
work = rsq.pull() # would pull any task
rsq.done(_id=work[Const.ID], result="It worked")
# who is an optional parameter to troubleshoot which worker is handling a task
work = rsq.pull(who="my-worker-id", random_key="random_value")
if work:
rsq.done(_id=work[Const.ID], success=False, result="failed")
rsq.remove(_id=work[Const.ID]) # 1 task only
else:
print("No task available corresponding")
# List tasks DONE
print("tasks>done>", rsq.list_tasks(state=Const.State.DONE))
print("tasks>fail>", rsq.list_tasks(state=Const.State.FAIL))
# cleanup
rsq.remove(_id=Const.REMOVE_EVERYTHING) # all
import math
from traxix.rsq import RSQ, Const, Conditions
def is_prime(n: int) -> bool:
if n < 2 or n % 2 == 0:
return n == 2
root = int(math.sqrt(n)) + 1
for i in range(3, root, 2):
if n % i == 0:
return False
return True
def main(mongo_url: str = "localhost:27017"):
rsq = RSQ(mongo_url=mongo_url)
rsq.remove()
# Reducer task: runs after all child tasks are done
reducer_id = rsq.push(
state=Const.State.BACK_LOG,
kind="reduce_primes",
)
# Child tasks
for n in range(1, 100):
rsq.push(
conditions=Conditions(reducer=reducer_id),
kind="is_prime",
n=n,
)
# Workers: process prime checks
while work := rsq.pull(kind="is_prime"):
n = work[Const.DATA]["n"]
rsq.done(_id=work[Const.ID], result=is_prime(n))
# Reducer: collect results
reducer = rsq.pull(kind="reduce_primes")
children = rsq.reducers(reducer[Const.ID])
primes = sorted(
child[Const.DATA]["n"] for child in children if child.get(Const.RESULT)
)
rsq.done(_id=reducer[Const.ID], result=primes)
print("primes:", primes)
if __name__ == "__main__":
main()
Cli
$ rsq list_tasks
$ rsq push --foo=bar
61128c086f94b0d168e6a339
$ rsq list_tasks
{'_id': ObjectId('61128c086f94b0d168e6a339'), 'state': 'todo', 'data': {'foo': 'bar'}}
$ rsq pull -- -v # fire remove keys beginning with "_" hence the -v
_id: 61128c086f94b0d168e6a339
data: {"foo": "bar"}
$ rsq done 61128c086f94b0d168e6a339 42
$ rsq list_tasks
{'_id': ObjectId('61128c086f94b0d168e6a339'), 'state': 'done', 'data': {'foo': 'bar'}, 'result': 42}
Install
pip install traxix.rsq
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
traxix_rsq-0.3.0.tar.gz
(7.5 kB
view details)
File details
Details for the file traxix_rsq-0.3.0.tar.gz.
File metadata
- Download URL: traxix_rsq-0.3.0.tar.gz
- Upload date:
- Size: 7.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3118125c2807f9cb5fc439e5adc06c45f57cf024d9135c36f78cd4b3f8b264a7
|
|
| MD5 |
3bd9ab8b17fc523c42dc3eb97a3575a5
|
|
| BLAKE2b-256 |
7abc249668f18048f902018ab8f1fb17158a811a3f1e20eacaaa46d3efb9323e
|