wiji
Wiji is an asyncio distributed task processor/queue.
Its name is derived from the late Kenyan hip hop artiste, Gwiji.
It is a bit like Celery.
wiji has no third-party dependencies and requires Python 3.7+.
wiji is a work in progress and at a very early stage. Its API may change in
backward-incompatible ways.
Installation
pip install wiji
Usage
1. As a library
import asyncio
import wiji

class AdderTask(wiji.task.Task):
    the_broker = wiji.broker.InMemoryBroker()
    queue_name = "AdderTaskQueue1"

    async def run(self, a, b):
        result = a + b
        print("\nresult: {0}\n".format(result))
        return result

# queue some tasks
myAdderTask = AdderTask()
myAdderTask.synchronous_delay(a=4, b=37)
myAdderTask.synchronous_delay(a=67, b=847)

# run the workers
worker = wiji.Worker(the_task=myAdderTask)
asyncio.run(worker.consume_tasks())
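To make the queue-then-consume split above more concrete, here is a minimal sketch of the same idea using only the standard library. It is not how wiji is implemented; the names `add`, `worker` and `main` are made up for illustration, and an `asyncio.Queue` stands in for the broker.

```python
import asyncio


async def add(a, b):
    # stand-in for a task's `run` method
    return a + b


async def worker(queue, results):
    # drain the queue, running the task once per queued set of arguments
    while not queue.empty():
        kwargs = queue.get_nowait()
        results.append(await add(**kwargs))
        queue.task_done()


async def main():
    queue = asyncio.Queue()  # plays the role of the broker in this sketch
    results = []

    # queue some tasks
    queue.put_nowait({"a": 4, "b": 37})
    queue.put_nowait({"a": 67, "b": 847})

    # run the worker
    await worker(queue, results)
    return results


print(asyncio.run(main()))
```

The producer only records the arguments; the addition itself happens later, when the worker picks the item up. That is the same separation the `synchronous_delay` and `consume_tasks` calls express in the wiji example.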
2. As a cli app
wiji also ships with a command-line app called wiji-cli.
Create a wiji app file (which is just any Python file that has an
instance of wiji.app.App), e.g.:
examples/my_app.py
import wiji

class AdderTask(wiji.task.Task):
    the_broker = wiji.broker.InMemoryBroker()
    queue_name = "AdderTaskQueue1"

    async def run(self, a, b):
        res = a + b
        print()
        print("res:: ", res)
        print()
        return res

MyAppInstance = wiji.app.App(task_classes=[AdderTask])
NB: the directory where you place that file (in this case,
examples/) ought to be in your PYTHONPATH.
Then run wiji-cli, pointing it to the dotted path of the
wiji.app.App instance:
wiji-cli --app examples.my_app.MyAppInstance
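For readers unfamiliar with dotted paths, the sketch below shows one common way such a path can be resolved to a Python object, and why the containing directory has to be importable. This is an assumption about the general technique, not a description of what wiji-cli does internally; `load_dotted_path` is a hypothetical helper.

```python
import importlib


def load_dotted_path(dotted_path):
    """Import `package.module.attribute` and return the attribute (hypothetical helper)."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError("expected a path like 'package.module.attribute'")
    # import_module searches sys.path, which includes the PYTHONPATH entries
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# demonstrated with a standard library object so the sketch runs without wiji installed
ordered_dict_class = load_dotted_path("collections.OrderedDict")
print(ordered_dict_class.__name__)
```

With a path such as examples.my_app.MyAppInstance, the module part would be examples.my_app and the attribute would be MyAppInstance, so the import fails unless the module can be found on the import path.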
Writing tests
Let's say you have wiji tasks in your project and you want to write integration or unit tests for them and for the code that uses them.
# my_tasks.py
import wiji
# a custom broker using redis; `my_redis_broker` is a hypothetical module name
from my_redis_broker import MyRedisBroker

DATABASE = {}

class AdderTask(wiji.task.Task):
    the_broker = MyRedisBroker()
    queue_name = "AdderTask"

    async def run(self, a, b):
        """
        adds two numbers and stores the result in a database
        """
        result = a + b
        DATABASE["result"] = result
        return result

class ExampleView:
    def post(self, request):
        a = request["a"]
        b = request["b"]
        AdderTask().synchronous_delay(a=a, b=b)
In the example above we have a view with one post method. When
that method is called it queues a task that adds two numbers and then
stores the result of that addition in a database.
That task uses a broker (MyRedisBroker) that is backed by Redis.
One way to write your tests would be:
# test_tasks.py
from my_tasks import ExampleView
from unittest import TestCase

class TestExampleView(TestCase):
    def test_view(self):
        view = ExampleView()
        view.post(request={"a": 45, "b": 46})
        # do your asserts here
The problem with the above approach is that it requires you to
have an instance of Redis running for that test to run successfully.
This may not be what you want. Ideally, you do not want your tests to be
dependent on external services.
wiji ships with an in-memory broker that you can use in your
tests.
So the test above can be rewritten in this manner:
# test_tasks.py
import asyncio
import wiji
from my_tasks import ExampleView, AdderTask, DATABASE
from unittest import TestCase, mock

class TestExampleView(TestCase):
    @staticmethod
    def _run(coro):
        """
        helper function that runs any coroutine in an event loop.
        see:: https://blog.miguelgrinberg.com/post/unit-testing-asyncio-code
        """
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(coro)

    def test_view(self):
        with mock.patch.object(
            # ie, substitute the redis broker with an in-memory one during test runs
            AdderTask, "the_broker", wiji.broker.InMemoryBroker()
        ) as mock_broker:
            view = ExampleView()
            view.post(request={"a": 45, "b": 46})
            # do your asserts here

    def test_whole_flow(self):
        with mock.patch.object(
            AdderTask, "the_broker", wiji.broker.InMemoryBroker()
        ) as mock_broker:
            # 1. assert that the database is initially empty
            self.assertDictEqual(DATABASE, {})

            # 2. when `post` is called it will queue a task
            view = ExampleView()
            view.post(request={"a": 45, "b": 46})

            # 3. we need to run workers
            worker = wiji.Worker(the_task=AdderTask())
            self._run(worker.consume_tasks(TESTING=True))

            # 4. assert that the database has been updated successfully.
            self.assertDictEqual(DATABASE, {"result": 91})
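The broker substitution used in these tests relies on `unittest.mock.patch.object` replacing a class attribute for the duration of a `with` block. The sketch below isolates that technique so it can be run without wiji or Redis. All class names in it (`ExternalBroker`, `ListBroker`, `Task`) are hypothetical stand-ins and do not mirror wiji's broker interface.

```python
from unittest import mock


class ExternalBroker:
    # stand-in for a broker that needs a running external service
    def enqueue(self, item):
        raise ConnectionError("external service is not available")


class ListBroker:
    # stand-in for an in-memory broker: it just keeps items in a list
    def __init__(self):
        self.items = []

    def enqueue(self, item):
        self.items.append(item)


class Task:
    the_broker = ExternalBroker()

    def queue(self, **kwargs):
        self.the_broker.enqueue(kwargs)


def queue_with_patched_broker():
    # inside the block, Task.the_broker is the ListBroker passed as the replacement
    with mock.patch.object(Task, "the_broker", ListBroker()) as fake_broker:
        Task().queue(a=45, b=46)
        return fake_broker.items


print(queue_with_patched_broker())
# once the block exits, the original class attribute is restored
print(type(Task.the_broker).__name__)
```

Because the patch is undone when the `with` block exits, each test starts from the real class definition and cannot leak a fake broker into other tests.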