Asyncio Wrapper
Project description
WorkForce
Asyncio Wrapper
Install Dependencies
$ poetry install
Run example
$ poetry run python examples/http_server.py
Run tests
$ poetry run pytest
Interfaces
Just run async functions
workforce = WorkForce()
async def foo():
await asyncio.sleep(0.8)
bar.count += 1
f1 = workforce.schedule(foo)
Just run normal functions in another thread
def foo():
bar.count += 1
f = workforce.schedule(foo)
Function-based tasks
.s()
supports both normal and async functions
workforce = WorkForce()
def callback(wf, task):
bar.result = task.result()
@workforce.task(callback=callback)
async def add(a, b):
return a + b
task = add.s(4, 5)()
Create queues of tasks
workforce = WorkForce()
queue = workforce.queue('channel1')
queue.put(foo())
queue.put(foo())
queue.put(foo())
assert len(queue) == 3
Class-based framework
Make your own workforce that distributes workitems to Workers
class Company(WorkForce):
def get_worker(self, workitem):
"""
You could make this conditional-less by attaching a worker name
to a task or the worker itself
"""
if isinstance(workitem, NewFeature):
return self.workers['Developer']
elif (isinstance(workitem, Hire)
or isinstance(workitem, EmployeeCounseling)):
return self.workers['HR']
else:
raise self.WorkerNotFound
company = Company()
Make your own workers that perform tasks based on the workitem they receive
@company.worker
class Developer(Worker):
def start_workflow(self, workitem, *eargs, **ekwargs):
callback = getattr(self, workitem.callback, None)
# All tasks here run concurrent
coros = [self.run_coro_async(
getattr(self, task_name)(workitem),
*eargs, timeout=3.2, **ekwargs
) for task_name in workitem.tasks]
return self.run_coro_async(
asyncio.gather(*coros, loop=ekwargs['loop']),
*eargs, callback=callback, timeout=3.2, **ekwargs
)
async def design(self, workitem):
await asyncio.sleep(3)
bar.arr.append('design')
async def code(self, workitem):
await asyncio.sleep(2)
bar.arr.append('code')
async def test(self, workitem):
await asyncio.sleep(1)
bar.arr.append('test')
def make_pr(self, task, wf):
time.sleep(0.2)
bar.arr.append('make_pr')
company.schedule_workflow(NewFeature('New trendy ML'))
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
workforce-async-0.9.2.tar.gz
(4.5 kB
view hashes)
Built Distribution
Close
Hashes for workforce_async-0.9.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | b595d14fdfe0c74d199fa086ce974296b4925d2379ff6c2a567e86bd6f1865b6 |
|
MD5 | b89e3a422e7a3066b76c252cb80a4006 |
|
BLAKE2b-256 | cddd538e21a0c54ce3b1df925aa7cbcfbfc3ec836d4c5b5f148b9b191d4894fb |