A concurrent streaming package
- Dataflow-based functional syntax.
- Implicit parallelism for both async and non-async functions.
- Composable for both flows and tasks.
- Extensible with middlewares.
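To make the "implicit parallelism for both async and non-async functions" point concrete, here is a minimal sketch in plain asyncio. It is not aiosaber's implementation; `run_concurrently`, `slow_square`, and `slow_double` are hypothetical names used only for illustration. It assumes that blocking functions are handed to the default thread pool, while coroutine functions are scheduled directly on the event loop.

```python
import asyncio
import inspect
import time


def slow_square(x):
    # Blocking (non-async) work; time.sleep stands in for real computation or I/O.
    time.sleep(0.1)
    return x * x


async def slow_double(x):
    # Non-blocking (async) work.
    await asyncio.sleep(0.1)
    return x * 2


async def run_concurrently(func, items):
    """Apply func to every item at once, whether func is sync or async."""
    if inspect.iscoroutinefunction(func):
        jobs = [func(item) for item in items]
    else:
        # Sync functions go to worker threads so they do not block the event loop.
        loop = asyncio.get_running_loop()
        jobs = [loop.run_in_executor(None, func, item) for item in items]
    # gather preserves the order of the inputs in its result list.
    return await asyncio.gather(*jobs)


async def main():
    squares = await run_concurrently(slow_square, [1, 2, 3])
    doubles = await run_concurrently(slow_double, [1, 2, 3])
    return squares, doubles


result = asyncio.run(main())
print(result)  # ([1, 4, 9], [2, 4, 6])
```

The caller never chooses between threads and coroutines; the helper decides from the function's type, which is the idea the feature list describes.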
Installation
pip install aiosaber
Example
- Check the tests for more examples.
import asyncio

from aiosaber import *

@task
def add(self, num):
    # Non-async task: increments num 100000 times in a plain loop.
    for i in range(100000):
        num += 1
    return num

@task
async def multiply(num1, num2):
    return num1 * num2

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply | view

num_ch = Channel.values(*range(100))
f = my_flow(num_ch)
asyncio.run(f.start())
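The `|` syntax in the example above, including a list on the left-hand side, can be understood through Python's `__or__` and `__ror__` hooks. The following is a toy sketch of that mechanism only: `Stage` is a hypothetical class, it works on plain values rather than channels, and it says nothing about how aiosaber implements its operators.

```python
class Stage:
    """Hypothetical pipeline stage wrapping a plain function."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args):
        return self.func(*args)

    def __or__(self, other):
        # stage | stage: build a new stage that feeds this output into `other`.
        return Stage(lambda *args: other(self(*args)))

    def __ror__(self, left):
        # value | stage: called because int and list do not know how to
        # combine with a Stage. A list on the left is unpacked into arguments.
        if isinstance(left, list):
            return self(*left)
        return self(left)


add_one = Stage(lambda x: x + 1)
square = Stage(lambda x: x ** 2)
multiply = Stage(lambda a, b: a * b)

sub = add_one | square | add_one          # computes ((x + 1) ** 2) + 1
piped = 2 | sub                           # 10
joined = [2 | sub, 3 | sub] | multiply    # 10 * 17 = 170
print(piped, joined)
```

Composing stages with `|` returns another stage, which is why sub-pipelines can be reused inside larger ones.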
Middleware example
import asyncio

from aiosaber import *

class NameBuilder(BaseBuilder):
    def __call__(self, com, *args, **kwargs):
        super().__call__(com, *args, **kwargs)
        # Store a unique name in the component's own context.
        com.context['name'] = type(com).__name__ + str(id(com))

class ClientProvider(BaseExecutor):
    async def __call__(self, com, **kwargs):
        # Set a placeholder client in context.context if none exists yet.
        if not context.context.get('client'):
            context.context['client'] = 'client'
        return await super().__call__(com, **kwargs)

class Filter(BaseHandler):
    async def __call__(self, com, get, put, **kwargs):
        # Forward only END and values greater than 3.
        async def filter_put(data):
            if data is END or data > 3:
                await put(data)
        return await super().__call__(com, get, filter_put, **kwargs)

@task
async def add(self, num):
    print(self.context['name'])
    print(context.context['client'])
    return num + 1

@flow
def myflow(num_ch):
    return num_ch | add | view

context.context.update({
    'builders': [NameBuilder],
    'executors': [ClientProvider],
    'handlers': [Filter]
})
f = myflow(Channel.values(1, 2, 3, 4, 5))
context.context.clear()
asyncio.run(f.start())
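The `Filter` handler above works by wrapping the `put` callback so that only some items reach the next stage. The same pattern can be sketched with nothing but `asyncio.Queue`. Everything here is an assumption made for illustration: `END` is a local stand-in sentinel, and `filtering`, `producer`, and `collect` are hypothetical helpers, not aiosaber APIs.

```python
import asyncio

END = object()  # hypothetical end-of-stream sentinel for this sketch


def filtering(put, predicate):
    """Wrap an async `put` so only END and items passing `predicate` go through."""
    async def filtered_put(item):
        if item is END or predicate(item):
            await put(item)
    return filtered_put


async def producer(put, items):
    # Push every item, then signal the end of the stream.
    for item in items:
        await put(item)
    await put(END)


async def collect(queue):
    # Drain the queue until the END sentinel arrives.
    received = []
    while True:
        item = await queue.get()
        if item is END:
            return received
        received.append(item)


async def main():
    queue = asyncio.Queue()
    put = filtering(queue.put, lambda x: x > 3)
    _, received = await asyncio.gather(
        producer(put, [1, 2, 3, 4, 5]),
        collect(queue),
    )
    return received


received = asyncio.run(main())
print(received)  # [4, 5]
```

Letting the sentinel through unconditionally matters: if the filter dropped it, the consumer would wait forever.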
Download files
Source Distribution
- aiosaber-0.0.1.1.tar.gz (25.1 kB)
Built Distribution
- aiosaber-0.0.1.1-py3-none-any.whl (28.1 kB)
File details
Details for the file aiosaber-0.0.1.1.tar.gz.
File metadata
- Download URL: aiosaber-0.0.1.1.tar.gz
- Upload date:
- Size: 25.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.9.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 41a7e51c0ce35bb39bca0adb80a6439d665b60e02bc75f5a94326027b7dff613
MD5 | eb7a4c3dbb633ec979318e821a413b4a
BLAKE2b-256 | b8bbb7935e165d85cbf8de15cf5fdb9db7987ef1cd80e19303cbe686705ea0af
File details
Details for the file aiosaber-0.0.1.1-py3-none-any.whl.
File metadata
- Download URL: aiosaber-0.0.1.1-py3-none-any.whl
- Upload date:
- Size: 28.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.9.5
File hashes
Algorithm | Hash digest
---|---
SHA256 | 989e1eadc03c2df00e47302aa63046a8e6f19100f01a397bc12ac839af457d44
MD5 | 735c26f76c11f1a816ce4c34f605e173
BLAKE2b-256 | 91aa68c5ed34a97c50b52037a82f88b800ab9418649ef4ca67285853f9fc3d94
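The published digests can be used to check a downloaded file. A minimal sketch with the standard library's `hashlib` follows; `sha256_of` and `matches` are hypothetical helper names, and the local file path in the usage note is an assumption about where the download was saved.

```python
import hashlib

# SHA256 digest listed above for aiosaber-0.0.1.1.tar.gz.
EXPECTED_SDIST_SHA256 = (
    "41a7e51c0ce35bb39bca0adb80a6439d665b60e02bc75f5a94326027b7dff613"
)


def sha256_of(path, chunk_size=65536):
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches(path, expected_hex):
    """True if the file's SHA256 digest equals the expected hex string."""
    return sha256_of(path) == expected_hex.lower()
```

With the source distribution saved in the current directory, `matches("aiosaber-0.0.1.1.tar.gz", EXPECTED_SDIST_SHA256)` should return `True` for an intact download.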