
Asynchronous pipeline builder

Project description

Coroflow: Easy and Fast Pipelines

Coroflow makes it easy to run pipelines with coroutines and also supports mixing in blocking functions and generators.

Coroflow does a lot of heavy-lifting for you:

  • Manage all tasks in the pipeline concurrently in one thread using coroutines
  • Pass data between tasks with queues
  • Easily specify concurrency limits
  • Connect stages of the pipeline with fan-out/fan-in patterns or load-balancer patterns
  • Define tasks as coroutines, normal (blocking) functions, async generators or normal generators; coroflow runs each one appropriately: in the event loop, in a thread pool, or optionally in a process pool
  • Provides an Apache Airflow-like API (e.g. set_downstream) for connecting tasks
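
The queue-based wiring and the two connection patterns listed above can be pictured with plain asyncio. The sketch below is not coroflow's implementation; produce, worker and SENTINEL are hypothetical names chosen for illustration. It contrasts a load-balancer stage (two workers share one bounded queue, so each item is processed once and the worker count acts as the concurrency limit) with a fan-out stage (each branch gets its own queue, so every branch sees every item).

```python
import asyncio

SENTINEL = object()  # hypothetical end-of-stream marker


async def produce(items, out_queues):
    """Root stage: put every item on every downstream queue, then a sentinel."""
    for item in items:
        for q in out_queues:
            await q.put(item)
    for q in out_queues:
        await q.put(SENTINEL)


async def worker(name, in_queue, results):
    """Process items from in_queue until the sentinel arrives."""
    while True:
        item = await in_queue.get()
        if item is SENTINEL:
            # Put the sentinel back so sibling workers on this queue also stop.
            await in_queue.put(SENTINEL)
            return
        await asyncio.sleep(0.01)  # simulated I/O
        results.append((name, item))


async def main():
    items = ["img_url_1", "img_url_2", "img_url_3"]

    # Load-balancer: two workers share ONE queue, so each item is handled once.
    # Two workers means at most two items are processed at the same time, and
    # maxsize=2 makes the producer wait when the queue is full (backpressure).
    shared = asyncio.Queue(maxsize=2)
    balanced = []

    # Fan-out: each branch has its OWN queue, so both branches see every item;
    # their results land in one list (a simple fan-in).
    branch_a, branch_b = asyncio.Queue(), asyncio.Queue()
    fanned = []

    await asyncio.gather(
        produce(items, [shared]),
        worker("lb1", shared, balanced),
        worker("lb2", shared, balanced),
        produce(items, [branch_a, branch_b]),
        worker("a", branch_a, fanned),
        worker("b", branch_b, fanned),
    )
    return balanced, fanned


if __name__ == "__main__":
    balanced, fanned = asyncio.run(main())
    print("load-balanced:", balanced)
    print("fanned out:", fanned)
```

In the load-balanced run the three URLs are split between lb1 and lb2; in the fan-out run both a and b receive all three, giving six results in total.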

Getting Started

The example below builds a three-stage pipeline: a root node that yields URLs from an async generator, feeding two chained processing nodes.

    from coroflow import Node, Pipeline
    import asyncio
    import time


    class GenNode(Node):
        async def execute():
            """
            The execute method of the first/root Node has to be a generator,
            either async or synchronous.
            """
            for url in ['img_url_1', 'img_url_2', 'img_url_3']:
                print(f"Yielding {url}")
                await asyncio.sleep(1)
                yield url
            print("Generator is exhausted")
            return


    class DoSomething(Node):
        async def execute(inpt, param=None):
            """
            The execute method of every non-root Node should be an async
            or synchronous function. It receives one item from upstream
            and returns the item to pass downstream.
            """
            # do your async pipelined work
            await asyncio.sleep(1)  # simulated IO delay
            outp = inpt
            print(f"DoSomething({param}) sending {inpt}")
            return outp


    # Build the pipeline and wire the stages: gen -> func1 -> func2
    p = Pipeline()
    t0 = GenNode('gen', p)
    t1 = DoSomething('func1', p, kwargs={'param': 'param_t1'})
    t2 = DoSomething('func2', p, kwargs={'param': 'param_t2'})
    t0.set_downstream(t1)
    t1.set_downstream(t2)


    start_time = time.time()
    p.run()
    print(f"Asynchronous duration: {time.time() - start_time}s.")
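
Coroflow decides where to run each execute method based on what kind of callable it is. The sketch below shows one plausible way to make that choice with the standard library's inspect module; it is not coroflow's code, and run_stage, collect and the sample functions are hypothetical. Coroutines and async generators run on the event loop, while blocking functions and synchronous generators are pushed to a thread pool via loop.run_in_executor so they never stall the loop.

```python
import asyncio
import inspect
import time


async def run_stage(func, *args, executor=None):
    """Yield every output of func, running it in the right place for its kind."""
    loop = asyncio.get_running_loop()
    if inspect.isasyncgenfunction(func):
        # Async generator: iterate it directly on the event loop.
        async for item in func(*args):
            yield item
    elif inspect.iscoroutinefunction(func):
        # Coroutine: await it on the event loop.
        yield await func(*args)
    elif inspect.isgeneratorfunction(func):
        # Sync generator: advance it one step at a time in a worker thread,
        # so a slow next() never blocks the event loop.
        gen = func(*args)
        done = object()
        while True:
            item = await loop.run_in_executor(executor, next, gen, done)
            if item is done:
                break
            yield item
    else:
        # Plain blocking function: run it once in a worker thread.
        yield await loop.run_in_executor(executor, func, *args)


async def fetch(url):
    await asyncio.sleep(0.01)  # simulated network I/O
    return url.upper()


def resize(url):
    time.sleep(0.01)  # simulated blocking work
    return url + ":small"


def sync_urls():
    yield from ["img_url_1", "img_url_2"]


async def async_urls():
    for url in ["img_url_1", "img_url_2"]:
        await asyncio.sleep(0)
        yield url


async def collect(func, *args):
    """Drain run_stage into a list (helper for the demo)."""
    return [item async for item in run_stage(func, *args)]


if __name__ == "__main__":
    for func, args in [(sync_urls, ()), (async_urls, ()),
                       (fetch, ("img_url_1",)), (resize, ("img_url_1",))]:
        print(func.__name__, asyncio.run(collect(func, *args)))
```

With executor=None the loop's default thread pool is used. Passing a concurrent.futures.ProcessPoolExecutor instead would only suit the plain-function branch, and only for picklable functions and arguments, because generator objects cannot be pickled.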

Tests

Run like so:

    $ pytest

Download files

Source Distribution

coroflow-4.0.1.tar.gz (8.9 kB)

Uploaded Source

Built Distribution

coroflow-4.0.1-py3-none-any.whl (8.2 kB)

Uploaded Python 3

File details

Details for the file coroflow-4.0.1.tar.gz.

File metadata

  • Download URL: coroflow-4.0.1.tar.gz
  • Upload date:
  • Size: 8.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.11 CPython/3.7.3 Linux/5.10.68+

File hashes

Hashes for coroflow-4.0.1.tar.gz
Algorithm Hash digest
SHA256 5e550cf9779799b0af6fe5f83e3ae23b11bf18bd5dbed4762f59b46a7ab11912
MD5 8361b136b09111c7f34680f1a815c367
BLAKE2b-256 262d7de067dd493cdbb6e1d93a6f124aa9da1ac95235ecc673d42fd3d52dc56c

File details

Details for the file coroflow-4.0.1-py3-none-any.whl.

File metadata

  • Download URL: coroflow-4.0.1-py3-none-any.whl
  • Upload date:
  • Size: 8.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.11 CPython/3.7.3 Linux/5.10.68+

File hashes

Hashes for coroflow-4.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 577679d41efdc5d02716791da35f841dccc180086a9ec80db26b10021295f298
MD5 685d95fcfa3e15696c43a95f4814ca38
BLAKE2b-256 4bc869bd268da2b97456c81ecc23b1928d56f80b6519448f263b9319f5b08134