A Python 3.12+ library for defining and executing typed DAG task workflows

Taskmaestro

A Python 3.12+ library for defining and executing typed DAG task workflows with Pydantic models, lifecycle hooks, and fail-fast semantics.

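Installation

pip install taskmaestro

For development, install in editable mode with the dev extras from a clone of the repository:

python3 -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"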

Quick Start

Linear Pipeline

from pydantic import BaseModel
from taskmaestro import Task, Workflow, Job, Runner, ExecutionContext

class NumberInput(BaseModel):
    value: int

class NumberOutput(BaseModel):
    value: int

class AddOne(Task[NumberInput, NumberOutput]):
    def run(self, input: NumberInput, ctx: ExecutionContext) -> NumberOutput:
        return NumberOutput(value=input.value + 1)

class Double(Task[NumberOutput, NumberOutput]):
    def run(self, input: NumberOutput, ctx: ExecutionContext) -> NumberOutput:
        return NumberOutput(value=input.value * 2)

workflow = Workflow(name="math", tasks=[AddOne, Double])
job = Job(workflow=workflow, config=NumberInput(value=5))
result = Runner().run(job)

print(result.status)        # "completed"
print(result.result.value)  # 12

DAG Workflow (Fan-In)

from pydantic import BaseModel
from taskmaestro import Task, Workflow, Job, Runner, ExecutionContext

class Input(BaseModel):
    value: int

class Output(BaseModel):
    value: int

class MergedInput(BaseModel):
    a: Output
    b: Output

class MergedOutput(BaseModel):
    total: int

class BranchA(Task[Input, Output]):
    def run(self, input: Input, ctx: ExecutionContext) -> Output:
        return Output(value=input.value + 1)

class BranchB(Task[Input, Output]):
    def run(self, input: Input, ctx: ExecutionContext) -> Output:
        return Output(value=input.value * 2)

class Merge(Task[MergedInput, MergedOutput]):
    def run(self, input: MergedInput, ctx: ExecutionContext) -> MergedOutput:
        return MergedOutput(total=input.a.value + input.b.value)

workflow = (
    Workflow.builder(name="fan_in")
    .add_task(BranchA)
    .add_task(BranchB)
    .add_task(Merge, depends_on={"a": BranchA, "b": BranchB})
    .build()
)

job = Job(workflow=workflow, config=Input(value=5))
result = Runner().run(job)

print(result.status)        # "completed"
print(result.result.total)  # 16 (6 + 10)
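The fan-in dict has to cover every field of the downstream input model; the Error Handling section lists IncompleteInputError for missing fan-in field mappings. The sketch below shows that kind of build-time check in isolation. It is not taskmaestro's implementation: a standard-library dataclass stands in for the Pydantic MergedInput model, and check_fan_in is a hypothetical helper.

```python
from dataclasses import dataclass, fields


@dataclass
class MergedInput:
    # Stand-in for the Pydantic MergedInput model (field types simplified)
    a: int
    b: int


def check_fan_in(input_model: type, depends_on: dict[str, str]) -> None:
    """Raise ValueError unless depends_on maps exactly the model's fields."""
    required = {f.name for f in fields(input_model)}
    provided = set(depends_on)
    if missing := sorted(required - provided):
        raise ValueError(f"missing fan-in fields: {missing}")
    if unknown := sorted(provided - required):
        raise ValueError(f"unknown fan-in fields: {unknown}")


check_fan_in(MergedInput, {"a": "BranchA", "b": "BranchB"})  # complete: no error
```

Treating extra keys as errors is an assumption made here to catch typos early; the real builder's rule for unknown keys may differ.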

Core Concepts

You define Tasks (typed units of work), compose them into a Workflow (linear chain or DAG), bind input data via a Job, and hand it to a Runner for execution. Type safety is enforced at build time — input/output models are validated across the entire graph. Fail-fast semantics stop execution on the first error, and Hooks provide cross-cutting lifecycle observations without coupling to task logic.

Task: Subclass Task[I, O] with Pydantic models for input and output, then implement run(input, ctx). Each task can declare an optional timeout_seconds. For tasks with multiple named outputs, use inline Inputs/Outputs classes inside the task body.
Workflow: Build a linear pipeline with Workflow(tasks=[...]) or a DAG with Workflow.builder(). The builder accepts depends_on for single dependencies, fan-in dicts ({"field": UpstreamTask}), and (Task, "field") tuples for output field routing. Use config_fields to declare which input fields come from JobConfiguration. Workflows are validated at build time for cycles, type compatibility, and input completeness.
Job: Binds a Workflow to a typed config (the root task's input). Tracks the status (pending → running → completed/failed), the final result, any error, and per-task task_results. Optionally accepts a JobConfiguration for per-task static config values. A job can only be run once.
Runner: Executes tasks in topological order, stopping on the first failure (fail-fast). Supports per-task and per-job timeouts via signal.alarm (Unix only). Dispatches lifecycle events to registered hooks.
ExecutionContext: Passed to every run() call. Provides a logger, an auto-generated correlation_id (UUID), a scratch_dir (temporary directory), and a service registry (register()/resolve()) for injecting shared resources such as database connections.
Hooks: Subclass BaseHook and override methods such as on_job_start and on_task_complete. Hook errors are swallowed and reported via warnings.warn(), so they never crash the job. Built-ins: LoggingHook, TimingHook, ResultPersistenceHook.
ObjectModel: Generic base model ObjectModel[T] for wrapping arbitrary (non-Pydantic) objects. Enables arbitrary_types_allowed so fields can hold native library objects such as database connections or API clients.
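The Runner's fail-fast, topological execution can be pictured with the standard library's graphlib. In this sketch, run_fail_fast is a hypothetical name, tasks are plain functions that receive a dict of upstream results, and a dependency cycle surfaces as a ValueError rather than taskmaestro's CycleDetectedError.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Callable

TaskFn = Callable[[dict[str, Any]], Any]


def run_fail_fast(tasks: dict[str, TaskFn], deps: dict[str, set[str]]) -> dict[str, Any]:
    """Run tasks in dependency order; the first exception stops the run."""
    # Every task becomes a node, even roots that have no dependencies
    graph = {name: deps.get(name, set()) for name in tasks}
    try:
        order = list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # args[1] holds the nodes that form the cycle
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc
    results: dict[str, Any] = {}
    for name in order:
        upstream = {dep: results[dep] for dep in graph[name]}
        # No try/except: a failing task propagates and later tasks never run
        results[name] = tasks[name](upstream)
    return results


results = run_fail_fast(
    tasks={
        "branch_a": lambda up: 5 + 1,
        "branch_b": lambda up: 5 * 2,
        "merge": lambda up: up["branch_a"] + up["branch_b"],
    },
    deps={"merge": {"branch_a", "branch_b"}},
)
print(results["merge"])  # 16
```

The usage mirrors the fan-in Quick Start example. A real runner would also record per-task status and dispatch hook events around each call.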

Named Task Instances

The same Task class can appear multiple times in a workflow with different names. Use the name= parameter in add_task():

workflow = (
    Workflow.builder(name="parallel_wells")
    .add_task(LoadModel)
    .add_task(LoadWellPath, name="well_1", depends_on=LoadModel)
    .add_task(LoadWellPath, name="well_2", depends_on=LoadModel)
    .add_task(Process, name="proc_1", depends_on="well_1")
    .add_task(Process, name="proc_2", depends_on="well_2")
    .build()
)

Named instances can be referenced as string dependencies (depends_on="well_1") or in fan-in dicts.

Per-Task Configuration

JobConfiguration provides static config values for individual tasks, merged with upstream outputs at runtime. Declare which fields come from config with config_fields:

from taskmaestro import EmptyConfig, Job, JobConfiguration, Runner, Workflow

workflow = (
    Workflow.builder(name="configured")
    .add_task(LoadModel, config_fields=["path"])          # root: all input from config
    .add_task(Transform, depends_on=LoadModel, config_fields=["scale_factor"])  # mixed
    .build()
)

job_config = JobConfiguration({
    "load_model": {"path": "/data/model.egrid"},
    "transform": {"scale_factor": 2.5},
})

job = Job(workflow=workflow, config=EmptyConfig(), job_configuration=job_config)
result = Runner().run(job)
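Conceptually, each task's input combines its upstream output with the task's own entry in JobConfiguration, restricted to the declared config_fields. The sketch below shows one plausible merge with plain dicts. merge_task_input and the grid field are hypothetical, and the three error rules are assumptions about how strict such a merge should be, not documented taskmaestro behavior.

```python
from typing import Any


def merge_task_input(
    upstream: dict[str, Any],
    task_config: dict[str, Any],
    config_fields: list[str],
) -> dict[str, Any]:
    """Combine upstream output fields with a task's static config values."""
    if undeclared := sorted(set(task_config) - set(config_fields)):
        raise ValueError(f"config keys not listed in config_fields: {undeclared}")
    if unset := sorted(set(config_fields) - set(task_config)):
        raise ValueError(f"config_fields with no configured value: {unset}")
    if clash := sorted(set(upstream) & set(task_config)):
        raise ValueError(f"fields supplied by both upstream and config: {clash}")
    return {**upstream, **task_config}


# Loosely mirrors the "transform" task: upstream data plus a configured scale_factor
merged = merge_task_input({"grid": "<model>"}, {"scale_factor": 2.5}, ["scale_factor"])
print(merged)  # {'grid': '<model>', 'scale_factor': 2.5}
```

For a root task such as load_model, upstream is simply empty and every input field comes from config.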

Output Field Routing

Route a specific field from an upstream task's output (rather than the whole output) using (Task, "field") tuples:

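from pydantic import BaseModel
from taskmaestro import ExecutionContext, Task, Workflow

# TextContent, KeywordsOutput, PrepareText, ComputeWordStats and BuildReport
# are defined elsewhere in the pipeline module (not shown).

class ExtractKeywords(Task):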
    class Inputs(BaseModel):
        content: TextContent

    class Outputs(BaseModel):
        keywords: KeywordsOutput
        num_words_removed: int

    def run(self, input: Inputs, ctx: ExecutionContext) -> Outputs: ...

workflow = (
    Workflow.builder(name="analysis")
    .add_task(ExtractKeywords, depends_on=PrepareText)
    .add_task(
        BuildReport,
        depends_on={
            "keywords": (ExtractKeywords, "keywords"),              # routes .keywords field
            "num_words_removed": (ExtractKeywords, "num_words_removed"),  # routes .num_words_removed
            "stats": ComputeWordStats,                              # whole output
        },
    )
    .build()
)
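Routing resolves each fan-in entry either to an upstream task's whole output or to one attribute of it. The sketch below models that lookup with plain Python objects. resolve_sources, the string task keys, and the ExtractKeywordsOutputs dataclass (standing in for ExtractKeywords.Outputs) are illustrative assumptions, and the sample values are made up.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ExtractKeywordsOutputs:
    # Stand-in for ExtractKeywords.Outputs (field types simplified)
    keywords: list[str]
    num_words_removed: int


def resolve_sources(depends_on: dict[str, Any], results: dict[str, Any]) -> dict[str, Any]:
    """A bare task key takes the whole output; a (task, field) tuple takes one attribute."""
    resolved: dict[str, Any] = {}
    for field_name, source in depends_on.items():
        if isinstance(source, tuple):
            task_name, attr = source
            resolved[field_name] = getattr(results[task_name], attr)
        else:
            resolved[field_name] = results[source]
    return resolved


results = {
    "extract_keywords": ExtractKeywordsOutputs(keywords=["python", "language"], num_words_removed=3),
    "compute_word_stats": {"word_count": 42},
}
inputs = resolve_sources(
    {
        "keywords": ("extract_keywords", "keywords"),
        "num_words_removed": ("extract_keywords", "num_words_removed"),
        "stats": "compute_word_stats",
    },
    results,
)
print(inputs["keywords"], inputs["num_words_removed"])  # ['python', 'language'] 3
```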

ObjectModel

ObjectModel[T] wraps arbitrary (non-Pydantic) objects so they can flow through workflows. Use it as a type alias for simple wrappers, or subclass it to add extra fields:

import rips  # ResInsight Python API
from taskmaestro import ObjectModel

# Type alias — no extra fields needed
GridCase = ObjectModel[rips.EclipseCase]
WellPath = ObjectModel[rips.WellPath]

# Subclass — adds fields alongside the wrapped object
class AddPerforationInput(ObjectModel[rips.WellPath]):
    start_md: float
    end_md: float

# Access the wrapped object via .value
# (eclipse_case: an existing rips.EclipseCase obtained from ResInsight)
grid = GridCase(value=eclipse_case)
print(grid.value.name)

YAML Configuration

Workflows can be defined entirely in YAML instead of Python. The loader dynamically imports task classes and validates the full configuration:

# workflow.yaml
workflow:
  name: text_analysis
  tasks:
    - task: pipeline.PrepareText
    - task: pipeline.GenerateStopWords
    - task: pipeline.ComputeWordStats
      depends_on:
        content: pipeline.PrepareText
        stop_words: pipeline.GenerateStopWords
    - task: pipeline.ExtractKeywords
      depends_on:
        content: pipeline.PrepareText
        stop_words: pipeline.GenerateStopWords
    - task: pipeline.ScoreReadability
      depends_on: pipeline.PrepareText
    - task: pipeline.BuildReport
      depends_on:
        stats: pipeline.ComputeWordStats
        keywords: [pipeline.ExtractKeywords, keywords]                    # output field routing
        readability: pipeline.ScoreReadability
        num_words_removed: [pipeline.ExtractKeywords, num_words_removed]  # output field routing

runner:
  hooks:
    - hook: taskmaestro.hooks.logging.LoggingHook
    - hook: taskmaestro.hooks.timing.TimingHook

context:
  services:
    title: "Python Overview"

# input.yaml
text: "Python is a high-level programming language..."
title: "Python Overview"

Load and run:

from taskmaestro import load_workflow_from_yaml, run_workflow_from_yaml

# Load for inspection, then run
loaded = load_workflow_from_yaml("workflow.yaml", "input.yaml")
result = loaded.run()

# Or run directly
result = run_workflow_from_yaml("workflow.yaml", "input.yaml")

YAML supports named task instances (name:), per-task input config (keyed by task name in the input file), fan-in dicts, and output field routing via [task, field] lists.

Visualization

Generate Mermaid diagrams of workflow topology:

print(workflow.to_mermaid())
# or with config nodes:
print(workflow.to_mermaid(job_configuration=job_config))

Example output (abridged):

---
title: text_analysis
---
graph TD
    _start_(("start"))
    _end_(("end"))
    prepare_text["prepare_text"]
    generate_stop_words["generate_stop_words"]
    compute_word_stats["compute_word_stats"]
    build_report["build_report"]
    _start_ -->|TextInput| prepare_text
    _start_ -->|TextInput| generate_stop_words
    prepare_text -->|content: TextContent| compute_word_stats
    compute_word_stats -->|WordStatsOutput| build_report
    build_report -->|AnalysisReport| _end_

Edges are labeled with data types. Fan-in edges show field names, and field routing edges show .field: Type. When a JobConfiguration is provided, configured tasks get dashed edges from a JobConfiguration node.
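Because the diagram is plain text, generating it amounts to emitting node declarations and labeled edges. The sketch below renders a list of (source, target, label) edges in the same layout, drawing _start_ and _end_ as circles. to_mermaid here is a stand-alone function, not Workflow.to_mermaid, and it does not draw JobConfiguration nodes; the add_one/double node names are hypothetical.

```python
def to_mermaid(title: str, edges: list[tuple[str, str, str]]) -> str:
    """Render (source, target, label) edges as a titled Mermaid flowchart."""
    nodes: list[str] = []
    for source, target, _ in edges:
        for node in (source, target):
            if node not in nodes:
                nodes.append(node)
    lines = ["---", f"title: {title}", "---", "graph TD"]
    for node in nodes:
        if node in ("_start_", "_end_"):
            text = node.strip("_")
            lines.append(f'    {node}(("{text}"))')  # circle
        else:
            lines.append(f'    {node}["{node}"]')  # rectangle
    for source, target, label in edges:
        lines.append(f"    {source} -->|{label}| {target}")
    return "\n".join(lines)


# The linear "math" workflow from Quick Start, as edges
print(to_mermaid("math", [
    ("_start_", "add_one", "NumberInput"),
    ("add_one", "double", "NumberOutput"),
    ("double", "_end_", "NumberOutput"),
]))
```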

Error Handling

WorkflowRunnerError (base)
├── WorkflowDefinitionError       # Invalid workflow definition
│   ├── CycleDetectedError        # Dependency cycle
│   └── IncompleteInputError      # Missing fan-in field mappings
├── JobStateError                 # e.g., re-running a completed job
├── ConfigLoadError               # YAML config loading failure
└── TaskExecutionError            # Runtime task failure
    ├── TaskOutputTypeError       # Output type mismatch
    └── TaskTimeoutError          # Task exceeded timeout

Examples

Two full example pipelines are included in the examples/ directory:

examples/text_analysis/: DAG with fan-out/fan-in, output field routing, inline Inputs/Outputs classes, YAML config, Mermaid visualization
examples/resinsight/: ObjectModel[T] for gRPC objects, JobConfiguration with per-task config, named task instances, config_fields, YAML config

Run an example:

python examples/text_analysis/pipeline.py              # Python API
python examples/text_analysis/pipeline.py --yaml       # YAML config

Development

source .venv/bin/activate
pytest -v                  # run tests
ruff check .               # lint
ruff format .              # format
mypy taskmaestro           # type check (strict)

Download files

Source Distribution

taskmaestro-0.1.0.tar.gz (1.7 MB)

Uploaded Source

Built Distribution

taskmaestro-0.1.0-py3-none-any.whl (40.9 kB)

Uploaded Python 3

File details

Details for the file taskmaestro-0.1.0.tar.gz.

File metadata

  • Download URL: taskmaestro-0.1.0.tar.gz
  • Size: 1.7 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for taskmaestro-0.1.0.tar.gz
Algorithm Hash digest
SHA256 2f6d34aa760beae8a6cbadb7485917868e0d241688dbcb9c9f2318d63da225af
MD5 1c720c7c777683a78b3d7d3af0297e95
BLAKE2b-256 1acb3a366129a5245560531bb334a4754cfafe8fd2c0e5d1c8a920489f957fe1

Provenance

The following attestation bundles were made for taskmaestro-0.1.0.tar.gz:

Publisher: publish.yml on OPM/taskmaestro

File details

Details for the file taskmaestro-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: taskmaestro-0.1.0-py3-none-any.whl
  • Size: 40.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for taskmaestro-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4289afdec47bca3c25bd474edd05a582e9cf1f177a03eedbff25fbb74573f733
MD5 6ac4ec34c0265b857462df6abcfcaf46
BLAKE2b-256 5cdc3f9cce9e2c731b5d75d6d24e1e4b185fce7b042f04c85ec4e7d9f5e7aebd

Provenance

The following attestation bundles were made for taskmaestro-0.1.0-py3-none-any.whl:

Publisher: publish.yml on OPM/taskmaestro
