Skip to main content

Lightweight task dependency engine with topological execution

Project description

philiprehberger-task-graph

Tests PyPI version Last updated

philiprehberger-task-graph

Lightweight task dependency engine with topological execution.

Installation

pip install philiprehberger-task-graph

Usage

from philiprehberger_task_graph import TaskGraph

graph = TaskGraph()

@graph.task()
def fetch_data():
    return download()

@graph.task(depends=["fetch_data"])
def process_data():
    return transform()

@graph.task(depends=["process_data"])
def save_results():
    return store()

# Run tasks in dependency order
results = graph.run()

# Or run with parallelism
results = graph.run_parallel(max_workers=4)

Programmatic API

graph = TaskGraph()
graph.add_task("fetch", fetch_fn)
graph.add_task("process", process_fn, depends=["fetch"])
graph.add_task("save", save_fn, depends=["process"])

# Preview execution order
order = graph.dry_run()
# ["fetch", "process", "save"]

Timeout

Set a maximum execution time for a task. Raises TimeoutError if the task exceeds the limit.

@graph.task(timeout=30.0)
def slow_task():
    return long_running_operation()

# Or with add_task
graph.add_task("fetch", fetch_fn, timeout=10.0)

Retries

Automatically retry a task on failure. The task is retried up to N times before the exception propagates.

@graph.task(retries=3)
def flaky_task():
    return call_unreliable_api()

# Or with add_task
graph.add_task("fetch", fetch_fn, retries=2)

Timeout and Retries Combined

@graph.task(timeout=5.0, retries=2)
def resilient_task():
    return fetch_with_deadline()

Pass Dependency Results

Pass each dependency's return value as a keyword argument to the task function (named after the dependency).

graph = TaskGraph()
graph.add_task("source", lambda: 7)
graph.add_task("double", lambda source: source * 2, depends=["source"])

results = graph.run(pass_results=True)
# {"source": 7, "double": 14}

Execution Hooks

Register callbacks to observe task execution — useful for logging, metrics, or tracing without modifying the task functions.

graph = TaskGraph()

@graph.on_before_run
def log_start(name: str) -> None:
    print(f"starting {name}")

@graph.on_after_run
def log_done(name: str, result: object, duration: float) -> None:
    print(f"{name} done in {duration:.3f}s -> {result!r}")

@graph.on_error
def log_failure(name: str, exc: BaseException) -> None:
    print(f"{name} failed: {exc!r}")

The error hook fires only after all retries have been exhausted; the original exception still propagates after every hook has run.

Cycle Detection

from philiprehberger_task_graph import CycleError

# Raises CycleError if dependencies form a cycle
graph.run()

API

Function / Class Description
TaskGraph() Create a new task graph
@graph.task(name=None, depends=None, timeout=None, retries=0) Decorator to register a task with optional timeout and retries
graph.add_task(name, fn, depends=None, timeout=None, retries=0) Add a task programmatically
graph.run(pass_results=False) Execute tasks in topological order; optionally pass dep results as kwargs
graph.run_parallel(max_workers=None, pass_results=False) Execute with thread parallelism
graph.dry_run() Return execution order without running
graph.on_before_run(hook) Register (name) -> None callback fired before each task
graph.on_after_run(hook) Register (name, result, duration) -> None callback fired after each successful task
graph.on_error(hook) Register (name, exc) -> None callback fired after retries are exhausted
CycleError Raised when a dependency cycle is detected

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

philiprehberger_task_graph-0.4.0.tar.gz (188.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

philiprehberger_task_graph-0.4.0-py3-none-any.whl (6.8 kB view details)

Uploaded Python 3

File details

Details for the file philiprehberger_task_graph-0.4.0.tar.gz.

File metadata

File hashes

Hashes for philiprehberger_task_graph-0.4.0.tar.gz
Algorithm Hash digest
SHA256 c018172187e14786d233c1dec7b55c66587fd27015e03e125635b56cf85c5e3a
MD5 873ceb529c6e091894e1b66d0683eb8c
BLAKE2b-256 c14141be290581e28f262eb8a322bf7bcbaf1a26e6a4808521ffbd5fa9050669

See more details on using hashes here.

File details

Details for the file philiprehberger_task_graph-0.4.0-py3-none-any.whl.

File metadata

File hashes

Hashes for philiprehberger_task_graph-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 92e45a82287a9caac1d566e3b7dd728aade170e89a6b8859713e00b832edebc3
MD5 47f1dc08f3f7e0b7dc49e11286850715
BLAKE2b-256 9cbeaba94240bd34540ed40595939be3f9fabb7c5ebcd99f1fae1253a2a70784

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page