A concurrent streaming package
Project description
A concurrent streaming package
- Dataflow based functional syntax.
- Implicitly parallelism for both async and non-async functions.
- Composable for both flows and tasks.
- Extensible with middlewares.
Installation
pip install aiosaber
Example
- check tests for more examples.
from aiosaber import *
@task
def add(self, num):
for i in range(100000):
num += 1
return num
@task
async def multiply(num1, num2):
return num1 * num2
@flow
def sub_flow(num):
return add(num) | map_(lambda x: x ** 2) | add
@flow
def my_flow(num):
[sub_flow(num), sub_flow(num)] | multiply | view
num_ch = Channel.values(*list(range(100)))
f = my_flow(num_ch)
asyncio.run(f.start())
Middleware example
from aiosaber import *
class NameBuilder(BaseBuilder):
def __call__(self, com, *args, **kwargs):
super().__call__(com, *args, **kwargs)
com.context['name'] = type(com).__name__ + str(id(com))
class ClientProvider(BaseExecutor):
async def __call__(self, com, **kwargs):
if not context.context.get('client'):
context.context['client'] = 'client'
return await super().__call__(com, **kwargs)
class Filter(BaseHandler):
async def __call__(self, com, get, put, **kwargs):
async def filter_put(data):
if data is END or data > 3:
await put(data)
return await super().__call__(com, get, filter_put, **kwargs)
@task
async def add(self, num):
print(self.context['name'])
print(context.context['client'])
return num + 1
@flow
def myflow(num_ch):
return num_ch | add | view
context.context.update({
'builders': [NameBuilder],
'executors': [ClientProvider],
'handlers': [Filter]
})
f = myflow(Channel.values(1, 2, 3, 4, 5))
context.context.clear()
asyncio.run(f.start())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiosaber-0.0.1.1.tar.gz
(25.1 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file aiosaber-0.0.1.1.tar.gz.
File metadata
- Download URL: aiosaber-0.0.1.1.tar.gz
- Upload date:
- Size: 25.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.9.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
41a7e51c0ce35bb39bca0adb80a6439d665b60e02bc75f5a94326027b7dff613
|
|
| MD5 |
eb7a4c3dbb633ec979318e821a413b4a
|
|
| BLAKE2b-256 |
b8bbb7935e165d85cbf8de15cf5fdb9db7987ef1cd80e19303cbe686705ea0af
|
File details
Details for the file aiosaber-0.0.1.1-py3-none-any.whl.
File metadata
- Download URL: aiosaber-0.0.1.1-py3-none-any.whl
- Upload date:
- Size: 28.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.61.0 CPython/3.9.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
989e1eadc03c2df00e47302aa63046a8e6f19100f01a397bc12ac839af457d44
|
|
| MD5 |
735c26f76c11f1a816ce4c34f605e173
|
|
| BLAKE2b-256 |
91aa68c5ed34a97c50b52037a82f88b800ab9418649ef4ca67285853f9fc3d94
|