Taskmaestro
A Python 3.12+ library for defining and executing typed DAG task workflows with Pydantic models, lifecycle hooks, and fail-fast semantics.
Installation
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
```
Quick Start
Linear Pipeline
```python
from pydantic import BaseModel
from taskmaestro import Task, Workflow, Job, Runner, ExecutionContext


class NumberInput(BaseModel):
    value: int


class NumberOutput(BaseModel):
    value: int


class AddOne(Task[NumberInput, NumberOutput]):
    def run(self, input: NumberInput, ctx: ExecutionContext) -> NumberOutput:
        return NumberOutput(value=input.value + 1)


class Double(Task[NumberOutput, NumberOutput]):
    def run(self, input: NumberOutput, ctx: ExecutionContext) -> NumberOutput:
        return NumberOutput(value=input.value * 2)


workflow = Workflow(name="math", tasks=[AddOne, Double])
job = Job(workflow=workflow, config=NumberInput(value=5))
result = Runner().run(job)

print(result.status)        # "completed"
print(result.result.value)  # 12, i.e. (5 + 1) * 2
```
DAG Workflow (Fan-In)
```python
from pydantic import BaseModel
from taskmaestro import Task, Workflow, Job, Runner, ExecutionContext


class Input(BaseModel):
    value: int


class Output(BaseModel):
    value: int


class MergedInput(BaseModel):
    a: Output
    b: Output


class MergedOutput(BaseModel):
    total: int


class BranchA(Task[Input, Output]):
    def run(self, input: Input, ctx: ExecutionContext) -> Output:
        return Output(value=input.value + 1)


class BranchB(Task[Input, Output]):
    def run(self, input: Input, ctx: ExecutionContext) -> Output:
        return Output(value=input.value * 2)


class Merge(Task[MergedInput, MergedOutput]):
    def run(self, input: MergedInput, ctx: ExecutionContext) -> MergedOutput:
        return MergedOutput(total=input.a.value + input.b.value)


workflow = (
    Workflow.builder(name="fan_in")
    .add_task(BranchA)
    .add_task(BranchB)
    .add_task(Merge, depends_on={"a": BranchA, "b": BranchB})
    .build()
)

job = Job(workflow=workflow, config=Input(value=5))
result = Runner().run(job)

print(result.status)        # "completed"
print(result.result.total)  # 16 (6 + 10)
```
Core Concepts
You define Tasks (typed units of work), compose them into a Workflow (linear chain or DAG), bind input data via a Job, and hand the Job to a Runner for execution. Type safety is enforced at build time: input and output models are validated across the entire graph. Fail-fast semantics stop execution at the first error, and Hooks observe lifecycle events without being coupled to task logic.
| Concept | Description |
|---|---|
| Task | Subclass Task[I, O] with Pydantic models for input and output, then implement run(input, ctx). Each task can declare an optional timeout_seconds. For tasks with multiple named outputs, use inline Inputs/Outputs classes inside the task body. |
| Workflow | Build a linear pipeline with Workflow(tasks=[...]) or a DAG with Workflow.builder(). The builder accepts depends_on for single dependencies, fan-in dicts ({"field": UpstreamTask}), and (Task, "field") tuples for output field routing. Use config_fields to declare which input fields come from JobConfiguration. Workflows are validated at build time for cycles, type compatibility, and input completeness. |
| Job | Binds a Workflow to a typed config (the root task's input). Tracks status (pending → running → completed/failed), the final result, any error, and per-task task_results. Optionally accepts a JobConfiguration for per-task static config values. A job can only be run once. |
| Runner | Executes tasks in topological order, stopping on the first failure (fail-fast). Supports per-task and per-job timeouts via signal.alarm (Unix only). Dispatches lifecycle events to registered hooks. |
| ExecutionContext | Passed to every run() call. Provides a logger, an auto-generated correlation_id (UUID), a scratch_dir (temporary directory), and a service registry (register()/resolve()) for injecting shared resources like DB connections. |
| Hooks | Subclass BaseHook and override methods like on_job_start, on_task_complete, etc. Hook errors are swallowed and reported via warnings.warn(), so they never crash the job. Built-ins: LoggingHook, TimingHook, ResultPersistenceHook. |
| ObjectModel | Generic ObjectModel[T] base model for wrapping arbitrary (non-Pydantic) objects. Enables arbitrary_types_allowed so fields can hold native library objects like database connections or API clients. |
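The execution model in the table (topological order, fail-fast, hook errors downgraded to warnings) can be sketched with the standard library's `graphlib`. This is a minimal illustration under stated assumptions, not taskmaestro's Runner: tasks are plain callables keyed by name, and `run_dag` and `HookSketch` are hypothetical names; only the `on_task_complete` method name is borrowed from the Hooks row.

```python
import graphlib
import warnings
from typing import Any, Callable


class HookSketch:
    """Hypothetical stand-in for a hook base class; every method is a no-op."""

    def on_task_complete(self, name: str, output: Any) -> None:
        pass


def run_dag(
    tasks: dict[str, Callable[[dict[str, Any]], Any]],
    deps: dict[str, set[str]],
    hooks: list[HookSketch],
) -> dict[str, Any]:
    """Run tasks in topological order, stopping at the first failure."""
    graph = {name: deps.get(name, set()) for name in tasks}
    results: dict[str, Any] = {}
    # Iterating static_order() raises graphlib.CycleError if the graph has a cycle.
    for name in graphlib.TopologicalSorter(graph).static_order():
        upstream = {dep: results[dep] for dep in graph[name]}
        # Fail-fast: an exception here propagates, so later tasks never run.
        results[name] = tasks[name](upstream)
        for hook in hooks:
            try:
                hook.on_task_complete(name, results[name])
            except Exception as exc:  # a broken hook must not crash the run
                warnings.warn(f"{type(hook).__name__} failed: {exc!r}")
    return results


# Same arithmetic as the fan-in Quick Start example, with lambdas as tasks.
tasks = {
    "branch_a": lambda up: 5 + 1,
    "branch_b": lambda up: 5 * 2,
    "merge": lambda up: up["branch_a"] + up["branch_b"],
}
print(run_dag(tasks, {"merge": {"branch_a", "branch_b"}}, hooks=[])["merge"])  # 16
```

A real runner also has to validate types and handle timeouts; the sketch only shows why a dependency graph plus a topological sort is enough to get a valid execution order.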
Named Task Instances
The same Task class can appear multiple times in a workflow with different names. Use the name= parameter in add_task():
```python
workflow = (
    Workflow.builder(name="parallel_wells")
    .add_task(LoadModel)
    .add_task(LoadWellPath, name="well_1", depends_on=LoadModel)
    .add_task(LoadWellPath, name="well_2", depends_on=LoadModel)
    .add_task(Process, name="proc_1", depends_on="well_1")
    .add_task(Process, name="proc_2", depends_on="well_2")
    .build()
)
```
Named instances can be referenced as string dependencies (depends_on="well_1") or in fan-in dicts.
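Judging by the keys used elsewhere in this README (`LoadModel` is configured as `"load_model"`, `PrepareText` is drawn as `prepare_text`), tasks added without `name=` appear to default to the snake_case form of their class name. That convention is an inference rather than documented behavior; the hypothetical helper below sketches such a conversion, which is handy for predicting the key a `JobConfiguration` or string dependency would need.

```python
import re


def default_task_name(class_name: str) -> str:
    """Convert a CamelCase class name to snake_case (hypothetical helper)."""
    # Split "LoadModel" -> "Load_Model": lowercase/digit followed by uppercase.
    s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", class_name)
    # Split acronyms from the next word: "HTTPServer" -> "HTTP_Server".
    s = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", "_", s)
    return s.lower()


print(default_task_name("GenerateStopWords"))  # generate_stop_words
```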
Per-Task Configuration
JobConfiguration provides static config values for individual tasks, merged with upstream outputs at runtime. Declare which fields come from config with config_fields:
```python
from taskmaestro import EmptyConfig, Job, JobConfiguration, Runner, Workflow

workflow = (
    Workflow.builder(name="configured")
    .add_task(LoadModel, config_fields=["path"])  # root: all input from config
    .add_task(Transform, depends_on=LoadModel, config_fields=["scale_factor"])  # mixed: upstream output + config
    .build()
)

job_config = JobConfiguration({
    "load_model": {"path": "/data/model.egrid"},
    "transform": {"scale_factor": 2.5},
})

job = Job(workflow=workflow, config=EmptyConfig(), job_configuration=job_config)
result = Runner().run(job)
```
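Conceptually, a task with `config_fields` receives an input assembled from two sources: fields produced upstream and fields looked up under its name in the `JobConfiguration`. The sketch below models that merge with plain dictionaries. The function name, the upstream field `model`, the missing-value check, and the rule that a field may not come from both sources are all assumptions for illustration, not taskmaestro's code.

```python
from typing import Any


def build_task_input(
    task_name: str,
    upstream: dict[str, Any],
    config_fields: list[str],
    job_configuration: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Merge upstream output fields with static per-task config values (sketch)."""
    task_config = job_configuration.get(task_name, {})
    missing = [f for f in config_fields if f not in task_config]
    if missing:
        raise ValueError(f"{task_name}: missing config values for {missing}")
    clash = sorted(set(config_fields) & set(upstream))
    if clash:
        raise ValueError(f"{task_name}: fields supplied twice: {clash}")
    # Only the declared config_fields are read from the configuration.
    return {**upstream, **{f: task_config[f] for f in config_fields}}


merged = build_task_input(
    "transform",
    upstream={"model": "<output of load_model>"},  # hypothetical upstream field
    config_fields=["scale_factor"],
    job_configuration={"transform": {"scale_factor": 2.5}},
)
print(merged)  # {'model': '<output of load_model>', 'scale_factor': 2.5}
```

For a root task such as `load_model`, `upstream` is empty and the whole input comes from the configuration.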
Output Field Routing
Route a specific field from an upstream task's output (rather than the whole output) using (Task, "field") tuples:
```python
from pydantic import BaseModel
from taskmaestro import ExecutionContext, Task, Workflow


class ExtractKeywords(Task):
    class Inputs(BaseModel):
        content: TextContent

    class Outputs(BaseModel):
        keywords: KeywordsOutput
        num_words_removed: int

    def run(self, input: Inputs, ctx: ExecutionContext) -> Outputs: ...


workflow = (
    Workflow.builder(name="analysis")
    .add_task(ExtractKeywords, depends_on=PrepareText)
    .add_task(
        BuildReport,
        depends_on={
            "keywords": (ExtractKeywords, "keywords"),  # routes .keywords field
            "num_words_removed": (ExtractKeywords, "num_words_removed"),  # routes .num_words_removed
            "stats": ComputeWordStats,  # whole output
        },
    )
    .build()
)
```
ObjectModel
ObjectModel[T] wraps arbitrary (non-Pydantic) objects so they can flow through workflows. Use it as a type alias for simple wrappers, or subclass it to add extra fields:
```python
import rips  # ResInsight's Python API

from taskmaestro import ObjectModel

# Type alias: no extra fields needed
GridCase = ObjectModel[rips.EclipseCase]
WellPath = ObjectModel[rips.WellPath]


# Subclass: adds fields alongside the wrapped object
class AddPerforationInput(ObjectModel[rips.WellPath]):
    start_md: float
    end_md: float


# Access the wrapped object via .value
# (eclipse_case is a rips.EclipseCase obtained from a running ResInsight session)
grid = GridCase(value=eclipse_case)
print(grid.value.name)
```
YAML Configuration
Workflows can be defined entirely in YAML instead of Python. The loader dynamically imports task classes and validates the full configuration:
```yaml
# workflow.yaml
workflow:
  name: text_analysis
  tasks:
    - task: pipeline.PrepareText
    - task: pipeline.GenerateStopWords
    - task: pipeline.ComputeWordStats
      depends_on:
        content: pipeline.PrepareText
        stop_words: pipeline.GenerateStopWords
    - task: pipeline.ExtractKeywords
      depends_on:
        content: pipeline.PrepareText
        stop_words: pipeline.GenerateStopWords
    - task: pipeline.ScoreReadability
      depends_on: pipeline.PrepareText
    - task: pipeline.BuildReport
      depends_on:
        stats: pipeline.ComputeWordStats
        keywords: [pipeline.ExtractKeywords, keywords]  # output field routing
        readability: pipeline.ScoreReadability
        num_words_removed: [pipeline.ExtractKeywords, num_words_removed]  # output field routing
runner:
  hooks:
    - hook: taskmaestro.hooks.logging.LoggingHook
    - hook: taskmaestro.hooks.timing.TimingHook
context:
  services:
    title: "Python Overview"
```

```yaml
# input.yaml
text: "Python is a high-level programming language..."
title: "Python Overview"
```
Load and run:
```python
from taskmaestro import load_workflow_from_yaml, run_workflow_from_yaml

# Load for inspection, then run
loaded = load_workflow_from_yaml("workflow.yaml", "input.yaml")
result = loaded.run()

# Or run directly
result = run_workflow_from_yaml("workflow.yaml", "input.yaml")
```
YAML supports named task instances (name:), per-task input config (keyed by task name in the input file), fan-in dicts, and output field routing via [task, field] lists.
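Turning dotted strings such as `pipeline.PrepareText` into classes is a job for `importlib`. The sketch below is an assumption about how such resolution might work, not taskmaestro's loader: it operates on an already-parsed mapping (so no YAML library is needed), normalizes the three `depends_on` shapes shown above (a single string, a fan-in mapping, and a `[task, field]` list), and does not handle string references to named instances. `import_dotted` and `normalize_depends_on` are hypothetical names.

```python
import importlib
from typing import Any


def import_dotted(path: str) -> Any:
    """Import 'package.module.Name' and return the attribute Name."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted path, got {path!r}")
    return getattr(importlib.import_module(module_path), attr)


def normalize_depends_on(raw: Any) -> Any:
    """Resolve dotted names in a parsed depends_on value, keeping its shape."""
    if raw is None:
        return None  # root task: no dependencies
    if isinstance(raw, str):
        return import_dotted(raw)  # single dependency
    if isinstance(raw, list):
        task_path, field = raw  # [task, field] becomes (TaskClass, "field")
        return (import_dotted(task_path), field)
    if isinstance(raw, dict):
        return {k: normalize_depends_on(v) for k, v in raw.items()}  # fan-in
    raise TypeError(f"unsupported depends_on value: {raw!r}")


# Standard-library classes stand in for pipeline.* task classes here.
print(normalize_depends_on({"content": "collections.OrderedDict", "n": ["fractions.Fraction", "numerator"]}))
```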
Visualization
Generate Mermaid diagrams of workflow topology:
```python
print(workflow.to_mermaid())
# or with config nodes:
print(workflow.to_mermaid(job_configuration=job_config))
```
Output:
```mermaid
---
title: text_analysis
---
graph TD
    _start_(("start"))
    _end_(("end"))
    prepare_text["prepare_text"]
    generate_stop_words["generate_stop_words"]
    compute_word_stats["compute_word_stats"]
    build_report["build_report"]
    _start_ -->|TextInput| prepare_text
    _start_ -->|TextInput| generate_stop_words
    prepare_text -->|content: TextContent| compute_word_stats
    compute_word_stats -->|WordStatsOutput| build_report
    build_report -->|AnalysisReport| _end_
```
Edges are labeled with data types. Fan-in edges show field names, and field routing edges show .field: Type. When a JobConfiguration is provided, configured tasks get dashed edges from a JobConfiguration node.
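Producing a diagram like the one above is mostly string assembly over a node list and a labeled edge list. The hypothetical `to_mermaid_sketch` function below reproduces the general shape of that output (front-matter title, start/end circles, type-labeled edges) from a hand-written edge list; it is an illustration, not taskmaestro's `to_mermaid()`.

```python
def to_mermaid_sketch(
    title: str, nodes: list[str], edges: list[tuple[str, str, str]]
) -> str:
    """Render a Mermaid flowchart with labeled edges (illustrative only)."""
    lines = ["---", f"title: {title}", "---", "graph TD"]
    lines.append('    _start_(("start"))')
    lines.append('    _end_(("end"))')
    lines += [f'    {n}["{n}"]' for n in nodes]
    # Each edge is (source, target, label); the label goes between the pipes.
    lines += [f"    {src} -->|{label}| {dst}" for src, dst, label in edges]
    return "\n".join(lines)


# Edges for the fan-in Quick Start workflow, written out by hand.
print(to_mermaid_sketch(
    "fan_in",
    ["branch_a", "branch_b", "merge"],
    [
        ("_start_", "branch_a", "Input"),
        ("_start_", "branch_b", "Input"),
        ("branch_a", "merge", "a: Output"),
        ("branch_b", "merge", "b: Output"),
        ("merge", "_end_", "MergedOutput"),
    ],
))
```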
Error Handling
```text
WorkflowRunnerError (base)
├── WorkflowDefinitionError     # Invalid workflow definition
│   ├── CycleDetectedError      # Dependency cycle
│   └── IncompleteInputError    # Missing fan-in field mappings
├── JobStateError               # e.g., re-running a completed job
├── ConfigLoadError             # YAML config loading failure
└── TaskExecutionError          # Runtime task failure
    ├── TaskOutputTypeError     # Output type mismatch
    └── TaskTimeoutError        # Task exceeded timeout
```
Examples
Two full example pipelines are included in the examples/ directory:
| Example | Features |
|---|---|
| examples/text_analysis/ | DAG with fan-out/fan-in, output field routing, inline Inputs/Outputs classes, YAML config, Mermaid visualization |
| examples/resinsight/ | ObjectModel[T] for gRPC objects, JobConfiguration with per-task config, named task instances, config_fields, YAML config |
Run an example:
```bash
python examples/text_analysis/pipeline.py          # Python API
python examples/text_analysis/pipeline.py --yaml   # YAML config
```
Development
```bash
source .venv/bin/activate
pytest -v            # run tests
ruff check .         # lint
ruff format .        # format
mypy taskmaestro     # type check (strict)
```
Download files
File details
Details for the file taskmaestro-0.1.0.tar.gz.
File metadata
- Download URL: taskmaestro-0.1.0.tar.gz
- Upload date:
- Size: 1.7 MB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2f6d34aa760beae8a6cbadb7485917868e0d241688dbcb9c9f2318d63da225af |
| MD5 | 1c720c7c777683a78b3d7d3af0297e95 |
| BLAKE2b-256 | 1acb3a366129a5245560531bb334a4754cfafe8fd2c0e5d1c8a920489f957fe1 |
Provenance
The following attestation bundles were made for taskmaestro-0.1.0.tar.gz:
Publisher: publish.yml on OPM/taskmaestro
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: taskmaestro-0.1.0.tar.gz
- Subject digest: 2f6d34aa760beae8a6cbadb7485917868e0d241688dbcb9c9f2318d63da225af
- Sigstore transparency entry: 1507438262
- Sigstore integration time:
- Permalink: OPM/taskmaestro@30702bb67c23f59bf57f9896c3456ab04efb9ea6
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/OPM
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@30702bb67c23f59bf57f9896c3456ab04efb9ea6
- Trigger Event: push
File details
Details for the file taskmaestro-0.1.0-py3-none-any.whl.
File metadata
- Download URL: taskmaestro-0.1.0-py3-none-any.whl
- Upload date:
- Size: 40.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4289afdec47bca3c25bd474edd05a582e9cf1f177a03eedbff25fbb74573f733 |
| MD5 | 6ac4ec34c0265b857462df6abcfcaf46 |
| BLAKE2b-256 | 5cdc3f9cce9e2c731b5d75d6d24e1e4b185fce7b042f04c85ec4e7d9f5e7aebd |
Provenance
The following attestation bundles were made for taskmaestro-0.1.0-py3-none-any.whl:
Publisher: publish.yml on OPM/taskmaestro
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: taskmaestro-0.1.0-py3-none-any.whl
- Subject digest: 4289afdec47bca3c25bd474edd05a582e9cf1f177a03eedbff25fbb74573f733
- Sigstore transparency entry: 1507438358
- Sigstore integration time:
- Permalink: OPM/taskmaestro@30702bb67c23f59bf57f9896c3456ab04efb9ea6
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/OPM
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@30702bb67c23f59bf57f9896c3456ab04efb9ea6
- Trigger Event: push