Skip to main content

Wiji is an asyncio distributed task processor/queue.

Project description

wiji

Codacy Badge CircleCI codecov Code style: black

Wiji is an asyncio distributed task processor/queue.
It’s name is derived from the late Kenyan hip hop artiste, Gwiji.

It is a bit like Celery

wiji has no third-party dependencies and it requires python version 3.7+

wiji is work in progress and very early. It’s API may change in backward incompatible ways.

Installation

pip install wiji

Usage

1. As a library

import asyncio
import wiji

class AdderTask(wiji.task.Task):
    the_broker = wiji.broker.InMemoryBroker()
    queue_name = "AdderTaskQueue1"

    async def run(self, a, b):
        result = a + b
        print("\nresult: {0}\n".format(result))
        return result

# queue some tasks
myAdderTask = AdderTask( )
myAdderTask.synchronous_delay(a=4, b=37)
myAdderTask.synchronous_delay(a=67, b=847)

# run the workers
worker = wiji.Worker(the_task=myAdderTask)
asyncio.run(worker.consume_tasks())

2. As a cli app

wiji also ships with a commandline app called wiji-cli.

create a wiji app file(which is just any python file that has a class instance of wiji.app.App), eg;
examples/my_app.py
import wiji

class AdderTask(wiji.task.Task):
    the_broker = wiji.broker.InMemoryBroker()
    queue_name = "AdderTaskQueue1"

    async def run(self, a, b):
        res = a + b
        print()
        print("res:: ", res)
        print()
        return res

MyAppInstance = wiji.app.App(task_classes=[AdderTask])
NB: the directory where your place that file(in this case; examples/) ought to be in your PYTHONPATH
then run wiji-cli pointing it to the dotted path of the wiji.app.App instance:
wiji-cli --app examples.my_app.MyAppInstance

Writing tests

Lets say you have wiji tasks in your project and you want to write integration or unit tests for them and their use.

# my_tasks.py

import wiji
import MyRedisBroker # a custom broker using redis

DATABASE = {}

class AdderTask(wiji.task.Task):
    the_broker = MyRedisBroker()
    queue_name = "AdderTask"

    async def run(self, a, b):
        """
        adds two numbers and stores the resut in a database
        """
        result = a + b
        DATABASE["result"] = result
        return result

class ExampleView:
    def post(self, request):
        a = request["a"]
        b = request["b"]
        AdderTask().synchronous_delay(a=a, b=b)
In the example above we have a view with one post method. When that method is called it queues a task that adds two numbers and then stores the result of that addition in a database.
That task uses a broker(MyRedisBroker) that is backed by redis.
One way to write your tests would be;
# test_tasks.py

from my_tasks import ExampleView
from unittest import TestCase

class TestExampleView(TestCase):
    def test_view(self):
        view = ExampleView()
        view.post(request={"a": 45, "b": 46})
        # do your asserts here
The problem with the above approach is that this will require you to have an instance of redis running for that test to run succesfully.
This may not be what you want. Ideally you do not want your tests be dependent on external services.
wiji ships with an in-memory broker that you can use in your tests.
So the test above can be re-written in this manner;
# test_tasks.py

import asyncio
from my_tasks import ExampleView, AdderTask, DATABASE
from unittest import TestCase, mock

class TestExampleView(TestCase):

    @staticmethod
    def _run(coro):
        """
        helper function that runs any coroutine in an event loop.
        see:: https://blog.miguelgrinberg.com/post/unit-testing-asyncio-code
        """
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(coro)

    def test_view(self):
        with mock.patch.object(
            # ie, substitute the redis broker with an in-memory one during test runs
            AdderTask, "the_broker", wiji.broker.InMemoryBroker()
        ) as mock_broker:
            view = ExampleView()
            view.post(request={"a": 45, "b": 46})
            # do your asserts here

    def test_whole_flow(self):
        with mock.patch.object(
            AdderTask, "the_broker", wiji.broker.InMemoryBroker()
        ) as mock_broker:
            # 1. assert that the database is initially empty
            self.assertDictEqual(DATABASE, {})

            # 2. when `post` is called it will queue a task
            view = ExampleView()
            view.post(request={"a": 45, "b": 46})

            # 3. we need to run workers
            worker = wiji.Worker(the_task=AdderTask())
            self._run(worker.consume_tasks(TESTING=True))

            # 4. assert that database has been updated succesfully.
            self.assertDictEqual(DATABASE, {"result": 91})

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wiji-0.3.0.tar.gz (29.1 kB view details)

Uploaded Source

Built Distribution

wiji-0.3.0-py3-none-any.whl (32.5 kB view details)

Uploaded Python 3

File details

Details for the file wiji-0.3.0.tar.gz.

File metadata

  • Download URL: wiji-0.3.0.tar.gz
  • Upload date:
  • Size: 29.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.7.0

File hashes

Hashes for wiji-0.3.0.tar.gz
Algorithm Hash digest
SHA256 7cf9a6a921e77d0d00e2d5db892e435a911e73edba2758a1b52f02c512d7aacc
MD5 7061e068fda1504602a2b66e77aba6d6
BLAKE2b-256 d04ba3d8bd25a1cef0f9761ee1ca921a5f616abfc3adb5dbe3d6537d11614a15

See more details on using hashes here.

File details

Details for the file wiji-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: wiji-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 32.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.7.0

File hashes

Hashes for wiji-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a30542f616f47ed2914bcdbda32e2d0fcb321c9eb6b4b2dcdb68996899379f6e
MD5 aec236d334cb2985f417c69af2f638c7
BLAKE2b-256 d80402d8bbd7cf64632743de86d58af057bfcf406a070f08a2a314b6729ef78a

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page