
Cascata

Parallel graph execution framework for Python


Introduction

Cascata is an asynchronous, event-driven, parallel graph execution framework for Python, powered by asyncio and multiprocess. Cascata enables the creation of networks of components, each running in its own coroutine. Components in a Cascata graph can run across one or more CPU cores, enabling high-performance applications that benefit from parallel and/or asynchronous execution.

Features

  • Parallel, asynchronous graph execution
  • Declarative component and graph definition DSL
  • Automatic workload distribution
  • Configurable port iteration and synchronization

Quick-start guide

Install Cascata

Install cascata with pip:

pip3 install cascata

Define components

Components in Cascata are defined with four decorators: @inport, @outport, @persist, and @component. These decorators are applied to an async function definition, transforming it into a Component object. The decorated function accepts one argument per declared port and persistent value. At runtime, the data received on each input port, not the port itself, is passed to the function, while output ports are passed directly so the runner can send through them. Iteration over the ports is handled internally by the component, following a pattern the user defines when building the graph, so the runner function only needs to process and send data.

from cascata import * # imports the @inport, @outport, @persist, @component decorators, and the Graph object


@component # must be the topmost decorator
@inport('data_in', capacity=10) # define an InputPort named "data_in" with a capacity of 10
@inport('constant_in', default=1) # define an InputPort initialized with a default value
@outport('data_out') # define an OutputPort, used for sending data to downstream components
async def NumberIncrementer(data_in, constant_in, data_out): # always use async def when defining component runners

    await data_out.send(data_in + constant_in) # send the sum of the values received on the inports through the output port

@component
@inport('range_in')
@outport('data_out')
async def RangeGenerator(range_in, data_out):

    for i in range(range_in):
        await data_out.send(i) # Send an item through the output port

@component
@inport('data_in')
async def DataPrinter(data_in):

    print(data_in)

Persistent state can be declared with @persist:

@component
@inport('image_in')
@outport('image_out')
@persist('count', lambda: 0)
async def count_images(image_in, image_out, count):
    count.set(count.get() + 1)
    await image_out.send(image_in)

Individual components can be tested outside of a graph using the Component.test classmethod. Provide values for each inport and the method returns a dictionary of lists containing the items sent to each outport:

>>> NumberIncrementer.test(data_in=1, constant_in=2)
{'data_out': [3]}

Build and run a graph

Graphs in Cascata are defined with a simple shorthand DSL:

  • the assignment operator = is used to add components to a graph: my_graph.my_components_name = component_1
  • assigning a component to a graph will inject its name into the local namespace.
  • the less-than operator < is used to initialize ports of a component: increment_numbers_instance.constant_in < 2
  • the right-shift operator >> is used to connect components: increment_numbers_instance.data_out >> print_data.data_in
  • the augmented right-shift >>= connects ports and delivers all outputs as a single list
  • use out_port >> N >= in_port to batch every N sends into a list

Example batching syntax:

producer.out >>= collector.items         # all items at once
producer.out >> 3 >= collector.items    # batches of 3

Batching works for connections originating from component groups as well.
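As a plain-Python sketch (not Cascata's implementation), the two batching modes deliver data to the receiving inport roughly as follows; how a trailing partial batch is handled is an assumption here:

```python
# Plain-Python illustration of what the receiving inport sees under each
# batching mode (not Cascata's implementation).
def batch(items, n=None):
    """n=None mimics '>>=': everything delivered as one list.
    n=k mimics '>> k >=': deliveries of k items each."""
    items = list(items)
    if n is None:
        return [items]
    # NOTE: keeping a trailing partial batch is an assumption here.
    return [items[i:i + n] for i in range(0, len(items), n)]

print(batch(range(6)))     # [[0, 1, 2, 3, 4, 5]]
print(batch(range(6), 3))  # [[0, 1, 2], [3, 4, 5]]
```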

from cascata import * #imports the @inport, @outport, @component decorators, and the Graph object.

graph = Graph()

# add our previously defined components to the graph

graph.arange = RangeGenerator
graph.add_numbers = NumberIncrementer
graph.print_results = DataPrinter

# Doing this injects the names "arange", "add_numbers",
# and "print_results" into the local namespace.

# set the initial value of the arange.range_in port

arange.range_in < 10 # arange is available in the local namespace

# set the initial value of the add_numbers component

add_numbers.constant_in < 5

# define the connections of the graph

arange.data_out >> add_numbers.data_in # outport >> inport
add_numbers.data_out >> print_results.data_in

# run the graph

graph.run(num_workers=3) # optionally specify a number of workers;
# by default, uses multiprocess.cpu_count() or the number of components
# in the graph, whichever is lower.
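That default can be sketched in a line of plain Python, using os.cpu_count() here as a stand-in for multiprocess.cpu_count():

```python
import os

# Sketch of the documented default worker count:
# the smaller of the CPU count and the number of components.
def default_num_workers(num_components):
    return min(os.cpu_count() or 1, num_components)

print(default_num_workers(1))  # 1
```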

Other notes and advanced usage:

Components have two methods for defining the iteration pattern of a component, sync() and clk().

  • some_component.clk(some_component.some_inport) will instruct the component to execute its runner only when data is received on the specified port.
  • some_component.sync(some_component.some_inport, some_component.another_inport) is the same as clk(), except the component will execute the runner on the zipped iteration of both ports, treating them as a single clocked group.
  • by default, all ports use the sync behavior unless they are initialized with a static value.
  • multiple groups of ports can be specified with clk and sync.

See the examples folder for more advanced usage.

Subgraphs

Cascata supports nested and reusable subgraphs. You can define a Graph as usual, then add it into another graph via attribute assignment. Export input/output ports on the subgraph to expose them in the parent graph:

# Define a subgraph for preprocessing
sub = Graph()
sub.loader = FileLoaderComponent
sub.processor = DataProcessorComponent
loader.output >> processor.input
sub.export(sub.loader.input, 'load_input')
sub.export(sub.processor.output, 'processed')

# Use subgraph within a larger graph
main = Graph()
main.prep = sub   # copies subgraph into main graph under prefix 'prep'
main.prep_load_input < 'data.csv'
main.prep_processed >> main.final_consumer.data_in
main.run()

Exports make a subgraph act like a single component, enabling modular composition of complex pipelines.

Grouped Components

For data-parallel or replication patterns, Cascata provides component groups via the * operator on a component class. This creates multiple parallel instances automatically:

# Create 4 parallel workers
worker_group = WorkerComponent * 4

graph.workers = worker_group  # adds workers_0 ... workers_3 to the graph

# Broadcast input to all workers
input_source.data_out >> workers.data_in

# Collect all outputs back to a single printer (framework inserts a GroupConnector)
workers.result_out >> printer.data_in

graph.run()

Grouping handles one-to-many and many-to-one wiring under the hood, with automatic GroupConnector insertion.
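The many-to-one side of this wiring can be sketched with plain asyncio queues. This is an illustration of the pattern, not Cascata's GroupConnector, and the round-robin distribution used here is an assumption:

```python
import asyncio

# Fan-out / fan-in sketch: one source feeds N worker queues, and every
# worker funnels its output into a single merged queue.
async def demo(n_workers=4, items=8):
    work = [asyncio.Queue() for _ in range(n_workers)]
    merged = asyncio.Queue()

    async def worker(q):
        while True:
            item = await q.get()
            if item is None:          # sentinel: no more work
                break
            await merged.put(item * 2)

    tasks = [asyncio.create_task(worker(q)) for q in work]
    for i in range(items):            # round-robin distribution (assumed)
        await work[i % n_workers].put(i)
    for q in work:                    # one sentinel per worker
        await q.put(None)
    await asyncio.gather(*tasks)
    return sorted(merged.get_nowait() for _ in range(merged.qsize()))

print(asyncio.run(demo()))  # [0, 2, 4, 6, 8, 10, 12, 14]
```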

Graph Serialization

Graphs can be serialized to a JSON-compatible dictionary and later restored.

import json

g = Graph()
g.worker = WorkerComponent
data = g.to_json()
with open('graph.json', 'w') as f:
    json.dump(data, f)

# Later
with open('graph.json') as f:
    g2 = Graph.from_json(json.load(f))

Subgraphs and component groups are fully preserved by the round trip.

Logger

Cascata includes a built-in, context-aware logger (cascata.log) that automatically tags log messages with component context and process ID. Simply import and use:

import cascata.log as log

@component
@inport('x')
@outport('y')
async def MyComp(x, y):
    log.info("Starting processing")
    await y.send(x * 2)
    log.debug("Finished processing")

Log format:

INFO - MyComp@MyCompClass : Starting processing
DEBUG - MyComp@MyCompClass : Finished processing

Levels and component class names are colorized for readability, and the logger requires no additional setup.

Internals of Cascata

Channels

At its core, Cascata uses a Channel primitive to facilitate communication between components. A complete, running graph in Cascata may be regarded as a network of functions which produce and consume data via iteration over these channels.

A Channel is defined as a thread-safe, non-blocking queue, which provides an open() AsyncContextManager to signal the number of active publishers, an awaitable put() method, and an __aiter__/__anext__ asynchronous iterator which takes from the channel until no more open() contexts are active, and no more items remain in the queue. Channels are capable of a "many-to-many" publisher/consumer pattern.
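A minimal, single-process sketch of these semantics is below. Cascata's real Channel is thread-safe and crosses process boundaries; this MiniChannel only illustrates the open()/put()/iteration contract:

```python
import asyncio

# Single-process illustration of the Channel contract described above.
class MiniChannel:
    def __init__(self):
        self._q = asyncio.Queue()
        self._publishers = 0

    def open(self):
        chan = self

        class _Open:
            async def __aenter__(self):
                chan._publishers += 1   # one more active publisher
                return chan

            async def __aexit__(self, *exc):
                chan._publishers -= 1   # publisher done
        return _Open()

    async def put(self, item):
        await self._q.put(item)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Yield items until no open() contexts remain AND the queue has
        # drained; then end the async for loop.
        while True:
            if not self._q.empty():
                return self._q.get_nowait()
            if self._publishers == 0:
                raise StopAsyncIteration
            await asyncio.sleep(0)

async def demo():
    ch = MiniChannel()

    async def produce():
        for i in range(3):
            await ch.put(i)

    async def consume():
        return [item async for item in ch]

    async with ch.open():                 # register the publisher first
        cons = asyncio.create_task(consume())
        await produce()
    return await cons                     # iteration ends after close

print(asyncio.run(demo()))  # [0, 1, 2]
```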

channel_nine = Channel()

# In a publisher coroutine:

async with channel_nine.open():
    await channel_nine.put(something)

# In another coroutine:

async for item in channel_nine:
    do_something_to(item)

Components and Ports

Ports in Cascata are responsible for transmitting and receiving data through channels. InputPorts each contain their own Channel, and OutputPorts contain a list of InputPorts to which data sent through the OutputPort is transmitted.

Components in Cascata contain one or more InputPort or OutputPort objects and an awaitable runner function. A component iterates over groups of these ports and executes its runner on the incoming data in a user-defined pattern. A component terminates when all of its iterators have exited.

Graphs

Graphs in Cascata facilitate the composition of components into interconnected networks and run them across a group of workers. They are not responsible for any state management, communication, or monitoring of the components themselves; a Graph is simply a container of components and their connections.

Performance considerations

  • Cascata uses an asynchronous version of multiprocessing.Queue provided by aioprocessing for inter-process communication, which serializes objects with dill. Use of shared memory for transporting large amounts of data between processes is highly encouraged.
  • Since all components are defined as coroutines, the full feature set of asyncio can be leveraged to accelerate I/O bound code.
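The shared-memory pattern suggested above can be sketched with the standard library: place the payload in a SharedMemory block and send only its short name through the inter-process queue, instead of serializing the payload itself. This is illustrative stdlib code; none of these names are Cascata APIs:

```python
from multiprocessing import shared_memory

# Put a large payload in shared memory and pass only its name around.
payload = b"x" * 1_000_000

shm = shared_memory.SharedMemory(create=True, size=len(payload))
shm.buf[:len(payload)] = payload
message = {"shm_name": shm.name, "size": len(payload)}  # cheap to send

# Receiving side (same process here, for brevity):
view = shared_memory.SharedMemory(name=message["shm_name"])
data = bytes(view.buf[:message["size"]])

view.close()
shm.close()
shm.unlink()
print(len(data))  # 1000000
```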

Requirements and dependencies:

  • Python 3.10+
  • Currently works only on Linux.
  • Python package dependencies:
	aioitertools
	aioprocessing
	multiprocess
	networkx
	graphviz

Acknowledgements

The component definition syntax for Cascata, as well as a large portion of the ontology of the code itself, is inspired by Rill.

Cascata draws its name from the beautiful language of the mostly peninsular nation of Italy. Cascata means "Waterfall" in Italian.

Contributions and future development

Please report any issues you find with Cascata in the issues page on this repository.

Contributions to Cascata are highly encouraged and merge requests welcomed.

If you are a large corporation, and you find Cascata useful in your highly profitable software development ventures, consider sending the author like a million dollars. TIA

License

Cascata is released under the MIT License. This license permits free use, modification, distribution, and private use of Cascata, but with limited liability and warranty as detailed in the license terms.

Citing Cascata

If you use Cascata in your research, academic projects, or publications, we would appreciate citing it. You can use the following BibTeX entry:

@misc{cascata2023,
  title={Cascata: Parallel Graph Execution Framework for Python},
  author={Stuart MacLeod},
  year={2023},
  howpublished={\url{https://github.com/simacleod/cascata}}
}
