Skip to main content

Orchestrate anything with low overhead, fast, synchronized scheduling.

Project description

Symphonizer

Task Orchestration using Directed Acyclic Graphs

codecov Code style: black CodeFactor License: Apache 2.0

Orchestrate anything with low overhead, fast, synchronized scheduling. Symphonizer is very well suited to orchestrating distributed API requests and dependency ordered tasks. This makes Symphonizer well suited in orchestrating machine learning for model execution, LLVM agent chaining, and any other processes/tasks that can be represented as a directed acyclic graph.

Ideal use cases include for Symphonizer:

  • Idempotent, ordered, processes and flows
  • Remote API requests
  • Orchestrating calls to hybrid cloud, Lambda and Serverless Functions
  • Machine learning and model execution
  • LLVM agent chaining
  • Any dependency ordered API requests

Use cases to avoid:

  • Distributed transactions with ACID guarantees
  • Data pipelines with large data payloads
  • Symphonizer is NOT a workflow engine
  • Symphonizer is NOT a distributed transaction coordinator
  • Symphonizer is NOT a transaction database

Use cases that may require consideration before using Symphonizer:

  • Distributed transactions with eventual consistency
  • Very long running processes - Hours, Days

Symphonizer in the wild

Symphonizer was initially developed to facilitate the orchestration of autonomous applications in an unstable distributed system and has subsequently found a nice home in orchestrating machine LLM agents and prompt engineering.

Symphonizer Model

DAGNote: Hashable -> Is a Graph Node with a JSON serializable payload.

NodeRunner: Callable[..., Any] -> Is a Function or callable class that takes a DAGNote payload as an argument.

  • NodeRunners are most effective when their actions are idempotent.
  • NodeRunners are intended to serve as execution proxies having low compute overhead.
  • An instance of a node runner can only be executed once and is then immutable, this is to ensure idempotency. In order to retry or execute a NodeRunner again, a new instance is required. the NodeRunner.retry_task() method will clone a new child NodeRunner.

Composition: object -> Is a class instantiated with a Dict[Hashable, Set[Hashable]] that represents a directed acyclic graph. Read further in the graphlib standard library documentation.

  • Each DAGNote's NodeRunner is executed in topological order until all nodes have been executed.
  • All execution is synchronized and asynchronously run in memory.

Perform: object -> Is a class that orchestrates the execution of a Compositions. Perform is instantiated with a high water and low water mark. When the number of concurrent running Compositions reach the high water mark, new composition execution is blocked until the number of running Compositions falls below the low water mark. During this time all new Compositions are queued until the low water mark is reached.

Getting started

pip install symphonizer

Example

Here is a basic end to end example of Symphonizer. This example is also available in the examples directory.

import asyncio
import time
import random

from symphonizer.node_runner import NodeRunner
from symphonizer.composition import Composition, DAGNote
from symphonizer.perform import Perform

With this example we will customise the Composition class in and add a sleep time to the processing of each node.

class Compose(Composition):
    @classmethod
    def configure_node_runner(cls, node: DAGNote):
        """ Configure the node runner as appropriate for the DAGNote, the flexibility of this method allows for
        different node runners to be used for different nodes in the DAG or alternatively the same node runner
        where the execute method is configured differently for different nodes in the DAG.
        """
        async def execute(**params):
            sleep_time = random.uniform(0.001, 0.1)
            # print(f"Processing node {node}, sleep for {sleep_time}")
            await asyncio.sleep(sleep_time)

        return NodeRunner().prepare(node=node).run(execute)

Next is the main body of the example which you would ammend to suit your needs.

async def main():
    sample_graph = {
        DAGNote("D"): {DAGNote("B"), DAGNote("C")},
        DAGNote("C"): {DAGNote("A")},
        DAGNote("B"): {DAGNote("A")}
    }
    dag_count_target = 1000
    dag_count_completed = 0
    dag_tracker = {}
    test_stop_future = asyncio.Future()
    perform = Perform()
    start_time = time.time()

    def scheduler_done_cb(instance, status, error=None, elapsed_time: float = 0):
        nonlocal dag_count_completed
        dag_tracker.pop(instance.instance_id, None)
        dag_count_completed += 1
        if dag_count_completed == dag_count_target:
            test_stop_future.set_result(None)

    async def schedule_dag():
        dag = Compose(
            sample_graph,
            schedule_done_cb=scheduler_done_cb,
        )
        asyncio.create_task(perform.add(dag))
        await asyncio.sleep(0)

    print("Starting to schedule DAGs")

    _ = list([await schedule_dag() for _ in range(dag_count_target)])

    print(f"added {dag_count_target} DAGs to perform")
    await asyncio.sleep(0.001)
    await test_stop_future
    end_time = time.time()
    print(f"All DAGs processed in {end_time-start_time} seconds")
    print(f"Average DAG processing time {round((end_time-start_time)/dag_count_target, 4)} seconds")

if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

symphonizer-0.1.2.tar.gz (18.9 kB view details)

Uploaded Source

Built Distribution

symphonizer-0.1.2-py3-none-any.whl (19.7 kB view details)

Uploaded Python 3

File details

Details for the file symphonizer-0.1.2.tar.gz.

File metadata

  • Download URL: symphonizer-0.1.2.tar.gz
  • Upload date:
  • Size: 18.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.11.4 Darwin/22.6.0

File hashes

Hashes for symphonizer-0.1.2.tar.gz
Algorithm Hash digest
SHA256 0c66cd4981acdac3e14c23dc4138ca9e2a7de905b500a1fe79c4994a0c79c34b
MD5 643ad649947aa327ce142956f2fd7e39
BLAKE2b-256 139c5219c1190c5a50a5f3000d9c123747b70dc9b9b74bdf19e9643d3188ea78

See more details on using hashes here.

File details

Details for the file symphonizer-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: symphonizer-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 19.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.11.4 Darwin/22.6.0

File hashes

Hashes for symphonizer-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 f18c7ca83fe5169af5fc5ad600afd129660bc468d2e4f44ca586809f556de565
MD5 6d0d05ab21d35464013f569b58b0076d
BLAKE2b-256 bb1bff9d06dac333d372b39c45e2eec5002f883b034a617cc8362cc1268a89ba

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page