Skip to main content

A concurrent streaming package

Project description

A concurrent streaming package

  • Dataflow based functional syntax.
  • Implicitly parallelism for both async and non-async functions.
  • Composable for both flows and tasks.
  • Extensible with middlewares.

Installation

pip install aiosaber

Example

  • check tests for more examples.
from aiosaber import *

@task
def add(self, num):
    for i in range(100000):
        num += 1
    return num

@task
async def multiply(num1, num2):
    return num1 * num2

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply | view

num_ch = Channel.values(*list(range(100)))
f = my_flow(num_ch)
asyncio.run(f.start())

Middleware example

from aiosaber import *

class NameBuilder(BaseBuilder):
    def __call__(self, com, *args, **kwargs):
        super().__call__(com, *args, **kwargs)
        com.context['name'] = type(com).__name__ + str(id(com))

class ClientProvider(BaseExecutor):
    async def __call__(self, com, **kwargs):
        if not context.context.get('client'):
            context.context['client'] = 'client'
        return await super().__call__(com, **kwargs)

class Filter(BaseHandler):
    async def __call__(self, com, get, put, **kwargs):
        async def filter_put(data):
            if data is END or data > 3:
                await put(data)

        return await super().__call__(com, get, filter_put, **kwargs)

@task
async def add(self, num):
    print(self.context['name'])
    print(context.context['client'])
    return num + 1

@flow
def myflow(num_ch):
    return num_ch | add | view

context.context.update({
    'builders': [NameBuilder],
    'executors': [ClientProvider],
    'handlers': [Filter]
})
f = myflow(Channel.values(1, 2, 3, 4, 5))
context.context.clear()
asyncio.run(f.start())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiosaber-0.0.1.1.tar.gz (25.1 kB view hashes)

Uploaded Source

Built Distribution

aiosaber-0.0.1.1-py3-none-any.whl (28.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page