Skip to main content

Asyncio Wrapper

Project description

WorkForce Async

Asyncio Wrapper

Install in your project

$ pip install workforce-async

https://pypi.org/project/workforce-async/

Select your python version

$ poetry env use 3.7

Install Dependencies

$ poetry install

Run example

$ poetry run python examples/http_server.py

Run tests

$ poetry run pytest

Interfaces

Snippets taken from tests

Just run async functions

workforce = WorkForce()

async def foo():
    await asyncio.sleep(0.8)
    bar.count += 1

f1 = workforce.schedule(foo)

Just run normal functions in another thread

def foo():
    bar.count += 1

f = workforce.schedule(foo)

Function-based tasks

.s() supports both normal and async functions

workforce = WorkForce()

def callback(wf, task):
    bar.result = task.result()

@workforce.task(callback=callback)
async def add(a, b):
    return a + b

task = add.s(4, 5)()

@workforce.task()
async def sleep(sec):
    await asyncio.sleep(sec)

workforce.queue('channel1')
queue = sleep.q(0.5)('channel1')

Create queues of tasks

workforce = WorkForce()
queue = workforce.queue('channel1')
queue.put(foo())
queue.put(foo())
queue.put(foo())
assert len(queue) == 3

Class-based framework

Make your own workforce that distributes workitems to Workers

class Company(WorkForce):
    def get_worker(self, workitem):
        try:
            worker_name = {
                'NewFeature': 'developer',
                'Hire': 'hr',
                'EmployeeCounseling': 'hr'
            }[type(workitem).__name__]

            return super().get_worker(worker_name)
        except KeyError:
            raise self.WorkerNotFound

company = Company()

Make your own workers that perform tasks based on the workitem they receive

@company.worker
class Developer(Worker):
    def handle_workitem(self, workitem, *args, **kwargs):
        callback = getattr(workitem, 'callback', None)

        # All tasks here run concurrent
        coros = (getattr(self, task_name)(workitem)
                 for task_name in workitem.tasks)

        # Hack because asyncio.gather is not recognised as a coroutine
        async def gather(*aws, **kwargs):
            return await asyncio.gather(*aws, **kwargs)

        return gather(*coros), callback

    async def design(self, workitem):
        await asyncio.sleep(3)
        bar.arr.append('design')

    async def code(self, workitem):
        await asyncio.sleep(2)
        bar.arr.append('code')

    async def test(self, workitem):
        await asyncio.sleep(1)
        bar.arr.append('test')

    def make_pr(self, task, wf):
        time.sleep(0.2)
        bar.arr.append('make_pr')

company.schedule_workflow(NewFeature('New trendy ML'))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

workforce-async-0.12.0.tar.gz (6.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

workforce_async-0.12.0-py3-none-any.whl (6.8 kB view details)

Uploaded Python 3

File details

Details for the file workforce-async-0.12.0.tar.gz.

File metadata

  • Download URL: workforce-async-0.12.0.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.13 CPython/3.10.2 Linux/5.15.19_1

File hashes

Hashes for workforce-async-0.12.0.tar.gz
Algorithm Hash digest
SHA256 cc22d06b8bb309fad1bd2fa5e486b6d9a124f7c2a4cd3264f7bff932fab2920c
MD5 01fac3080262331b5a60d204a707a1d6
BLAKE2b-256 cf6a2b1108042d3f8ec157f98cc5152783ab272eb67893277ba153ce0ff5ab00

See more details on using hashes here.

File details

Details for the file workforce_async-0.12.0-py3-none-any.whl.

File metadata

  • Download URL: workforce_async-0.12.0-py3-none-any.whl
  • Upload date:
  • Size: 6.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.13 CPython/3.10.2 Linux/5.15.19_1

File hashes

Hashes for workforce_async-0.12.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b17cc68505c76c7781117fc342ebd5840239caa6bd8eb9ee421a77a7839aaf63
MD5 e318d8c4428a3b3c15bff0ebad951390
BLAKE2b-256 db62752bc62249b2184e27d5497b050c07aac291e444383eca2c37cf9f93a288

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page