Pure asynchronous orchestrations
Project description
Syncopate
Tools for higher-order, pure orchestrations
Usage
-
Define actions:
@dataclass class Greet: name: str @dataclass class Fetch: container: str num_items: int Action = Greet | Fetch
-
Define orchestration:
def orchestration(): users = yield Fetch(container='users', num_items=3) for user in users: yield Greet(user) return users
-
Define executor:
def executor(action: Action): match action: case Greet(): print(f"Hi, {action.name}!") case Fetch(): print(f"Fetching {action.num_items} from '{action.container}'") return [f"User{i}" for i in range(action.num_items)]
-
Run!:
from syncopate import conduct await conduct(orchestration, executor) # Fetching 3 from 'users' # Hi, User0! # Hi, User1! # Hi, User2!
Installation
pip install syncopate
Orchestrations and Executors
An executor is just a function of type
Executor = Callable[[Action], Result | Awaitable[Result]]
Action
and Result
can be anything, as long as the orchestration you pair it with can handle them. An orchestration is just a generator function with type
Orchestration = Generator[Action, Result, Return] # sends Action, receives Result, returns Return
Combinators
Complexer behavior can be easily injected via syncopate.combinators
. A combinator is just a higher-order function of type
Combinator = Callable[Executor[Action, Result], Executor[Action2, Result2]]
Example
For instance, parallel execution can be achieved using combinators.parallelize
, roughly implemented as:
def parallelize(executor: Executor[Action, Result]) -> Executor[Action | list[Action], Result | list[Result]]:
async def _executor(actions):
if isinstance(actions, list):
return await asyncio.gather(*map(executor, actions))
else:
return await executor(action)
return _executor
You can then use the same simple action executor but yield multiple actions to it for simultaneous execution:
def parallel_orchestration():
users = yield Fetch(container='users', num_items=3)
yield [Greet(user) for user in users]
return users
conduct(parallel_orchestration(), parallelize(executor))
Composition
Multiple combinators (both in syncopate.combinators
and custom) can be composed to create a new executor:
from syncopate.combinators import parallel, logged
def custom_combinator(executor: Executor[...]) -> Executor[...]:
...
def executor(action: Action) -> Result:
...
custom_logged_parallel_executor = custom_combinator(logged(parallel(logged))) # please don't give variables such long names :)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Hashes for syncopate-0.1.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | ca9e723e2f295cc3a05fcae119f073f06b5c3e655c9732d1b79be30ee4ec6ac2 |
|
MD5 | 931223cacf7265240b872d247f390c9d |
|
BLAKE2b-256 | 05b620e1e2a4ebc57d4ba58b872cdb96c8d627e8778a9e41f1e40186ad4cc7b2 |