Pure asynchronous orchestrations
Project description
Syncopate
Tools for higher-order, pure orchestrations
Usage
-
Define actions:
@dataclass class Greet: name: str @dataclass class Fetch: container: str num_items: int Action = Greet | Fetch
-
Define orchestration:
def orchestration() -> Generator[Action, Any, list[str]]: users = yield Fetch(container='users', num_items=3) for user in users: yield Greet(user) return users
-
Define executor:
def executor(action: Action): match action: case Greet(): print(f"Hi, {action.name}!") case Fetch(): print(f"Fetching {action.num_items} from '{action.container}'") return [f"User{i}" for i in range(action.num_items)]
-
Run!:
import syncopate await syncopate.run(orchestration, executor) # Fetching 3 from 'users' # Hi, User0! # Hi, User1! # Hi, User2!
Installation
pip install syncopate
Orchestrations and Executors
An executor is just a function of type
Executor = Callable[[Action], Result | Awaitable[Result]]
Action
and Result
can be anything, as long as the orchestration you pair it with can handle them. An orchestration is just a generator function with type
Orchestration = Generator[Action, Result, Return] # sends Action, receives Result, returns Return
Combinators
Complexer behavior can be easily injected via syncopate.combinators
. A combinator is just a higher-order function of type
Combinator = Callable[Executor[Action, Result], Executor[Action2, Result2]]
Example
For instance, parallel execution can be achieved using combinators.parallelize
, roughly implemented as:
def parallelize(executor: Executor[Action, Result]) -> Executor[Action | list[Action], Result | list[Result]]:
async def _executor(actions):
if isinstance(actions, list):
return await asyncio.gather(*map(executor, actions))
else:
return await executor(action)
return _executor
You can then use the same simple action executor but yield multiple actions to it for simultaneous execution:
def parallel_orchestration() -> Generator[Action | list[Action], Any, list[str]]:
users = yield Fetch(container='users', num_items=3)
yield [Greet(user) for user in users]
return users
syncopate.run(parallel_orchestration(), parallelize(executor))
Composition
Multiple combinators (both in syncopate.combinators
and custom) can be composed to create a new executor:
from syncopate.combinators import parallel, logged
def custom_combinator(executor: Executor[...]) -> Executor[...]:
...
def executor(action: Action) -> Result:
...
custom_logged_parallel_executor = custom_combinator(logged(parallel(executor))) # please don't give variables such long names :)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
File details
Details for the file syncopate-0.1.4-py3-none-any.whl
.
File metadata
- Download URL: syncopate-0.1.4-py3-none-any.whl
- Upload date:
- Size: 4.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d5eeebd7dbe0101bbf6e5031626de5cc2fda8632a5b27f73ff1c46665fb3dd13 |
|
MD5 | 8280dd5500168b1213d1c383c6beaf8a |
|
BLAKE2b-256 | 5c97e89d1e82028f651a934b925537db9e3b56c9af8d61b4261c3831e58c0867 |