WorkForce Async
Asyncio Wrapper
Install in your project
$ pip install workforce-async
https://pypi.org/project/workforce-async/
Install Dependencies
$ poetry install
Run example
$ poetry run python examples/http_server.py
Run tests
$ poetry run pytest
Interfaces
Just run async functions
workforce = WorkForce()

async def foo():
    await asyncio.sleep(0.8)
    bar.count += 1

f1 = workforce.schedule(foo)
Just run normal functions in another thread
def foo():
    bar.count += 1

f = workforce.schedule(foo)
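The two cases above (a coroutine function and a plain function) can be pictured with the standard library alone. The sketch below is not WorkForce's implementation; the helper name `schedule` and its behaviour are assumptions made for illustration. It awaits coroutine functions on the event loop and hands plain functions to the default thread pool so they do not block the loop.

```python
import asyncio
import inspect
import time


async def schedule(func, *args):
    """Hypothetical helper: run `func` without blocking the event loop."""
    if inspect.iscoroutinefunction(func):
        # Coroutine functions run directly on the event loop.
        return await func(*args)
    # Plain functions are handed to the default thread pool executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def async_job(x):
    await asyncio.sleep(0.01)
    return x + 1


def blocking_job(x):
    time.sleep(0.01)  # would stall the loop if called on it directly
    return x * 2


async def main():
    # Both kinds of callables are awaited the same way.
    return await asyncio.gather(schedule(async_job, 1), schedule(blocking_job, 2))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Running the blocking function in an executor is what lets the two jobs overlap instead of running back to back.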
Function-based tasks
.s() supports both normal and async functions
workforce = WorkForce()

def callback(wf, task):
    bar.result = task.result()

@workforce.task(callback=callback)
async def add(a, b):
    return a + b

task = add.s(4, 5)()

@workforce.task()
async def sleep(sec):
    await asyncio.sleep(sec)

workforce.queue('channel1')
queue = sleep.q(0.5)('channel1')
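The `callback(wf, task)` shape used above resembles a done-callback on an asyncio task with one extra argument bound in front. The following is a minimal sketch of that pattern using only the standard library; `run_with_callback` and the `owner` argument are hypothetical names, and nothing here is taken from WorkForce's source.

```python
import asyncio
import functools


def run_with_callback(coro, callback, owner):
    """Hypothetical helper: schedule `coro`, then call callback(owner, task)."""
    task = asyncio.create_task(coro)
    # add_done_callback passes only the finished task, so the owner
    # is bound up front with functools.partial.
    task.add_done_callback(functools.partial(callback, owner))
    return task


async def add(a, b):
    return a + b


async def main():
    results = []

    def callback(owner, task):
        # task.result() returns the coroutine's return value.
        results.append((owner, task.result()))

    task = run_with_callback(add(4, 5), callback, "wf")
    await task
    # Yield once more so any pending done-callbacks have run.
    await asyncio.sleep(0)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```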
Create queues of tasks
workforce = WorkForce()
queue = workforce.queue('channel1')

queue.put(foo())
queue.put(foo())
queue.put(foo())

assert len(queue) == 3
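Calling `foo()` only creates a coroutine object, which is why three of them can sit in a queue before anything runs. The sketch below shows that idea with `asyncio.Queue`. It assumes items are awaited one at a time in FIFO order, which the text above does not state about WorkForce queues; `job` and `drain` are hypothetical names.

```python
import asyncio


async def job(name, log):
    await asyncio.sleep(0.01)
    log.append(name)


async def drain(queue):
    """Await queued coroutine objects one at a time, in FIFO order."""
    while not queue.empty():
        coro = queue.get_nowait()
        await coro
        queue.task_done()


async def main():
    log = []
    queue = asyncio.Queue()
    for name in ("a", "b", "c"):
        # job(...) builds a coroutine object; nothing executes yet.
        queue.put_nowait(job(name, log))
    size_before = queue.qsize()
    await drain(queue)
    return size_before, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```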
Class-based framework
Make your own workforce that distributes workitems to Workers
class Company(WorkForce):
    def get_worker(self, workitem):
        try:
            worker_name = {
                'NewFeature': 'developer',
                'Hire': 'hr',
                'EmployeeCounseling': 'hr'
            }[type(workitem).__name__]
            return super().get_worker(worker_name)
        except KeyError:
            raise self.WorkerNotFound

company = Company()
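The routing in `get_worker` is a dictionary lookup keyed on the workitem's class name. A standalone sketch of that lookup, including the failure path, might look like the following. The `route` function, the `ROUTES` table, the placeholder workitem classes, and this module-level `WorkerNotFound` are all hypothetical stand-ins, not part of the library.

```python
class WorkerNotFound(Exception):
    """Hypothetical exception for a workitem type with no registered worker."""


class NewFeature:
    """Placeholder workitem type."""


class Hire:
    """Placeholder workitem type."""


class Unknown:
    """Placeholder workitem type with no route."""


# Maps a workitem class name to the name of the worker that handles it.
ROUTES = {
    "NewFeature": "developer",
    "Hire": "hr",
    "EmployeeCounseling": "hr",
}


def route(workitem):
    """Return the worker name for `workitem`, or raise WorkerNotFound."""
    try:
        return ROUTES[type(workitem).__name__]
    except KeyError:
        # Suppress the KeyError context so callers see only the domain error.
        raise WorkerNotFound(type(workitem).__name__) from None


if __name__ == "__main__":
    print(route(NewFeature()))
    print(route(Hire()))
```

Keeping only the dictionary lookup inside the `try` block avoids masking an unrelated `KeyError` raised further down the call chain.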
Make your own workers that perform tasks based on the workitem they receive
@company.worker
class Developer(Worker):
    def handle_workitem(self, workitem, *args, **kwargs):
        callback = getattr(workitem, 'callback', None)

        # All tasks here run concurrently
        coros = (getattr(self, task_name)(workitem)
                 for task_name in workitem.tasks)

        # Hack because asyncio.gather is not recognised as a coroutine
        async def gather(*aws, **kwargs):
            return await asyncio.gather(*aws, **kwargs)

        return gather(*coros), callback

    async def design(self, workitem):
        await asyncio.sleep(3)
        bar.arr.append('design')

    async def code(self, workitem):
        await asyncio.sleep(2)
        bar.arr.append('code')

    async def test(self, workitem):
        await asyncio.sleep(1)
        bar.arr.append('test')

    def make_pr(self, task, wf):
        time.sleep(0.2)
        bar.arr.append('make_pr')

company.schedule_workflow(NewFeature('New trendy ML'))
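The `gather` wrapper in `handle_workitem` exists because `asyncio.gather` returns a Future rather than a coroutine object. The sketch below demonstrates that difference with the standard library only and shows that the wrapped steps run concurrently, so the shortest sleep finishes first. The `step` helper and the shortened delays are assumptions chosen to keep the example quick.

```python
import asyncio


async def step(name, delay, log):
    await asyncio.sleep(delay)
    log.append(name)


async def gather(*aws, **kwargs):
    # Same wrapper shape as above: a real coroutine around asyncio.gather.
    return await asyncio.gather(*aws, **kwargs)


async def main():
    log = []
    coros = [
        step("design", 0.09, log),
        step("code", 0.06, log),
        step("test", 0.03, log),
    ]
    wrapped = gather(*coros)
    # Calling an async def function yields a coroutine object.
    wrapped_is_coro = asyncio.iscoroutine(wrapped)
    await wrapped

    # asyncio.gather itself hands back a Future, not a coroutine object.
    fut = asyncio.gather(asyncio.sleep(0))
    bare_is_coro = asyncio.iscoroutine(fut)
    await fut
    return wrapped_is_coro, bare_is_coro, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```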