
Asynchronous data flow

Project description

async-data-flow

The module lets you bundle coroutine functions and synchronous functions into a single package inside which the functions are executed sequentially (one by one). The DataFlow package itself is executed as a coroutine; synchronous functions inside the package are executed in separate threads. Functions inside the package must use only keyword arguments (technically, POSITIONAL_OR_KEYWORD arguments) and must return a dictionary, which is unpacked and passed to the next function in the package as its arguments. The module depends on the asyncio module.

import asyncio
from asyncdataflow import DataFlow

async def foo(a, b):
    return {'c': a, 'd': b}

async def bar(c, d):
    return {'e': c+d}

async def main():
    dataflow = DataFlow((foo, bar))
    result = await dataflow(a=1, b=1)

asyncio.run(main())

The DataFlow package is defined as the tuple (foo, bar) during DataFlow class instantiation. Initial arguments are passed to the package when the DataFlow class instance is called. The dictionary returned by the first function is unpacked and passed as arguments to the next function. The package returns the dictionary returned by the last function.
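The sequential hand-off described above can be sketched in plain asyncio, without the library (the names `run_flow` and the use of `run_in_executor` are illustrative assumptions, not the library's internals):

```python
import asyncio

async def run_flow(funcs, **kwargs):
    # Call each function in turn, unpacking the returned dict
    # into the next function's keyword arguments.
    for func in funcs:
        if asyncio.iscoroutinefunction(func):
            kwargs = await func(**kwargs)
        else:
            # synchronous functions run in a separate thread
            loop = asyncio.get_running_loop()
            kwargs = await loop.run_in_executor(None, lambda: func(**kwargs))
    return kwargs

async def foo(a, b):
    return {'c': a, 'd': b}

def bar(c, d):          # a plain synchronous step
    return {'e': c + d}

result = asyncio.run(run_flow((foo, bar), a=1, b=1))
print(result)           # {'e': 2}
```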

argument visibility

When defining a DataFlow object we can specify argument visibility (args_visibility: str):

  • 'None': initial arguments are visible only to the first function in the Data Flow; returned values are visible only to the next function in the Data Flow
  • 'Initial': initial arguments are visible to all functions in the Data Flow; returned values are visible only to the next function in the Data Flow
  • 'All': initial arguments and returned values are visible to all subsequent functions in the Data Flow

Example:

import asyncio
from asyncdataflow import DataFlow

async def foo(a):
    return {'c': a}

async def bar(b, c):
    return {'e': b+c}

async def main():
    dataflow = DataFlow((foo, bar), args_visibility = 'Initial')
    result = await dataflow(a=1, b=1)

asyncio.run(main())
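Under 'All' visibility, each function can draw on the initial arguments plus every result produced so far. A rough pure-Python simulation of that accumulation (a sketch of the semantics; the library's actual behavior may differ in details):

```python
import asyncio
import inspect

async def run_flow_all(funcs, **initial):
    # Accumulate initial arguments plus every returned value,
    # and pass each function only the parameters it declares.
    pool = dict(initial)
    result = {}
    for func in funcs:
        wanted = inspect.signature(func).parameters
        result = await func(**{k: v for k, v in pool.items() if k in wanted})
        pool.update(result)
    return result

async def foo(a):
    return {'c': a}

async def bar(a, b, c):      # sees initial a, b and foo's result c
    return {'e': a + b + c}

result = asyncio.run(run_flow_all((foo, bar), a=1, b=1))
print(result)                # {'e': 3}
```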

amapper

To use a function that does not return a dictionary in a DataFlow package, or to map keyword arguments to other keys, we can use the amapper decorator (since 'in' is a Python keyword, the examples below call the wrapped functions with ** unpacking):

from asyncdataflow import amapper

async def foo(a):
    return a
foo = amapper(foo, input={'a': 'in'}, output='out')
# foo(**{'in': x}) -> {'out': x}

async def bar(a):
    return a, a*2
bar = amapper(bar, input={'a': 'in'}, output=('out1', 'out2'))
# bar(**{'in': x}) -> {'out1': x, 'out2': x*2}

async def baz(a):
    return {'o1': a, 'o2': a*2}
baz = amapper(baz, input={'a': 'in'}, output={'o1': 'out1', 'o2': 'out2'})
# baz(**{'in': x}) -> {'out1': x, 'out2': x*2}
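The renaming amapper performs can be approximated by a small wrapper. This is a sketch under stated assumptions, not the library's implementation; `mapper` is a hypothetical name that only mirrors amapper's input/output keywords:

```python
import asyncio
import functools

def mapper(func, input=None, output=None):
    # Rename incoming keyword arguments via `input` and wrap the
    # return value into a dictionary shaped by `output`.
    reverse = {v: k for k, v in (input or {}).items()}

    @functools.wraps(func)
    async def wrapped(**kwargs):
        renamed = {reverse.get(k, k): v for k, v in kwargs.items()}
        result = await func(**renamed)
        if isinstance(output, str):
            return {output: result}          # single value -> one key
        if isinstance(output, tuple):
            return dict(zip(output, result)) # tuple of values -> keys
        if isinstance(output, dict):
            return {output[k]: v for k, v in result.items()}  # rename keys
        return result
    return wrapped

async def foo(a):
    return a

foo = mapper(foo, input={'a': 'in'}, output='out')
print(asyncio.run(foo(**{'in': 1})))   # {'out': 1}
```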

fdispatch

To dispatch functions in a DataFlow package we can use the fdispatch decorator:

import asyncio
from asyncdataflow import DataFlow, fdispatch

@fdispatch
def foo(key): pass

@foo.register('bar')
async def _(a): return {'b': a}

@foo.register('baz')
async def _(b): return {'c': b}

dataflow = DataFlow((foo('bar'),foo('baz')))
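The dispatch mechanism can be sketched as a small registry that maps keys to registered coroutine functions. The `Dispatcher` class below is an illustrative assumption, not the library's internals:

```python
import asyncio

class Dispatcher:
    # Maps a key to a registered coroutine function;
    # calling the dispatcher with a key returns that function.
    def __init__(self):
        self._registry = {}

    def register(self, key):
        def decorator(func):
            self._registry[key] = func
            return func
        return decorator

    def __call__(self, key):
        try:
            return self._registry[key]
        except KeyError:
            raise LookupError(f'no function registered for {key!r}')

foo = Dispatcher()

@foo.register('bar')
async def _(a):
    return {'b': a}

@foo.register('baz')
async def _(b):
    return {'c': b}

print(asyncio.run(foo('bar')(a=1)))   # {'b': 1}
```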

Concurrent execution inside DataFlow package

A DataFlow package is defined as a tuple inside which functions are executed sequentially (one by one). We can add a nested tuple, inside which functions will be executed concurrently:

import asyncio
from asyncdataflow import DataFlow

async def foo(a):
    return {'foo': a}

async def bar(a):
    return {'bar': a}

async def merge(foo, bar):
    return {'merged': foo+bar}

dataflow = DataFlow(((foo, bar), merge))

The foo and bar functions are executed concurrently, and the dictionaries they return are merged into one. When we add another nested tuple inside a concurrent one, its functions are executed sequentially again, and so on.
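The concurrent sub-tuple behaves roughly like asyncio.gather followed by a merge of the returned dictionaries. A plain-asyncio sketch of the semantics described above (`main` is an illustrative driver, not the library's code):

```python
import asyncio

async def foo(a):
    return {'foo': a}

async def bar(a):
    return {'bar': a}

async def merge(foo, bar):
    return {'merged': foo + bar}

async def main(a):
    # run the nested tuple concurrently and merge the results
    results = await asyncio.gather(foo(a), bar(a))
    merged = {}
    for r in results:
        merged.update(r)
    # pass the merged dictionary on to the next sequential step
    return await merge(**merged)

print(asyncio.run(main(1)))   # {'merged': 2}
```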

Error handling

DataFlow exception hierarchy:

+-- DataFlowException(Exception):
    +-- DataFlowError:
        +-- DataFlowRunItemError:
            +-- DataFlowMergeResultError:
            +-- DataFlowFunctionResultError:
        +-- DataFlowDefinitionError:
            +-- DataFlowFunctionArgsError:
            +-- DataFlowNotCallableError:
            +-- DataFlowNotTupleError:
            +-- DataFlowEmptyError:
        +-- ArgsMapperError:
            +-- ArgsMapperInputKeyError:
            +-- ArgsMapperOutputKeyError:
            +-- ArgsMapperArgsError:
    +-- DispatchError:

Description:

  • DataFlowMergeResultError: raised when the returned dictionaries cannot be merged
  • DataFlowFunctionResultError: raised when a function returns a value other than a dictionary
  • DataFlowFunctionArgsError: raised when a function has arguments other than POSITIONAL_OR_KEYWORD arguments
  • DataFlowNotCallableError: raised when the DataFlow contains objects that are not callable
  • DataFlowNotTupleError: raised when the DataFlow is not defined as a tuple
  • DataFlowEmptyError: raised when the DataFlow or a sub-DataFlow is empty
  • ArgsMapperInputKeyError: raised when the mapping defined in the input argument does not correspond to the original function arguments
  • ArgsMapperOutputKeyError: raised when the mapping defined in the output argument does not correspond to the dictionary returned from the function
  • ArgsMapperArgsError: raised when the arguments passed to the function do not correspond to the original arguments
  • DispatchError: raised when the dispatched function wasn't registered
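A key collision between concurrently produced dictionaries is one plausible way results can "fail to merge". A sketch of such a check, under that assumption (`merge_results` is a hypothetical helper; the library's actual merge rules may differ):

```python
def merge_results(dicts):
    # Merge result dictionaries, refusing to overwrite a key
    # already produced by another concurrently executed function.
    merged = {}
    for d in dicts:
        clash = merged.keys() & d.keys()
        if clash:
            raise ValueError(f'cannot merge results: duplicate keys {clash}')
        merged.update(d)
    return merged

print(merge_results([{'a': 1}, {'b': 2}]))   # {'a': 1, 'b': 2}
```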



Download files

Download the file for your platform.

Source Distribution

async-data-flow-0.0.1.tar.gz (10.7 kB)

Uploaded Source

Built Distribution

async_data_flow-0.0.1-py3-none-any.whl (13.1 kB)

Uploaded Python 3

File details

Details for the file async-data-flow-0.0.1.tar.gz.

File metadata

  • Download URL: async-data-flow-0.0.1.tar.gz
  • Upload date:
  • Size: 10.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.2

File hashes

Hashes for async-data-flow-0.0.1.tar.gz:

  • SHA256: 81a8b7db4e6ea1e59fa012585315a94be16d359f084190995af05351e18a1f36
  • MD5: 29b415d06fc22e0cacd3e19ec879fc5f
  • BLAKE2b-256: 4f4b245a0a1fb3027341a38448d78e7c349ee41606f5a722b3e4264b72467270


File details

Details for the file async_data_flow-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: async_data_flow-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 13.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.2

File hashes

Hashes for async_data_flow-0.0.1-py3-none-any.whl:

  • SHA256: 09fac3ffe2a9b30ac67bba2193c9de0905d2469d1c3ca96455a7efadf7cbb7c3
  • MD5: 221183dc16322aa0513779f409232687
  • BLAKE2b-256: a534032a3813e196d8c9df93ee0fe6b6d2f3a8cded752d286c1e7a9c92cd4312


Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page