Skip to main content

Orchestrate anything with low overhead, fast, synchronized scheduling.

Project description

Symphonizer

Task Orchestration using Directed Acyclic Graphs

codecov Code style: black CodeFactor License: Apache 2.0

Orchestrate anything with low overhead, fast, synchronized scheduling. Symphonizer is very well suited to orchestrating distributed API requests and dependency ordered tasks. This makes Symphonizer well suited in orchestrating machine learning for model execution, LLVM agent chaining, and any other processes/tasks that can be represented as a directed acyclic graph.

Ideal use cases include for Symphonizer:

  • Idempotent, ordered, processes and flows
  • Remote API requests
  • Orchestrating calls to hybrid cloud, Lambda and Serverless Functions
  • Machine learning and model execution
  • LLVM agent chaining
  • Any dependency ordered API requests

Use cases to avoid:

  • Distributed transactions with ACID guarantees
  • Data pipelines with large data payloads
  • Symphonizer is NOT a workflow engine
  • Symphonizer is NOT a distributed transaction coordinator
  • Symphonizer is NOT a transaction database

Use cases that may require consideration before using Symphonizer:

  • Distributed transactions with eventual consistency
  • Very long running processes - Hours, Days

Symphonizer in the wild

Symphonizer was initially developed to facilitate the orchestration of autonomous applications in an unstable distributed system and has subsequently found a nice home in orchestrating machine LLM agents and prompt engineering.

Symphonizer Model

DAGNote: Hashable -> Is a Graph Node with a JSON serializable payload.

NodeRunner: Callable[..., Any] -> Is a Function or callable class that takes a DAGNote payload as an argument.

  • NodeRunners are most effective when their actions are idempotent.
  • NodeRunners are intended to serve as execution proxies having low compute overhead.
  • An instance of a node runner can only be executed once and is then immutable, this is to ensure idempotency. In order to retry or execute a NodeRunner again, a new instance is required. the NodeRunner.retry_task() method will clone a new child NodeRunner.

Composition: object -> Is a class instantiated with a Dict[Hashable, Set[Hashable]] that represents a directed acyclic graph. Read further in the graphlib standard library documentation.

  • Each DAGNote's NodeRunner is executed in topological order until all nodes have been executed.
  • All execution is synchronized and asynchronously run in memory.

Perform: object -> Is a class that orchestrates the execution of a Compositions. Perform is instantiated with a high water and low water mark. When the number of concurrent running Compositions reach the high water mark, new composition execution is blocked until the number of running Compositions falls below the low water mark. During this time all new Compositions are queued until the low water mark is reached.

Getting started

pip install symphonizer

Example

Here is a basic end to end example of Symphonizer. This example is also available in the examples directory.

import asyncio
import time
import random

from symphonizer.node_runner import NodeRunner
from symphonizer.composition import Composition, DAGNote
from symphonizer.perform import Perform

With this example we will customise the Composition class in and add a sleep time to the processing of each node.

class Compose(Composition):
    @classmethod
    def configure_node_runner(cls, node: DAGNote):
        """ Configure the node runner as appropriate for the DAGNote, the flexibility of this method allows for
        different node runners to be used for different nodes in the DAG or alternatively the same node runner
        where the execute method is configured differently for different nodes in the DAG.
        """
        async def execute(**params):
            sleep_time = random.uniform(0.001, 0.1)
            # print(f"Processing node {node}, sleep for {sleep_time}")
            await asyncio.sleep(sleep_time)

        return NodeRunner().prepare(node=node).run(execute)

Next is the main body of the example which you would ammend to suit your needs.

async def main():
    sample_graph = {
        DAGNote("D"): {DAGNote("B"), DAGNote("C")},
        DAGNote("C"): {DAGNote("A")},
        DAGNote("B"): {DAGNote("A")}
    }
    dag_count_target = 1000
    dag_count_completed = 0
    dag_tracker = {}
    test_stop_future = asyncio.Future()
    perform = Perform()
    start_time = time.time()

    def scheduler_done_cb(instance, status, error=None, elapsed_time: float = 0):
        nonlocal dag_count_completed
        dag_tracker.pop(instance.instance_id, None)
        dag_count_completed += 1
        if dag_count_completed == dag_count_target:
            test_stop_future.set_result(None)

    async def schedule_dag():
        dag = Compose(
            sample_graph,
            schedule_done_cb=scheduler_done_cb,
        )
        asyncio.create_task(perform.add(dag))
        await asyncio.sleep(0)

    print("Starting to schedule DAGs")

    _ = list([await schedule_dag() for _ in range(dag_count_target)])

    print(f"added {dag_count_target} DAGs to perform")
    await asyncio.sleep(0.001)
    await test_stop_future
    end_time = time.time()
    print(f"All DAGs processed in {end_time-start_time} seconds")
    print(f"Average DAG processing time {round((end_time-start_time)/dag_count_target, 4)} seconds")

if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

symphonizer-0.1.1.tar.gz (18.9 kB view details)

Uploaded Source

Built Distribution

symphonizer-0.1.1-py3-none-any.whl (19.7 kB view details)

Uploaded Python 3

File details

Details for the file symphonizer-0.1.1.tar.gz.

File metadata

  • Download URL: symphonizer-0.1.1.tar.gz
  • Upload date:
  • Size: 18.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.11.4 Darwin/22.6.0

File hashes

Hashes for symphonizer-0.1.1.tar.gz
Algorithm Hash digest
SHA256 eb096ae13a92eaf06f103886fbb6c52b4bc78da010331d70ec8a789fdfd831e2
MD5 c1d2d58a0884a3689f457f623206cd36
BLAKE2b-256 10cf3a9ca280961afbd7f8a4bd6caad116922a5380497747776118cfba02a3c1

See more details on using hashes here.

File details

Details for the file symphonizer-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: symphonizer-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 19.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.11.4 Darwin/22.6.0

File hashes

Hashes for symphonizer-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 fb7269276b937cb1fea46bce4a3ec77e87019a144353c3906975e2c9ac773ec2
MD5 f2f923198f108cf48d845dd9708fccb7
BLAKE2b-256 88548ced8c970425766a1c8589d069504518e36aacee89074f617d752e679aec

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page