Skip to main content

Asyncio Wrapper

Project description

WorkForce Async

Asyncio Wrapper

Install in your project

$ pip install workforce-async

https://pypi.org/project/workforce-async/

Select your python version

$ poetry env use 3.7

Install Dependencies

$ poetry install

Run example

$ poetry run python examples/http_server.py

Run tests

$ poetry run pytest

Interfaces

Snippets taken from tests

Just run async functions

workforce = WorkForce()

async def foo():
    await asyncio.sleep(0.8)
    bar.count += 1

f1 = workforce.schedule(foo)

Just run normal functions in another thread

def foo():
    bar.count += 1

f = workforce.schedule(foo)

Function-based tasks

.s() supports both normal and async functions

workforce = WorkForce()

def callback(wf, task):
    bar.result = task.result()

@workforce.task(callback=callback)
async def add(a, b):
    return a + b

task = add.s(4, 5)()

@workforce.task()
async def sleep(sec):
    await asyncio.sleep(sec)

workforce.queue('channel1')
queue = sleep.q(0.5)('channel1')

Create queues of tasks

workforce = WorkForce()
queue = workforce.queue('channel1')
queue.put(foo())
queue.put(foo())
queue.put(foo())
assert len(queue) == 3

Class-based framework

Make your own workforce that distributes workitems to Workers

class Company(WorkForce):
    def get_worker(self, workitem):
        try:
            worker_name = {
                'NewFeature': 'developer',
                'Hire': 'hr',
                'EmployeeCounseling': 'hr'
            }[type(workitem).__name__]

            return super().get_worker(worker_name)
        except KeyError:
            raise self.WorkerNotFound

company = Company()

Make your own workers that perform tasks based on the workitem they receive

@company.worker
class Developer(Worker):
    def handle_workitem(self, workitem, *args, **kwargs):
        callback = getattr(workitem, 'callback', None)

        # All tasks here run concurrent
        coros = (getattr(self, task_name)(workitem)
                 for task_name in workitem.tasks)

        # Hack because asyncio.gather is not recognised as a coroutine
        async def gather(*aws, **kwargs):
            return await asyncio.gather(*aws, **kwargs)

        return gather(*coros), callback

    async def design(self, workitem):
        await asyncio.sleep(3)
        bar.arr.append('design')

    async def code(self, workitem):
        await asyncio.sleep(2)
        bar.arr.append('code')

    async def test(self, workitem):
        await asyncio.sleep(1)
        bar.arr.append('test')

    def make_pr(self, task, wf):
        time.sleep(0.2)
        bar.arr.append('make_pr')

company.schedule_workflow(NewFeature('New trendy ML'))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

workforce-async-0.12.0.tar.gz (6.6 kB view hashes)

Uploaded Source

Built Distribution

workforce_async-0.12.0-py3-none-any.whl (6.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page