Skip to main content

No project description provided

Project description

rsq

Really Simple Queue is a really simple queue. It allows to push work in an async queue. Workers can consume that queue, and submit results.

Requirements

  • Mongo >= 3

Usage

Python

from traxix.rsq import RSQ, Const

# Init
rsq = RSQ(mongo_url="mongodb://localhost:27017", db_name="hello_word")

# Create tasks
rsq.push()  # simple task
rsq.push(random_key="random_value", foo="bar")

# pretty print
rsq.print_tasks()

work = rsq.pull()  # would pull any task
rsq.done(_id=work[Const.ID], result="It worked")

# who is an optional parameter to troubleshoot which worker is handling a task
work = rsq.pull(who="my-worker-id", random_key="random_value")

if work:
    rsq.done(_id=work[Const.ID], success=False, result="failed")
    rsq.remove(_id=work[Const.ID])  # 1 task only
else:
    print("No task available corresponding")


# List tasks DONE
print("tasks>done>", rsq.list_tasks(state=Const.State.DONE))
print("tasks>fail>", rsq.list_tasks(state=Const.State.FAIL))

# cleanup
rsq.remove(_id=Const.REMOVE_EVERYTHING)  # all
import math

from traxix.rsq import RSQ, Const, Conditions


def is_prime(n: int) -> bool:
    if n < 2 or n % 2 == 0:
        return n == 2
    root = int(math.sqrt(n)) + 1
    for i in range(3, root, 2):
        if n % i == 0:
            return False
    return True


def main(mongo_url: str = "localhost:27017"):
    rsq = RSQ(mongo_url=mongo_url)
    rsq.remove()

    # Reducer task: runs after all child tasks are done
    reducer_id = rsq.push(
        state=Const.State.BACK_LOG,
        kind="reduce_primes",
    )

    # Child tasks
    for n in range(1, 100):
        rsq.push(
            conditions=Conditions(reducer=reducer_id),
            kind="is_prime",
            n=n,
        )

    # Workers: process prime checks
    while work := rsq.pull(kind="is_prime"):
        n = work[Const.DATA]["n"]
        rsq.done(_id=work[Const.ID], result=is_prime(n))

    # Reducer: collect results
    reducer = rsq.pull(kind="reduce_primes")
    children = rsq.reducers(reducer[Const.ID])
    primes = sorted(
        child[Const.DATA]["n"] for child in children if child.get(Const.RESULT)
    )
    rsq.done(_id=reducer[Const.ID], result=primes)

    print("primes:", primes)


if __name__ == "__main__":
    main()

Cli

$ rsq list_tasks
$ rsq push --foo=bar
61128c086f94b0d168e6a339
$ rsq list_tasks
{'_id': ObjectId('61128c086f94b0d168e6a339'), 'state': 'todo', 'data': {'foo': 'bar'}}
$ rsq pull -- -v # fire remove keys beginning with "_" hence the -v
_id: 61128c086f94b0d168e6a339
data: {"foo": "bar"}
$ rsq done 61128c086f94b0d168e6a339 42
$ rsq list_tasks
{'_id': ObjectId('61128c086f94b0d168e6a339'), 'state': 'done', 'data': {'foo': 'bar'}, 'result': 42}

Install

pip install traxix.rsq

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

traxix_rsq-0.3.0.tar.gz (7.5 kB view details)

Uploaded Source

File details

Details for the file traxix_rsq-0.3.0.tar.gz.

File metadata

  • Download URL: traxix_rsq-0.3.0.tar.gz
  • Upload date:
  • Size: 7.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.3

File hashes

Hashes for traxix_rsq-0.3.0.tar.gz
Algorithm Hash digest
SHA256 3118125c2807f9cb5fc439e5adc06c45f57cf024d9135c36f78cd4b3f8b264a7
MD5 3bd9ab8b17fc523c42dc3eb97a3575a5
BLAKE2b-256 7abc249668f18048f902018ab8f1fb17158a811a3f1e20eacaaa46d3efb9323e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page