philiprehberger-task-graph

Lightweight task dependency engine with topological execution.

Installation

pip install philiprehberger-task-graph

Usage

from philiprehberger_task_graph import TaskGraph

graph = TaskGraph()

# download(), transform(), and store() stand in for your own functions
@graph.task()
def fetch_data():
    return download()

@graph.task(depends=["fetch_data"])
def process_data():
    return transform()

@graph.task(depends=["process_data"])
def save_results():
    return store()

# Run tasks in dependency order
results = graph.run()

# Or run with parallelism
results = graph.run_parallel(max_workers=4)
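
To make "run with parallelism" concrete, here is a minimal sketch of dependency-aware threaded execution built only on the standard library (graphlib, Python 3.9+, and concurrent.futures). It is not the library's implementation; the function name run_parallel_sketch and its tasks/deps arguments are hypothetical and exist only for this illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_parallel_sketch(tasks, deps, max_workers=4):
    """Run callables in dependency order, using threads for independent tasks.

    tasks: dict mapping task name -> zero-argument callable
    deps:  dict mapping task name -> list of names it depends on
    """
    sorter = TopologicalSorter({name: deps.get(name, []) for name in tasks})
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    results = {}
    pending = {}  # future -> task name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every task whose dependencies have all finished.
            for name in sorter.get_ready():
                pending[pool.submit(tasks[name])] = name
            # Block until at least one running task completes.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                name = pending.pop(future)
                results[name] = future.result()  # re-raises task exceptions
                sorter.done(name)  # unlocks tasks that were waiting on this one
    return results


demo = run_parallel_sketch(
    tasks={"fetch": lambda: [1, 2, 3], "count": lambda: 3, "report": lambda: "done"},
    deps={"report": ["fetch", "count"]},
)
print(demo["report"])  # done
```

In this sketch "fetch" and "count" have no dependencies, so they can run on separate threads, while "report" is only submitted once both have finished.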

Programmatic API

graph = TaskGraph()
graph.add_task("fetch", fetch_fn)
graph.add_task("process", process_fn, depends=["fetch"])
graph.add_task("save", save_fn, depends=["process"])

# Preview execution order
order = graph.dry_run()
# ["fetch", "process", "save"]
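
The order returned by dry_run() is a topological order: every task appears after the tasks it depends on. One common way to compute such an order is Kahn's algorithm, sketched below. This is an illustration under assumptions, not the library's code; topological_order is a hypothetical helper, and it expects every dependency name to also appear as a key in the mapping.

```python
from collections import deque


def topological_order(dependencies):
    """Return task names so that every task appears after its dependencies.

    dependencies: dict mapping task name -> list of prerequisite task names.
    Every prerequisite must itself be a key of the dict.
    """
    # Count unmet prerequisites per task and record who is waiting on whom.
    remaining = {name: len(deps) for name, deps in dependencies.items()}
    dependents = {name: [] for name in dependencies}
    for name, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(name)

    # Start with the tasks that have no prerequisites at all.
    ready = deque(name for name, count in remaining.items() if count == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in dependents[name]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    # Tasks that never became ready are part of (or blocked by) a cycle.
    if len(order) != len(dependencies):
        raise ValueError("dependency cycle detected")
    return order


print(topological_order({"fetch": [], "process": ["fetch"], "save": ["process"]}))
# ['fetch', 'process', 'save']
```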

Timeout

Set a maximum execution time for a task with the timeout argument. A TimeoutError is raised if the task exceeds the limit.

@graph.task(timeout=30.0)
def slow_task():
    return long_running_operation()

# Or with add_task
graph.add_task("fetch", fetch_fn, timeout=10.0)
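
For intuition, a per-task time limit can be enforced by running the function in a worker thread and bounding how long the caller waits for it. The sketch below uses only concurrent.futures. It is an assumption about one possible approach, not the library's internals; call_with_timeout is a hypothetical helper, and it assumes the limit is expressed in seconds.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def call_with_timeout(fn, timeout):
    """Run fn() in a worker thread and stop waiting after `timeout` seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(fn)
        try:
            return future.result(timeout=timeout)
        except FutureTimeoutError:
            # Normalize to the built-in TimeoutError (same class on Python 3.11+).
            raise TimeoutError(f"task exceeded {timeout} seconds") from None
    finally:
        # Do not block on the worker: a Python thread cannot be killed,
        # so an overrunning function keeps running in the background.
        pool.shutdown(wait=False)


print(call_with_timeout(lambda: "fast", timeout=1.0))  # fast
try:
    call_with_timeout(lambda: time.sleep(0.3), timeout=0.05)
except TimeoutError as exc:
    print(exc)  # task exceeded 0.05 seconds
```

A thread-based limit only stops the caller from waiting; the slow function itself is not interrupted.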

Retries

Automatically retry a task when it fails. The task is retried up to the number of times given by retries before the exception propagates.

@graph.task(retries=3)
def flaky_task():
    return call_unreliable_api()

# Or with add_task
graph.add_task("fetch", fetch_fn, retries=2)
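
The retry behavior described above can be pictured as a small loop around the task function. The sketch below is illustrative only; call_with_retries is a hypothetical helper, and it assumes that retries counts the extra attempts made after the first failed call, which this page does not confirm for the library.

```python
def call_with_retries(fn, retries):
    """Call fn(); on an exception, retry up to `retries` more times."""
    attempts_left = retries
    while True:
        try:
            return fn()
        except Exception:
            if attempts_left == 0:
                raise  # out of retries: let the last exception propagate
            attempts_left -= 1


calls = {"count": 0}


def flaky():
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(call_with_retries(flaky, retries=3))  # ok
print(calls["count"])  # 3
```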

Timeout and Retries Combined

@graph.task(timeout=5.0, retries=2)
def resilient_task():
    return fetch_with_deadline()

Pass Dependency Results

With pass_results=True, each dependency's return value is passed to the task function as a keyword argument named after the dependency.

graph = TaskGraph()
graph.add_task("source", lambda: 7)
graph.add_task("double", lambda source: source * 2, depends=["source"])

results = graph.run(pass_results=True)
# {"source": 7, "double": 14}
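
The mechanism can be sketched in a few lines of plain Python: keep a dictionary of finished results and unpack the relevant entries as keyword arguments. This is a simplified illustration, not the library's code; run_passing_results and its order argument (a precomputed execution order) are hypothetical.

```python
def run_passing_results(tasks, deps, order):
    """Run tasks in `order`, handing each one its dependencies' results as kwargs."""
    results = {}
    for name in order:
        # Collect the return values of this task's dependencies by name.
        kwargs = {dep: results[dep] for dep in deps.get(name, [])}
        results[name] = tasks[name](**kwargs)
    return results


results = run_passing_results(
    tasks={"source": lambda: 7, "double": lambda source: source * 2},
    deps={"double": ["source"]},
    order=["source", "double"],
)
print(results)  # {'source': 7, 'double': 14}
```

Because arguments are matched by name, the parameter of the dependent function has to be spelled exactly like the dependency it receives.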

Cycle Detection

from philiprehberger_task_graph import CycleError

try:
    graph.run()
except CycleError as exc:
    # Raised if dependencies form a cycle
    print(f"Dependency cycle detected: {exc}")
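
To see what a dependency cycle looks like in isolation, the standard library's graphlib (Python 3.9+) performs the same kind of check. Note that graphlib.CycleError is a different class from this package's CycleError; the sketch only illustrates the concept, and find_cycle is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def find_cycle(dependencies):
    """Return the nodes of one dependency cycle, or None if the graph is acyclic."""
    try:
        TopologicalSorter(dependencies).prepare()
    except CycleError as exc:
        # The second element of args lists the nodes that form the cycle.
        return exc.args[1]
    return None


print(find_cycle({"a": [], "b": ["a"]}))  # None

# "a" needs "b" and "b" needs "a", so neither task can ever start.
cycle = find_cycle({"a": ["b"], "b": ["a"]})
print(cycle)
```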

API

| Function / Class | Description |
| --- | --- |
| TaskGraph() | Create a new task graph |
| @graph.task(name=None, depends=None, timeout=None, retries=0) | Decorator to register a task with optional timeout and retries |
| graph.add_task(name, fn, depends=None, timeout=None, retries=0) | Add a task programmatically |
| graph.run(pass_results=False) | Execute tasks in topological order; optionally pass dependency results as keyword arguments |
| graph.run_parallel(max_workers=None, pass_results=False) | Execute with thread parallelism |
| graph.dry_run() | Return execution order without running |
| CycleError | Raised when a dependency cycle is detected |

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT
