Lightweight task dependency engine with topological execution

philiprehberger-task-graph

Installation

pip install philiprehberger-task-graph

Usage

from philiprehberger_task_graph import TaskGraph

graph = TaskGraph()

# download(), transform(), and store() below are placeholders for your own functions
@graph.task()
def fetch_data():
    return download()

@graph.task(depends=["fetch_data"])
def process_data():
    return transform()

@graph.task(depends=["process_data"])
def save_results():
    return store()

# Run tasks in dependency order
results = graph.run()

# Or run with parallelism
results = graph.run_parallel(max_workers=4)
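To illustrate what thread-based parallel execution of a dependency graph can look like, here is a minimal standard-library sketch (Python 3.9+ for graphlib). It is not this package's implementation: the helper `run_in_waves` and the task names are hypothetical, and the real `run_parallel` may schedule work differently.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter


def run_in_waves(tasks, dependencies, max_workers=4):
    """Hypothetical helper: run each batch of ready tasks concurrently."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = sorter.get_ready()
            # Tasks in one batch have no unmet dependencies, so they may overlap.
            futures = {name: pool.submit(tasks[name]) for name in ready}
            for name, future in futures.items():
                results[name] = future.result()
                sorter.done(name)  # unblocks tasks that depend on this one
    return results


# Illustrative tasks: two independent fetches followed by a merge.
tasks = {
    "fetch_a": lambda: "a",
    "fetch_b": lambda: "b",
    "merge": lambda: "merged",
}
dependencies = {"fetch_a": [], "fetch_b": [], "merge": ["fetch_a", "fetch_b"]}
results = run_in_waves(tasks, dependencies)
```

In this sketch, `fetch_a` and `fetch_b` are submitted together, and `merge` only starts once both have finished.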

Programmatic API

graph = TaskGraph()
graph.add_task("fetch", fetch_fn)
graph.add_task("process", process_fn, depends=["fetch"])
graph.add_task("save", save_fn, depends=["process"])

# Preview execution order
order = graph.dry_run()
# ["fetch", "process", "save"]
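The ordering that a dry run previews is a topological sort of the dependency graph. The sketch below reproduces the same order with the standard library's graphlib (Python 3.9+). It only illustrates the concept and makes no claim about how this package computes the order internally.

```python
from graphlib import TopologicalSorter

# Map each task name to the names it depends on (mirrors the add_task calls above).
dependencies = {
    "fetch": [],
    "process": ["fetch"],
    "save": ["process"],
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['fetch', 'process', 'save']
```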

Timeout

Set a maximum execution time for a task with the timeout parameter. TimeoutError is raised if the task exceeds the limit.

@graph.task(timeout=30.0)
def slow_task():
    return long_running_operation()

# Or with add_task
graph.add_task("fetch", fetch_fn, timeout=10.0)
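One common way to enforce a per-task time limit in Python is to run the callable in a worker thread and wait on its future with a deadline. The sketch below shows that pattern. The helper `call_with_timeout` is hypothetical, the limit is assumed to be in seconds, and this package may enforce timeouts differently.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def call_with_timeout(fn, timeout):
    """Hypothetical helper: raise TimeoutError if fn runs longer than `timeout` seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        # Normalize to the built-in TimeoutError on all Python versions.
        raise TimeoutError(f"task exceeded {timeout} seconds") from None
    finally:
        # Do not wait for the worker: a running Python thread cannot be killed.
        pool.shutdown(wait=False)


fast_result = call_with_timeout(lambda: 42, timeout=1.0)

try:
    call_with_timeout(lambda: time.sleep(0.3), timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Note that with this approach the timed-out function keeps running in the background until it returns; only the caller stops waiting for it.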

Retries

Automatically retry a task when it fails. The task is retried up to the number of times given by retries; if it still fails, the exception propagates.

@graph.task(retries=3)
def flaky_task():
    return call_unreliable_api()

# Or with add_task
graph.add_task("fetch", fetch_fn, retries=2)
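The retry behavior can be pictured as a simple loop around the task. The sketch below is illustrative only: `call_with_retries` is a hypothetical helper, and it assumes one initial attempt plus up to `retries` further attempts, which the package may count differently.

```python
def call_with_retries(fn, retries):
    """Hypothetical helper: call fn, retrying up to `retries` more times on failure."""
    attempts = retries + 1  # assumption: one initial attempt plus `retries` retries
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: let the exception propagate


calls = {"count": 0}


def flaky():
    """Fails on the first two calls, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = call_with_retries(flaky, retries=3)
```

Here `flaky` succeeds on its third call, so `result` is "ok" and one of the three allowed retries goes unused.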

Timeout and Retries Combined

@graph.task(timeout=5.0, retries=2)
def resilient_task():
    return fetch_with_deadline()

Cycle Detection

from philiprehberger_task_graph import CycleError

try:
    graph.run()
except CycleError:
    # Raised if dependencies form a cycle
    print("Dependency cycle detected")
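For intuition about what a dependency cycle is, the sketch below builds a two-task cycle and detects it with the standard library's graphlib (Python 3.9+). The graphlib exception is a different class from this package's CycleError, so it is imported under an alias here. The task names are illustrative.

```python
from graphlib import CycleError as GraphlibCycleError
from graphlib import TopologicalSorter

# "a" depends on "b" and "b" depends on "a": no valid execution order exists.
dependencies = {"a": ["b"], "b": ["a"]}

try:
    list(TopologicalSorter(dependencies).static_order())
    has_cycle = False
except GraphlibCycleError:
    has_cycle = True
```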

API

Function / Class | Description
TaskGraph() | Create a new task graph
@graph.task(name=None, depends=None, timeout=None, retries=0) | Decorator to register a task with optional timeout and retries
graph.add_task(name, fn, depends=None, timeout=None, retries=0) | Add a task programmatically
graph.run() | Execute tasks in topological order
graph.run_parallel(max_workers=None) | Execute with thread parallelism
graph.dry_run() | Return execution order without running
CycleError | Raised when a dependency cycle is detected

Development

pip install -e .
python -m pytest tests/ -v

License

MIT
