An asynchronous, multicore graph execution framework for Python.
Project description
Cascata
Parallel graph execution framework for Python
Introduction
Cascata is an asynchronous, event-driven, parallel graph execution framework for Python, powered by asyncio and multiprocess. Cascata enables the creation of networks of components, each operating in their own coroutine. Components in a Cascata graph can run on one or more CPU cores, enabling high-performance applications which benefit from parallel and/or asynchronous execution.
Features
- Parallel, asynchronous graph execution
- Declarative component and graph definition DSL
- Automatic workload distribution
- Configurable port iteration and synchronization
Quick-start guide
Install Cascata
Install cascata with pip:
pip3 install cascata
Define components
Components in Cascata are defined with three decorators: @inport
, @outport
, and @component
. These decorators are applied to an async function definition, transforming the definition into a Component object. This function accepts the same arguments as the ports. The data from the ports, not the ports themselves, are provided as arguments to the function at runtime. Iteration over the ports is handled internally by the component in a user-defined pattern when building the graph. Thus, the runner function needs only to process data, and send data.
from cascata import * #imports the @inport, @outport, @component decorators, and the Graph object.
@component # must be at the top of the declaration
@inport('data_in', capacity=10) # Define an InputPort named "data_in", with a capacity of 10.
@inport('constant_in', default=1) # Define an InputPort which is initialized with a default value.
@outport('data_out') #Define an OutputPort, used for sending data to downstream components.
async def NumberIncrementer(data_in, constant_in, data_out): # Always use async def when defining component runners.
await data_out.send(data_in + constant_in) # Send the sum of the values of the data received by the inports through the output port
@component
@inport('range_in')
@outport('data_out')
async def RangeGenerator(range_in, data_out):
for i in range(range_in):
await data_out.send(i) # Send an item through the output port
@component
@inport('data_in')
async def DataPrinter(data_in):
print(data_in)
Build and run a graph
Graphs in Cascata are defined with a simple shorthand DSL:
- the assignment operator
=
is used to add components to a graph :my_graph.my_components_name = component_1
- assigning a component to a graph will inject its name into the local namespace.
- the less-than operator
<
is used to initialize ports of a component:increment_numbers_instance.constant_in < 2
- the right-shift operator
>>
is used to connect components:increment_numbers_instance.data_out >> print_data.data_in
from cascata import * #imports the @inport, @outport, @component decorators, and the Graph object.
graph=Graph()
#add our previously defined components in the graph
graph.arange=RangeGenerator
graph.add_numbers=NumberIncrementer
graph.print_results=DataPrinter
# Doing this injects the names "arange", "add_numbers",
# and "print_results into the local namespace"
#set initial value of the arange.range_in port
arange.range_in < 10 #arange is available in the local namespace
#set initial value of the add_numbers component
add_numbers.constant_in < 5
# define the connections of the graph
arange.data_out >> add_numbers.data_in #outport >> inport
add_numbers.data_out >> print_results.data_in
# run the graph
graph.run(num_workers=3) # optionally specify a number of workers
#by default, uses multiprocess.cpu_count() or the number of components in the graph, whichever is lower.
Other notes and advanced usage:
Components have two methods for defining the iteration pattern of a component, sync()
and clk()
.
some_component.clk(some_component.some_inport)
will instruct the component to execute its runner only when data is recieved on the specified port.some_component.sync(some_component.some_inport, some_component.another_inport)
is the same asclk()
, except the component will execute the runner on the zipped iteration of both ports, treating them as a single clocked group.- by default, all ports use the sync behavior unless they are initialized with a static value.
- multiple groups of ports can be specified with
clk
andsync
.
see the examples folder for more advanced usage.
Internals of Cascata
Channels
At its core, Cascata uses a Channel
primitive to facilitate communication between components. A complete, running graph in Cascata may be regarded as a network of functions which produce and consume data via iteration over these channels.
A Channel
is defined as a thread-safe, non-blocking queue, which provides an open()
AsyncContextManager
to signal the number of active publishers, an awaitable put()
method, and an __aiter__
/__anext__
asynchronous iterator which takes from the channel until no more open()
contexts are active, and no more items remain in the queue. Channels are capable of a "many-to-many" publisher/consumer pattern.
channel_nine=Channel()
async with open(channel_nine) as ch:
await channel_nine.put(something)
# In another coroutine:
async for item in channel_nine:
do_something_to(item)
Components and Ports
Ports in Cascata are resposible for transmitting and receiving data through channels. InputPorts each contain their own Channel, and OutputPorts contain a list of InputPorts to which data sent through the OutputPort is transmitted.
Components in Cascata contain one or more InputPort or OutputPort objects and an awaitable runner function. A component iterates over groups of these ports and executes its runner on the incoming data in a user-defined pattern. A component terminates when all of its iterators have exited.
Graphs
Graphs in Cascata are used to facilitate composition of components into interconnected networks and run them in a group of workers. They are not responsible for any state management, communication, or monitoring of the components themselves. Graphs in Cascata are simply containers of components and their connections.
Performance considerations
- Cascata uses an asynchronous version of
multiprocessing.Queue
provided byaioprocessing
for inter-process communication, which serializes objects withdill
. Use of shared memory for transporting large amounts of data between processes is highly encouraged. - Since all components are defined as coroutines, the full feature set of
asyncio
can be leveraged to accelerate I/O bound code.
Requirements and dependencies:
- Python 3.7 or 3.8
- Currently working only on Linux.
- Python package dependencies:
aioitertools
aioprocessing
multiprocess
networkx
graphviz
Acknowlegements
The component definition syntax for Cascata, as well as a large portion of the ontology of the code itself, is inspired by Rill.
Cascata draws its name from the beautiful language of the mostly penninsular nation of Italy. Cascata means "Waterfall" in Italian.
Contributions and future development
Please report any issues you find with Cascata in the issues page on this repository.
Contributions to Cascata are highly encouraged and merge requests welcomed.
Features to be implemented:
- Windows, MacOS support
- Support for Subgraphs
- Support for grouped components
- Python3.9+ support
- Deadlock detection
- Logging facilities
If you are a large corporation, and you find Cascata useful in your highly profitable software development ventures, consider sending the author like a million dollars. TIA
License
Cascata is released under the MIT License. This license permits free use, modification, distribution, and private use of Cascata, but with limited liability and warranty as detailed in the license terms.
Citing Cascata
If you use Cascata in your research, academic projects, or publications, we would appreciate citing it. You can use the following BibTeX entry:
@misc{cascata2023,
title={Cascata: Parallel Graph Execution Framework for Python},
author={Stuart MacLeod},
year={2023},
howpublished={\url{https://github.com/simacleod/cascata}}
}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file cascata-0.1.2.tar.gz
.
File metadata
- Download URL: cascata-0.1.2.tar.gz
- Upload date:
- Size: 14.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 66c9c392b151a6f656ca78ab410bc4fe629f54054dd8821206eabd547170641e |
|
MD5 | 3484f79bbdd8a9e7e441bbe05adaa97f |
|
BLAKE2b-256 | f87ac7e5866ea8bb53cd8c1d033cba16a3b834b75c5854e6052c7a064a8e591a |
File details
Details for the file cascata-0.1.2-py3-none-any.whl
.
File metadata
- Download URL: cascata-0.1.2-py3-none-any.whl
- Upload date:
- Size: 12.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.18
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | a38a1400e2f4acd9e758b30b83d8e38071097f5e6a3becc32582eb9f8d01ff53 |
|
MD5 | d45db6768cb1e15c72ce92070fd0a527 |
|
BLAKE2b-256 | 1583e5d96656988f8db001afba5774f899ea92bcc2a2cedfcad4047b914b02bf |