A workflow engine with sugar syntax
Project description
🐇 Pyoco
pyoco is a minimal, pure-Python DAG engine for defining and running simple task-based workflows.
Overview
Pyoco is designed to be significantly smaller, lighter, and have fewer dependencies than full-scale workflow engines like Airflow. It is optimized for local development and single-machine execution.
You can define tasks and their dependencies entirely in Python code using decorators and a simple API. There is no need for complex configuration files or external databases.
It is ideal for small jobs, development environments, and personal projects where a full-stack workflow engine would be overkill.
✨ Features
- Pure Python: No external services or heavy dependencies required.
- Minimal DAG model: Tasks and dependencies are defined directly in code.
- Task-oriented: Focus on "small workflows" that should be easy to read and maintain.
- Graph DSL controls:
>>pipeline +pipe/switch/repeat/foreach/untilfor branching and loops inflow.yaml. - Friendly trace logs: Runs can be traced step by step from the terminal with cute (or plain) logs.
- Parallel Execution: Automatically runs independent tasks in parallel.
- Artifact Management: Easily save and manage task outputs and files.
- Observability: Track execution with unique Run IDs and detailed state transitions.
- Control: Cancel running workflows gracefully with
Ctrl+C.
📦 Installation
pip install pyoco
🚀 Usage
Here is a minimal example of a pure-Python workflow.
from pyoco import task
from pyoco.core.models import Flow
from pyoco.core.engine import Engine
@task
def fetch_data(ctx):
print("🐰 Fetching data...")
return {"id": 1, "value": "carrot"}
@task
def process_data(ctx, data):
print(f"🥕 Processing: {data['value']}")
return data['value'].upper()
@task
def save_result(ctx, result):
print(f"✨ Saved: {result}")
# Define the flow
flow = Flow(name="hello_pyoco")
flow >> fetch_data >> process_data >> save_result
# Wire inputs (explicitly for this example)
process_data.task.inputs = {"data": "$node.fetch_data.output"}
save_result.task.inputs = {"result": "$node.process_data.output"}
if __name__ == "__main__":
engine = Engine()
engine.run(flow)
Run it:
python examples/hello_pyoco.py
Output:
🐇 pyoco > start flow=hello_pyoco
🏃 start node=fetch_data
🐰 Fetching data...
✅ done node=fetch_data (0.30 ms)
🏃 start node=process_data
🥕 Processing: carrot
✅ done node=process_data (0.23 ms)
🏃 start node=save_result
✨ Saved: CARROT
✅ done node=save_result (0.30 ms)
🥕 done flow=hello_pyoco
See examples/hello_pyoco.py for the full code.
🧾 flow.yaml Graph DSL
Pyoco also supports workflow definition in flow.yaml with a >>-based graph DSL.
version: 1
pipes:
setup: "prepare >> choose_mode"
tasks:
prepare:
callable: "tasks:prepare"
choose_mode:
callable: "tasks:choose_mode"
run_batch:
callable: "tasks:run_batch"
process_item:
callable: "tasks:process_item"
poll_status:
callable: "tasks:poll_status"
finish:
callable: "tasks:finish"
flow:
defaults:
mode: "batch"
items: ["A", "B", "C"]
done: false
graph: |
pipe(setup)
>> switch(on={{mode}}){
batch: repeat(count=2){ run_batch };
default: run_batch;
}
>> foreach(over={{items}}, item=it, index=idx){ process_item }
>> until(cond={{params.done}}, max_iter=5){ poll_status }
>> finish
>>: sequential dependencypipe(NAME): inline expansion from top-levelpipesswitch(on=...){ ... }: single-branch selectionrepeat/foreach/until: control loops
🏗️ Architecture
Pyoco is designed with a simple flow:
+-----------+ +------------------+ +-----------------+
| User Code | ---> | pyoco.core.Flow | ---> | trace/logger |
| (Tasks) | | (Engine) | | (Console/File) |
+-----------+ +------------------+ +-----------------+
- User Code: You define tasks and workflows using Python decorators.
- Core Engine: The engine resolves dependencies and executes tasks (in parallel where possible).
- Trace: Execution events are sent to the trace backend for logging (cute or plain).
🎭 Modes
Pyoco has two output modes:
- Cute Mode (Default): Uses emojis and friendly messages. Best for local development and learning.
- Non-Cute Mode: Plain text logs. Best for CI/CD and production monitoring.
You can switch modes using an environment variable:
export PYOCO_CUTE=0 # Disable cute mode
Or via CLI flag:
pyoco run --non-cute ...
🔭 Observability / Server (Archived)
Observability and server-related docs are archived and out of scope for the current requirements.
See docs/archive/observability.md and docs/archive/roadmap.md.
🧩 Plug-ins
Need to share domain-specific tasks? Publish an entry point under pyoco.tasks and pyoco will auto-load it. We recommend Task subclasses first (callables still work with warnings). See docs/plugins.md for examples, quickstart, and pyoco plugins list / pyoco plugins lint.
Big data note: pass handles, not copies. For large tensors/images, stash paths or handles in ctx.artifacts/ctx.scratch and let downstream tasks materialize only when needed. For lazy pipelines (e.g., DataPipe), log the pipeline when you actually iterate (typically the training task) instead of materializing upstream.
🧭 Task Discovery (Security)
Pyoco does not allow configuring discovery scope in flow.yaml (the discovery: key is rejected) to reduce the risk of importing unexpected code.
- Entry point plug-ins: auto-loaded from
importlib.metadata.entry_points(group="pyoco.tasks") - Extra imports (ops-controlled): set
PYOCO_DISCOVERY_MODULES(comma/space-separated module names), e.g.PYOCO_DISCOVERY_MODULES=tasks,myapp.extra_tasks - Explicit tasks: prefer
tasks.<name>.callableinflow.yaml(see tutorials)
📚 Documentation
💖 Contributing
We love contributions! Please feel free to submit a Pull Request.
Made with 🥕 by the Pyoco Team.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyoco-0.7.0.tar.gz.
File metadata
- Download URL: pyoco-0.7.0.tar.gz
- Upload date:
- Size: 56.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dfd0bc40849fdeefa65954257ac1192d90b696dd4afada1f8c871a01e1a7d29a
|
|
| MD5 |
6f82c55bf7eb94145162ec57a2932694
|
|
| BLAKE2b-256 |
fa41ff4174006fca1ec6453884162d7f19a937c34aae075800e3d20c521ded75
|
File details
Details for the file pyoco-0.7.0-py3-none-any.whl.
File metadata
- Download URL: pyoco-0.7.0-py3-none-any.whl
- Upload date:
- Size: 59.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
60154a2f8d561ce7ed2e7d0f6454b28c2d8f7021968bcc22c567e9743ef93757
|
|
| MD5 |
7a0d52d61c7a3c2047b91a4fae620c39
|
|
| BLAKE2b-256 |
f2426b2c50cd4c7ce334526867d66cb6b58eea113418c8f9778cbfe87393fbc2
|