WorkForce Async
Asyncio Wrapper
Install in your project
$ pip install workforce-async
https://pypi.org/project/workforce-async/
Select your Python version
$ poetry env use 3.7
Install Dependencies
$ poetry install
Run example
$ poetry run python examples/http_server.py
Run tests
$ poetry run pytest
Interfaces
Just run async functions
import asyncio

workforce = WorkForce()

async def foo():
    await asyncio.sleep(0.8)
    bar.count += 1

f1 = workforce.schedule(foo)
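For readers curious how a coroutine can be scheduled from synchronous code, here is a minimal standard-library sketch. It is not the WorkForce implementation: `BackgroundLoop`, its `schedule` and `stop` methods, and the `counter` dict are hypothetical names assumed only for illustration.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper (not part of workforce-async): an event loop in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def schedule(self, coro_func, *args):
        # Hand the coroutine to the loop thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro_func(*args), self.loop)

    def stop(self):
        # Ask the loop to stop from outside its thread, then clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


counter = {"count": 0}


async def foo():
    await asyncio.sleep(0.05)
    counter["count"] += 1
    return counter["count"]


runner = BackgroundLoop()
future = runner.schedule(foo)
result = future.result(timeout=2)  # block until the coroutine has finished
runner.stop()
```

The caller gets a future back immediately and only blocks when it asks for the result, which is one plausible reading of what `schedule` returns above.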
Just run normal functions in another thread
def foo():
    bar.count += 1

f = workforce.schedule(foo)
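Running a normal function in another thread can be sketched with asyncio's own executor support. This is an assumption about the general technique, not a description of what WorkForce does internally; `blocking_work` is a hypothetical function.

```python
import asyncio
import threading


def blocking_work():
    # Report which thread actually ran this function.
    return threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, blocking_work)


caller_thread = threading.current_thread().name
worker_thread = asyncio.run(main())
```

Because the function runs in a pool thread, a blocking call inside it does not stall the event loop.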
Function-based tasks
.s() supports both normal and async functions
workforce = WorkForce()

def callback(wf, task):
    bar.result = task.result()

@workforce.task(callback=callback)
async def add(a, b):
    return a + b

task = add.s(4, 5)()

@workforce.task()
async def sleep(sec):
    await asyncio.sleep(sec)

workforce.queue('channel1')
queue = sleep.q(0.5)('channel1')
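The `callback(wf, task)` pattern above resembles a done-callback that reads `task.result()`. A minimal sketch with plain asyncio follows; the string `"wf"` stands in for the workforce object, and the `results` list is a hypothetical holder. How WorkForce itself wires callbacks is not shown in this README.

```python
import asyncio
import functools

results = []


def callback(wf, task):
    # Same shape as the callback above: read the finished task's result.
    results.append((wf, task.result()))


async def add(a, b):
    return a + b


async def main():
    task = asyncio.ensure_future(add(4, 5))
    # add_done_callback passes only the task, so bind the first argument up front.
    task.add_done_callback(functools.partial(callback, "wf"))
    value = await task
    # Done-callbacks are scheduled on the loop; yield once so they have run.
    await asyncio.sleep(0)
    return value


value = asyncio.run(main())
```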
Create queues of tasks
workforce = WorkForce()
queue = workforce.queue('channel1')
queue.put(foo())
queue.put(foo())
queue.put(foo())
assert len(queue) == 3
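Note that `foo()` creates a coroutine object without starting it, so a queue can hold pending work. The sketch below shows that idea with `asyncio.Queue`; it is an illustration under that assumption, not the library's queue class, and `job` and `order` are hypothetical names.

```python
import asyncio

order = []


async def job(name):
    await asyncio.sleep(0.01)
    order.append(name)


async def main():
    queue = asyncio.Queue()
    for name in ("a", "b", "c"):
        # job(name) builds a coroutine object; nothing runs yet.
        queue.put_nowait(job(name))
    size_before = queue.qsize()
    while not queue.empty():
        # Await each queued coroutine in FIFO order.
        await queue.get_nowait()
    return size_before


size_before = asyncio.run(main())
```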
Class-based framework
Make your own workforce that distributes workitems to Workers
class Company(WorkForce):
    def get_worker(self, workitem):
        try:
            worker_name = {
                'NewFeature': 'developer',
                'Hire': 'hr',
                'EmployeeCounseling': 'hr'
            }[type(workitem).__name__]

            return super().get_worker(worker_name)
        except KeyError:
            raise self.WorkerNotFound

company = Company()
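The routing in `get_worker` is a dictionary lookup keyed on the workitem's class name, with `KeyError` translated into a domain error. The standalone sketch below isolates that step; `ROUTES`, `route`, the empty workitem classes, and this local `WorkerNotFound` are hypothetical stand-ins, not the library's own definitions.

```python
class WorkerNotFound(Exception):
    """Hypothetical stand-in for the error raised when no worker matches."""


class NewFeature:
    pass


class Hire:
    pass


class Unknown:
    pass


ROUTES = {
    "NewFeature": "developer",
    "Hire": "hr",
    "EmployeeCounseling": "hr",
}


def route(workitem):
    # Look up the worker name by the workitem's class name.
    try:
        return ROUTES[type(workitem).__name__]
    except KeyError:
        raise WorkerNotFound(type(workitem).__name__) from None


developer = route(NewFeature())
hr = route(Hire())
```

Keeping the table as data makes it easy to add a new workitem type without touching the lookup logic.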
Make your own workers that perform tasks based on the workitem they receive
@company.worker
class Developer(Worker):
    def handle_workitem(self, workitem, *args, **kwargs):
        callback = getattr(workitem, 'callback', None)

        # All tasks here run concurrently
        coros = (getattr(self, task_name)(workitem)
                 for task_name in workitem.tasks)

        # Wrap asyncio.gather in a coroutine, because it returns a future
        # rather than a coroutine
        async def gather(*aws, **kwargs):
            return await asyncio.gather(*aws, **kwargs)

        return gather(*coros), callback

    async def design(self, workitem):
        await asyncio.sleep(3)
        bar.arr.append('design')

    async def code(self, workitem):
        await asyncio.sleep(2)
        bar.arr.append('code')

    async def test(self, workitem):
        await asyncio.sleep(1)
        bar.arr.append('test')

    def make_pr(self, task, wf):
        time.sleep(0.2)
        bar.arr.append('make_pr')

company.schedule_workflow(NewFeature('New trendy ML'))
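The `gather` wrapper exists because `asyncio.gather` returns a future-like awaitable rather than a coroutine object, while an `async def` wrapper does produce a coroutine. The sketch below shows the wrapper on its own, with the sleep times scaled down by ten; `step` and `finished` are hypothetical names used only here.

```python
import asyncio

finished = []


async def step(name, delay):
    await asyncio.sleep(delay)
    finished.append(name)


async def gather(*aws, **kwargs):
    # An async def wrapper yields a real coroutine object when called.
    return await asyncio.gather(*aws, **kwargs)


wrapped = gather(step("design", 0.3), step("code", 0.2), step("test", 0.1))
is_coroutine = asyncio.iscoroutine(wrapped)

# The three steps run concurrently, so the shortest sleep finishes first.
asyncio.run(wrapped)
```

With the delays used in `Developer` (3, 2, and 1 seconds), the same reasoning suggests `test` finishes first and `design` last, with a total wait close to the longest single delay.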