Pure asynchronous orchestrations

Syncopate

Tools for higher-order, pure orchestrations

Usage

  1. Define actions:

    from dataclasses import dataclass

    @dataclass
    class Greet:
        name: str
        
    @dataclass
    class Fetch:
        container: str
        num_items: int
        
    Action = Greet | Fetch
    
  2. Define orchestration:

    from typing import Any, Generator

    def orchestration() -> Generator[Action, Any, list[str]]:
        users = yield Fetch(container='users', num_items=3)
        for user in users:
            yield Greet(user)
        return users
    
  3. Define executor:

    def executor(action: Action):
        match action:
            case Greet():
                print(f"Hi, {action.name}!")
            case Fetch():
                print(f"Fetching {action.num_items} from '{action.container}'")
                return [f"User{i}" for i in range(action.num_items)]
    
  4. Run (inside an async function, since run is awaited):

    import syncopate
    
    await syncopate.run(orchestration, executor)
    # Fetching 3 from 'users'
    # Hi, User0!
    # Hi, User1!
    # Hi, User2!
    

Installation

pip install syncopate

Orchestrations and Executors

An executor is just a function of type

Executor = Callable[[Action], Result | Awaitable[Result]]

Action and Result can be any types, as long as the orchestration paired with the executor yields those actions and accepts those results. An orchestration is just a generator function whose return type is

Orchestration = Generator[Action, Result, Return] # yields Action, receives Result, returns Return
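
To make the relationship between the two types concrete, here is a minimal sketch of a loop that drives a generator with an executor. The name `drive` is hypothetical and this is not syncopate's implementation of `run`; it only assumes what the types above state: the executor may return either a plain value or an awaitable, and the generator's return value is the final result.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Generator, TypeVar, Union

Return = TypeVar("Return")


# Hypothetical stand-in for a runner, written only from the two type aliases above.
async def drive(
    orchestration: Generator[Any, Any, Return],
    executor: Callable[[Any], Union[Any, Awaitable[Any]]],
) -> Return:
    try:
        action = next(orchestration)  # run up to the first yield
        while True:
            result = executor(action)
            if inspect.isawaitable(result):  # accept sync and async executors
                result = await result
            action = orchestration.send(result)  # resume with the result
    except StopIteration as stop:
        return stop.value  # the generator's `return` value


def count_up() -> Generator[int, int, int]:
    total = 0
    for n in (1, 2, 3):
        total += (yield n)  # yield an action, receive its result
    return total


def double(n: int) -> int:
    return n * 2


total = asyncio.run(drive(count_up(), double))
print(total)  # 2 + 4 + 6
```

The orchestration itself never performs I/O: it only yields descriptions of work and receives results, which is what keeps it pure and easy to test with a fake executor.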

Combinators

More complex behavior can be injected via syncopate.combinators. A combinator is just a higher-order function of type

Combinator = Callable[[Executor[Action, Result]], Executor[Action2, Result2]]
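
As an illustration of that shape, the sketch below defines a custom combinator that records every action together with its result. `recording` and `shout` are hypothetical names, not part of syncopate, and the `Executor` alias is simplified to `Any` so the block runs on its own.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, List, Tuple, Union

# Simplified alias for this sketch only.
Executor = Callable[[Any], Union[Any, Awaitable[Any]]]


def recording(history: List[Tuple[Any, Any]]) -> Callable[[Executor], Executor]:
    """Hypothetical combinator factory: appends (action, result) to `history`."""

    def combinator(executor: Executor) -> Executor:
        async def _executor(action: Any) -> Any:
            result = executor(action)
            if inspect.isawaitable(result):  # wrapped executor may be sync or async
                result = await result
            history.append((action, result))
            return result

        return _executor

    return combinator


def shout(text: str) -> str:
    return text.upper()


history: List[Tuple[Any, Any]] = []
wrapped = recording(history)(shout)
result = asyncio.run(wrapped("hi"))
print(result, history)
```

The wrapped executor is always asynchronous, even when the original one is not, so callers should await it.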

Example

For instance, parallel execution can be achieved using combinators.parallelize, roughly implemented as:

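import asyncio

def parallelize(executor: Executor[Action, Result]) -> Executor[Action | list[Action], Result | list[Result]]:
    # This rough version assumes `executor` returns an awaitable.
    async def _executor(actions):
        if isinstance(actions, list):
            # Run every action in the list concurrently; results keep the input order.
            return await asyncio.gather(*map(executor, actions))
        else:
            # A single action is passed straight through.
            return await executor(actions)

    return _executor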

You can then use the same simple action executor but yield multiple actions to it for simultaneous execution:

def parallel_orchestration() -> Generator[Action | list[Action], Any, list[str]]:
    users = yield Fetch(container='users', num_items=3)
    yield [Greet(user) for user in users]
    return users

await syncopate.run(parallel_orchestration(), parallelize(executor))
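
The rough `parallelize` above awaits the wrapped executor directly, so it expects an asynchronous one. A variant that also tolerates plain synchronous executors might look like the sketch below. `parallelize_any`, `slow_square`, and `negate` are hypothetical names used only for illustration, and this is not the library's actual code.

```python
import asyncio
import inspect


def parallelize_any(executor):
    """Hypothetical variant of parallelize that accepts sync or async executors."""

    async def _call(action):
        result = executor(action)
        if inspect.isawaitable(result):
            result = await result
        return result

    async def _executor(actions):
        if isinstance(actions, list):
            # gather returns results in input order, whatever the finish order
            return await asyncio.gather(*(_call(a) for a in actions))
        return await _call(actions)

    return _executor


async def slow_square(n):
    await asyncio.sleep(0.01 * (3 - n))  # later items finish first
    return n * n


def negate(n):
    return -n


squares = asyncio.run(parallelize_any(slow_square)([0, 1, 2]))
negated = asyncio.run(parallelize_any(negate)([1, 2]))
single = asyncio.run(parallelize_any(negate)(5))
print(squares, negated, single)
```

Note that a synchronous executor gains no real concurrency here: each call runs to completion before the next one starts. Only awaitable work overlaps.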

Composition

Multiple combinators (both in syncopate.combinators and custom) can be composed to create a new executor:

from syncopate.combinators import parallel, logged

def custom_combinator(executor: Executor[...]) -> Executor[...]:
    ...

def executor(action: Action) -> Result:
    ...

custom_logged_parallel_executor = custom_combinator(logged(parallel(executor))) # please don't give variables such long names :)
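
Nesting order matters: the innermost combinator wraps the base executor first, and the outermost one sees each action first. The sketch below makes that visible with a small `compose` helper. Both `compose` and `tagging` are hypothetical and not part of syncopate, and the executors are kept synchronous for brevity.

```python
from functools import reduce
from typing import Any, Callable

Executor = Callable[[Any], Any]
Combinator = Callable[[Executor], Executor]


def compose(*combinators: Combinator) -> Combinator:
    """compose(f, g, h)(executor) is equivalent to f(g(h(executor)))."""

    def combined(executor: Executor) -> Executor:
        # Apply the rightmost combinator first, as in nested calls.
        return reduce(lambda wrapped, c: c(wrapped), reversed(combinators), executor)

    return combined


def tagging(tag: str) -> Combinator:
    """Hypothetical combinator that wraps each result in `tag(...)`."""

    def combinator(executor: Executor) -> Executor:
        def _executor(action: Any) -> str:
            return f"{tag}({executor(action)})"

        return _executor

    return combinator


def base(action: Any) -> str:
    return str(action)


composed = compose(tagging("outer"), tagging("inner"))(base)
output = composed("x")
print(output)
```

A helper like this keeps long chains readable without changing what the nested form does.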
