Skip to main content

A concurrent streaming package

Project description

A concurrent streaming package

  • Dataflow based functional syntax.
  • Implicitly parallelism for both async and non-async functions.
  • Composable for both flows and tasks.
  • Extensible with middlewares.

Installation

pip install aiosaber

Example

  • check tests for more examples.
from aiosaber import *

@task
def add(self, num):
    for i in range(100000):
        num += 1
    return num

@task
async def multiply(num1, num2):
    return num1 * num2

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply | view

num_ch = Channel.values(*list(range(100)))
f = my_flow(num_ch)
asyncio.run(f.start())

Middleware example

from aiosaber import *

class NameBuilder(BaseBuilder):
    def __call__(self, com, *args, **kwargs):
        super().__call__(com, *args, **kwargs)
        com.context['name'] = type(com).__name__ + str(id(com))

class ClientProvider(BaseExecutor):
    async def __call__(self, com, **kwargs):
        if not context.context.get('client'):
            context.context['client'] = 'client'
        return await super().__call__(com, **kwargs)

class Filter(BaseHandler):
    async def __call__(self, com, get, put, **kwargs):
        async def filter_put(data):
            if data is END or data > 3:
                await put(data)

        return await super().__call__(com, get, filter_put, **kwargs)

@task
async def add(self, num):
    print(self.context['name'])
    print(context.context['client'])
    return num + 1

@flow
def myflow(num_ch):
    return num_ch | add | view

context.context.update({
    'builders': [NameBuilder],
    'executors': [ClientProvider],
    'handlers': [Filter]
})
f = myflow(Channel.values(1, 2, 3, 4, 5))
context.context.clear()
asyncio.run(f.start())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiosaber-0.0.1.1.tar.gz (25.1 kB view details)

Uploaded Source

Built Distribution

aiosaber-0.0.1.1-py3-none-any.whl (28.1 kB view details)

Uploaded Python 3

File details

Details for the file aiosaber-0.0.1.1.tar.gz.

File metadata

  • Download URL: aiosaber-0.0.1.1.tar.gz
  • Upload date:
  • Size: 25.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.9.5

File hashes

Hashes for aiosaber-0.0.1.1.tar.gz
Algorithm Hash digest
SHA256 41a7e51c0ce35bb39bca0adb80a6439d665b60e02bc75f5a94326027b7dff613
MD5 eb7a4c3dbb633ec979318e821a413b4a
BLAKE2b-256 b8bbb7935e165d85cbf8de15cf5fdb9db7987ef1cd80e19303cbe686705ea0af

See more details on using hashes here.

File details

Details for the file aiosaber-0.0.1.1-py3-none-any.whl.

File metadata

  • Download URL: aiosaber-0.0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 28.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.9.5

File hashes

Hashes for aiosaber-0.0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 989e1eadc03c2df00e47302aa63046a8e6f19100f01a397bc12ac839af457d44
MD5 735c26f76c11f1a816ce4c34f605e173
BLAKE2b-256 91aa68c5ed34a97c50b52037a82f88b800ab9418649ef4ca67285853f9fc3d94

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page