Pure asynchronous orchestrations

Syncopate

Tools for higher-order, pure orchestrations

Usage

  1. Define actions:

    from dataclasses import dataclass

    @dataclass
    class Greet:
        name: str

    @dataclass
    class Fetch:
        container: str
        num_items: int

    Action = Greet | Fetch
    
  2. Define orchestration:

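    from typing import Any, Generator

    def orchestration() -> Generator[Action, Any, list[str]]:
        # Each yield hands an action to the executor and receives its result.
        users = yield Fetch(container='users', num_items=3)
        for user in users:
            yield Greet(user)
        return users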
    
  3. Define executor:

    def executor(action: Action):
        match action:
            case Greet():
                print(f"Hi, {action.name}!")
            case Fetch():
                print(f"Fetching {action.num_items} from '{action.container}'")
                return [f"User{i}" for i in range(action.num_items)]
    
  4. Run:

    import syncopate
    
    await syncopate.run(orchestration(), executor)
    # Fetching 3 from 'users'
    # Hi, User0!
    # Hi, User1!
    # Hi, User2!
    

Installation

pip install syncopate

Orchestrations and Executors

An executor is just a function of type

Executor = Callable[[Action], Result | Awaitable[Result]]

Action and Result can be anything, as long as the orchestration you pair the executor with can handle them. An orchestration is a generator of type

Orchestration = Generator[Action, Result, Return] # yields Action, receives Result, returns Return
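
To make the pairing concrete, here is a minimal sketch of how a runner could drive an orchestration with an executor. It is not syncopate's actual implementation: `drive` and `toy_orchestration` are hypothetical names, and plain integers stand in for real actions and results.

```python
import asyncio
import inspect
from typing import Any, Callable, Generator


async def drive(
    orchestration: Generator[Any, Any, Any],
    executor: Callable[[Any], Any],  # may return a plain value or an awaitable
) -> Any:
    """Hypothetical driver: feed each yielded action to the executor."""
    try:
        action = next(orchestration)  # run up to the first yield
    except StopIteration as stop:
        return stop.value  # the orchestration returned without yielding
    while True:
        result = executor(action)
        if inspect.isawaitable(result):
            result = await result  # support async executors too
        try:
            action = orchestration.send(result)  # resume with the result
        except StopIteration as stop:
            return stop.value  # the orchestration's return value


def toy_orchestration() -> Generator[int, int, str]:
    doubled = yield 2            # executor receives 2
    final = yield doubled + 1    # executor receives doubled + 1
    return f"done: {final}"


result = asyncio.run(drive(toy_orchestration(), lambda n: n * 2))
print(result)  # done: 10
```

The orchestration itself never performs side effects; it only yields descriptions of work and receives results, which is what keeps it pure and easy to test with a fake executor.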

Combinators

More complex behavior can be injected via syncopate.combinators. A combinator is a higher-order function of type

Combinator = Callable[[Executor[Action, Result]], Executor[Action2, Result2]]
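
As an illustration of that shape, the sketch below builds a small custom combinator that records every action and its result. `recording` is a hypothetical name and is not part of syncopate.combinators; the `Executor` alias here is a simplified stand-in for the type above.

```python
import asyncio
import inspect
from typing import Any, Callable

Executor = Callable[[Any], Any]  # simplified stand-in (assumption)
Combinator = Callable[[Executor], Executor]


def recording(log: list[tuple[Any, Any]]) -> Combinator:
    """Hypothetical combinator factory: record (action, result) pairs in `log`."""

    def combinator(executor: Executor) -> Executor:
        async def _executor(action: Any) -> Any:
            result = executor(action)
            if inspect.isawaitable(result):
                result = await result  # the wrapped executor may be sync or async
            log.append((action, result))
            return result

        return _executor

    return combinator


log: list[tuple[Any, Any]] = []
wrapped = recording(log)(lambda action: action.upper())
result = asyncio.run(wrapped("greet"))
print(result, log)  # GREET [('greet', 'GREET')]
```

The wrapped executor has the same calling convention as the original one, so an orchestration does not need to know that recording is happening.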

Example

For instance, parallel execution can be achieved using combinators.parallelize, roughly implemented as:

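import asyncio
import inspect

def parallelize(executor: Executor[Action, Result]) -> Executor[Action | list[Action], Result | list[Result]]:
    async def _call(action):
        # The wrapped executor may be sync or async, so await only when needed.
        result = executor(action)
        return await result if inspect.isawaitable(result) else result

    async def _executor(actions):
        if isinstance(actions, list):
            return await asyncio.gather(*map(_call, actions))
        return await _call(actions)

    return _executor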

You can then use the same simple action executor but yield multiple actions to it for simultaneous execution:

def parallel_orchestration() -> Generator[Action | list[Action], Any, list[str]]:
    users = yield Fetch(container='users', num_items=3)
    yield [Greet(user) for user in users]
    return users

await syncopate.run(parallel_orchestration(), parallelize(executor))

Composition

Multiple combinators (both in syncopate.combinators and custom) can be composed to create a new executor:

from syncopate.combinators import parallelize, logged

def custom_combinator(executor: Executor[...]) -> Executor[...]:
    ...

def executor(action: Action) -> Result:
    ...

custom_logged_parallel_executor = custom_combinator(logged(parallelize(executor))) # please don't give variables such long names :)
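
When the nesting gets deep, a small helper can apply the combinators for you. The sketch below is an assumption, not something syncopate is known to provide: `compose`, `exclaim`, and `shout` are hypothetical names, and the toy combinators are synchronous to keep the example short.

```python
from functools import reduce
from typing import Any, Callable

Executor = Callable[[Any], Any]  # simplified stand-in (assumption)
Combinator = Callable[[Executor], Executor]


def compose(*combinators: Combinator) -> Combinator:
    """Hypothetical helper: compose(f, g)(executor) == f(g(executor))."""

    def _composed(executor: Executor) -> Executor:
        # Apply the rightmost combinator first, so the leftmost ends up outermost.
        return reduce(
            lambda wrapped, combinator: combinator(wrapped),
            reversed(combinators),
            executor,
        )

    return _composed


def exclaim(executor: Executor) -> Executor:
    return lambda action: executor(action) + "!"


def shout(executor: Executor) -> Executor:
    return lambda action: executor(action).upper()


def base(action: str) -> str:
    return f"hi {action}"


composed = compose(exclaim, shout)(base)
print(composed("bob"))  # HI BOB!
```

The order matters: the innermost combinator sees the raw executor, and each outer one sees the already-wrapped result.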
