Skip to main content

Async flow made easy and fun

Project description

PyPI Status Python Version License

Read the documentation at https://aflowey.readthedocs.io/ Tests Codecov

pre-commit Black

Features

  • Utilities to describe and execute flow with coroutine functions

  • Easily launch several flows simultaneously

  • Strong focus on readability

https://aflowey.readthedocs.io

Requirements

  • python 3.7 +

This library is easier to use with third party libraries for manipulating function such as fn (flip function, function composition…), and tenacity (retry library).

Installation

You can install Aflowey via pip from PyPI:

$ pip install aflowey

Usage

Chain function to execute an async flow !

from aflowey import aflow, CANCEL_FLOW, aexec, flog, partial

db = ... # get client db

# add some other library
from tenacity import retry

async def fetch_url(url):
    return await ...

def process_data(html):
    processed_data = ...  # process data
    if processed_data is None:
        return CANCEL_FLOW

    return processed_data

async def insert_into_db(content):
    return await db.insert_one(content)

def get_url_flow(url):
    # defining a flow for working with url
    return (
        aflow.from_args("http://goole.fr")
        >> retry(wait=2)(fetch_url)
        >> flog("url fetched", print_arg=True)
        >> process_data  # this one is synchronous but may included in the flow
        >> insert_into_db
    )

Execute the flow for one url:

result = await get_url_flow("http://google.com/...").run()

Execute several flows asynchronously:

from fn import flip

name = "Marco"

user_flow = (
    aflow.empty()
    >> partial(db.find_one, search={"username": name})
    >> User.from_dict
    # the impure indicate that this step does not return a new result
    # i.e the result of User.from_dict will be sended
    >> impure(partial(flip(setattr), datetime.now(), 'created_at'))
)

organization_id = "Not employed"

organization_flow = (
    aflow.empty()
    >> partial(db_find_one, search={"organization_id": organization_id})
    >> Organization.from_dict
)

urls = [
    "http://google.com/...",
    "http://google.com/...",
    "http://google.com/...",
    "http://google.com/...",
]

url_flows = [get_url_flow(url) for url in urls]

# execute all flow with asyncio gather method
executor = aexec().from_flows(url_flows) | user_flow | organization_flow
(url1, url2, url3, url4), user, organization = await executor.run()

It can be boring to create function that exactly matches arity of the flow. Aflowey provide some higher order functions to help, see:

  • lift: create a new method accepting transformed arguments

  • F0: from a 0 argument function, create one argument function to fit the arity of the flow

  • F1: create a new function with an extra parameter to process input of the flow step

  • spread: create a new function which spread an iterable of arguments into the given function

  • spread_kw: create a new function which spread kw arguments into the given function

The fn library provide other interesting functions like:

  • flip

  • first

If you have any other ideas…

Contributing

Contributions are very welcome. To learn more, see the Contributor Guide.

License

Distributed under the terms of the MIT license, Aflowey is free and open source software.

Issues

If you encounter any problems, please file an issue along with a detailed description.

Credits

This project was generated from @cjolowicz’s Hypermodern Python Cookiecutter template.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aflowey-0.1.10.tar.gz (13.9 kB view hashes)

Uploaded Source

Built Distribution

aflowey-0.1.10-py3-none-any.whl (13.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page