Skip to main content

Lightweight task dependency engine with topological execution

Project description

philiprehberger-task-graph

Tests PyPI version GitHub release Last updated License Bug Reports Feature Requests Sponsor

Lightweight task dependency engine with topological execution.

Installation

pip install philiprehberger-task-graph

Usage

from philiprehberger_task_graph import TaskGraph

graph = TaskGraph()

@graph.task()
def fetch_data():
    return download()

@graph.task(depends=["fetch_data"])
def process_data():
    return transform()

@graph.task(depends=["process_data"])
def save_results():
    return store()

# Run tasks in dependency order
results = graph.run()

# Or run with parallelism
results = graph.run_parallel(max_workers=4)

Programmatic API

graph = TaskGraph()
graph.add_task("fetch", fetch_fn)
graph.add_task("process", process_fn, depends=["fetch"])
graph.add_task("save", save_fn, depends=["process"])

# Preview execution order
order = graph.dry_run()
# ["fetch", "process", "save"]

Timeout

Set a maximum execution time for a task. Raises TimeoutError if the task exceeds the limit.

@graph.task(timeout=30.0)
def slow_task():
    return long_running_operation()

# Or with add_task
graph.add_task("fetch", fetch_fn, timeout=10.0)

Retries

Automatically retry a task on failure. The task is retried up to N times before the exception propagates.

@graph.task(retries=3)
def flaky_task():
    return call_unreliable_api()

# Or with add_task
graph.add_task("fetch", fetch_fn, retries=2)

Timeout and Retries Combined

@graph.task(timeout=5.0, retries=2)
def resilient_task():
    return fetch_with_deadline()

Cycle Detection

from philiprehberger_task_graph import CycleError

# Raises CycleError if dependencies form a cycle
graph.run()

API

Function / Class Description
TaskGraph() Create a new task graph
@graph.task(name=None, depends=None, timeout=None, retries=0) Decorator to register a task with optional timeout and retries
graph.add_task(name, fn, depends=None, timeout=None, retries=0) Add a task programmatically
graph.run() Execute tasks in topological order
graph.run_parallel(max_workers=None) Execute with thread parallelism
graph.dry_run() Return execution order without running
CycleError Raised when a dependency cycle is detected

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this package useful, consider giving it a star on GitHub — it helps motivate continued maintenance and development.

LinkedIn More packages

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

philiprehberger_task_graph-0.2.0.tar.gz (7.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

philiprehberger_task_graph-0.2.0-py3-none-any.whl (5.8 kB view details)

Uploaded Python 3

File details

Details for the file philiprehberger_task_graph-0.2.0.tar.gz.

File metadata

File hashes

Hashes for philiprehberger_task_graph-0.2.0.tar.gz
Algorithm Hash digest
SHA256 89f689c8eda781d9bdd7870af206d440beee0ab4b6f9ad52aba98b4721dcac77
MD5 2d6defd460229f79701e0a2f28678acc
BLAKE2b-256 fab71ec0dcbf6590defe9cfa76df36df88ecbd2c508db47ea5f6f3ae5bc226bd

See more details on using hashes here.

File details

Details for the file philiprehberger_task_graph-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for philiprehberger_task_graph-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5aa091c5f3f20965d1c4697f5d716a933059523e1470e6c3f570761fbae4d96c
MD5 ed376c841b85035753448ed6ea536fcb
BLAKE2b-256 4a63fd7bd9bc5b7c0662d622cb37f89fcc43f75412e3ca0594542907b71b2f80

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page