Orchestrate graphs of callables in Python with automatic dependency resolution, parallel execution, retries, timeouts, and HTML email alerts on failure — zero dependencies
Processes: Smart Task Orchestration
Run a list of Python callables that depend on each other — in parallel when possible, with per-task log files and optional HTML email notification on failure. Zero dependencies. Pure Python 3.10+.
✨ Why Processes?
- 🔗 Declare what depends on what — write your tasks in any order; the runtime sorts them so every dependency runs first.
- ⚡ Run in parallel when you can — independent tasks run together on a thread pool; parallel mode switches on automatically for workflows with 10+ tasks.
- 🛡️ One failure doesn't stop the rest — a failed task skips only the jobs that depend on it, and every other part of the workflow keeps running.
- 📝 One log file per task — share a single log across the whole run, or keep them separate for easier debugging.
- 📧 Email alerts when something breaks — pass an SMTPConfig to a task and get a styled HTML email (with traceback, task context, and the list of jobs that were skipped) the instant it raises.
- 🧰 Modern, strictly-typed Python 3.10+ — from __future__ import annotations, full mypy --strict clean, dict[str, TaskResult], set[str], | unions.
⚙️ How it works
A Process holds a list of Tasks. At construction it validates names, types, dependency references, and detects cycles — raising before anything runs.
When you call process.run(), tasks are topologically sorted and scheduled: dependencies first, independent tasks in parallel.
A TaskDependency can forward an upstream result directly into a downstream function, as a positional or keyword argument. The result is a ProcessResult with passed_tasks_results and failed_tasks for inspection.
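The ordering and cycle check described above can be illustrated with the standard library's graphlib module. This is only a sketch of the general technique, not the library's internals; the task names and the deps mapping are invented for illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical workflow: task name -> names of the tasks it depends on.
deps = {
    "load": set(),
    "enrich": {"load"},
    "report": {"enrich"},
}

# static_order() yields each task only after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['load', 'enrich', 'report']

# A cyclic graph is rejected before any task could run.
cyclic = {"a": {"b"}, "b": {"a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
print(cycle_detected)  # True
```

Tasks with no path between them in the graph are the ones a scheduler is free to run concurrently.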
🚀 Quick start
A short "hello pipeline": one upstream task feeding its result into a downstream one, run with parallel=True.
from processes import Process, Task, TaskDependency
def load_users() -> list[dict]:
    return [{"id": 1}, {"id": 2}, {"id": 3}]
def enrich(users: list[dict]) -> list[dict]:
    return [{**u, "name": f"user-{u['id']}"} for u in users]
tasks = [
    Task("load", "run.log", load_users),
    Task(
        "enrich",
        "run.log",
        enrich,
        dependencies=[TaskDependency("load", use_result_as_additional_args=True)],
    ),
]
with Process(tasks) as p:
    result = p.run(parallel=True)
print(result.passed_tasks_results["enrich"].result)
# [{'id': 1, 'name': 'user-1'}, {'id': 2, 'name': 'user-2'}, {'id': 3, 'name': 'user-3'}]
🧪 End-to-end example
A realistic mini-pipeline: fetch two sources in parallel, transform them, aggregate, and notify — with per-task log files, result piping, and one task deliberately failing to show fault isolation.
import logging
from pathlib import Path
from processes import HTMLEmailStyle, Process, SMTPConfig, Task, TaskDependency
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)
# --- 1. Two independent "fetch" tasks that run in parallel -----------------
def fetch_orders() -> list[dict]:
    logging.info("querying orders API")
    return [{"order_id": 1, "amount": 42.0}, {"order_id": 2, "amount": 17.5}]
def fetch_inventory() -> list[dict]:
    logging.info("querying inventory API")
    return [{"sku": "A-1", "qty": 12}, {"sku": "B-2", "qty": 3}]
# --- 2. Two transforms that consume the upstream results -------------------
def total_revenue(orders: list[dict]) -> float:
    total = sum(o["amount"] for o in orders)
    logging.info("revenue computed: %s", total)
    return total
def stock_value(inventory: list[dict], *, price_per_unit: float = 10.0) -> float:
    value = sum(i["qty"] for i in inventory) * price_per_unit
    logging.info("stock value: %s", value)
    return value
# --- 3. An aggregator that joins the two branches --------------------------
def build_report(*, revenue: float, stock: float) -> str:
    return f"daily-report | revenue={revenue:.2f} stock={stock:.2f}"
# --- 4. A flaky notifier that ALWAYS fails — to show fault isolation -------
def notify_slack(report: str) -> None:
    raise RuntimeError("slack webhook returned 503")
# --- 5. A sibling task that does NOT depend on notify and still runs -------
def archive_report(report: str) -> str:
    out = LOG_DIR / "report.txt"
    out.write_text(report)
    return str(out)
# --- 6. Optional: SMTP config so failures page on-call --------------------
smtp = SMTPConfig(
    mailhost=("smtp.example.com", 587),
    fromaddr="alerts@example.com",
    toaddrs=["oncall@example.com"],
    credentials=("user", "pass"),
    secure=(),
)
tasks = [
    Task("fetch_orders", LOG_DIR / "fetch_orders.log", fetch_orders),
    Task("fetch_inventory", LOG_DIR / "fetch_inventory.log", fetch_inventory),
    Task(
        "compute_revenue",
        LOG_DIR / "compute_revenue.log",
        total_revenue,
        dependencies=[TaskDependency("fetch_orders", use_result_as_additional_args=True)],
    ),
    Task(
        "compute_stock",
        LOG_DIR / "compute_stock.log",
        stock_value,
        kwargs={"price_per_unit": 7.25},
        dependencies=[
            TaskDependency(
                "fetch_inventory",
                use_result_as_additional_kwargs=True,
                additional_kwarg_name="inventory",
            )
        ],
    ),
    Task(
        "build_report",
        LOG_DIR / "build_report.log",
        build_report,
        dependencies=[
            TaskDependency("compute_revenue", use_result_as_additional_kwargs=True,
                           additional_kwarg_name="revenue"),
            TaskDependency("compute_stock", use_result_as_additional_kwargs=True,
                           additional_kwarg_name="stock"),
        ],
    ),
    # notify_slack fails on purpose. archive_report is a *sibling*
    # of notify_slack (both depend on build_report), so it has no
    # dependency on the failed task and runs normally — the rest of
    # the workflow is not blackholed by one broken step.
    Task(
        "notify_slack",
        LOG_DIR / "notify_slack.log",
        notify_slack,
        dependencies=[TaskDependency("build_report", use_result_as_additional_args=True)],
        smtp_config=smtp,
    ),
    Task(
        "archive_report",
        LOG_DIR / "archive_report.log",
        archive_report,
        dependencies=[TaskDependency("build_report", use_result_as_additional_args=True)],
    ),
]
with Process(tasks) as process:
    result = process.run(parallel=True)
print("passed:", sorted(result.passed_tasks_results))
# archive_report, build_report, compute_revenue, compute_stock, fetch_inventory, fetch_orders
print("failed:", sorted(result.failed_tasks))
# notify_slack
print("report:", result.passed_tasks_results["build_report"].result)
# daily-report | revenue=59.50 stock=108.75
The failing notify_slack task does not abort the run. archive_report is a sibling of the failed task (both depend on the successful build_report), so it runs unaffected. Because notify_slack carries an smtp_config, the HTML email handler also fires for it, sending on-call the full traceback and the list of downstream tasks skipped because of the failure (none in this example, since nothing depends on notify_slack).
📚 API Reference
Task
Task(
    name: str,
    log_path: str | os.PathLike,
    func: Callable[..., Any],
    args: tuple = (),
    kwargs: dict | None = None,
    dependencies: list[TaskDependency] | None = None,
    smtp_config: SMTPConfig | None = None,
    email_style: HTMLEmailStyle | None = None,
    timeout: float | None = None,
    retries: int | None = 0,
    retry_on: tuple[type[Exception], ...] | None = None,
)
- name — unique within the Process; no spaces.
- log_path — the file this task logs to (INFO level, format %(asctime)s - %(name)s - %(levelname)s - %(message)s).
- func — the callable; receives func(*args, **kwargs) after result-injection.
- smtp_config — when set, fires an HTML email on logging.ERROR; body includes task_name, function, args, kwargs, and downstream_impact.
- email_style — optional presentation override; defaults to HTMLEmailStyle() (modern, neutral, English) when smtp_config is set.
- timeout — seconds allowed per attempt; None means no limit. When the timeout fires the underlying thread is detached (Python threading limitation).
- retries — additional attempts after the first failure; 0 or None means a single attempt. Defaults to 0.
- retry_on — tuple of exception types that trigger a retry. When retries >= 1 and retry_on is None, defaults to (ConnectionError, TimeoutError) at call time.
TaskDependency
TaskDependency(
    task_name: str,
    use_result_as_additional_args: bool = False,
    use_result_as_additional_kwargs: bool = False,
    additional_kwarg_name: str = "",
)
- use_result_as_additional_args=True — upstream result appended as the next positional arg.
- use_result_as_additional_kwargs=True with a non-empty additional_kwarg_name — upstream result injected as a keyword arg.
- Both flags can be combined (positional first, then kwarg).
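To make the injection rules concrete, here is a minimal sketch of how an upstream result could be merged into a downstream call. build_call_arguments is a hypothetical helper written for this illustration; it mirrors the documented behaviour but is not taken from the library.

```python
from typing import Any


def build_call_arguments(
    args: tuple,
    kwargs: dict[str, Any],
    upstream_result: Any,
    use_as_arg: bool = False,
    use_as_kwarg: bool = False,
    kwarg_name: str = "",
) -> tuple[tuple, dict[str, Any]]:
    """Return the (args, kwargs) a downstream function would be called with."""
    final_args = tuple(args)
    final_kwargs = dict(kwargs)
    if use_as_arg:
        final_args += (upstream_result,)  # appended as the next positional arg
    if use_as_kwarg and kwarg_name:
        final_kwargs[kwarg_name] = upstream_result  # injected under the given name
    return final_args, final_kwargs


def stock_value(inventory: list[dict], *, price_per_unit: float = 10.0) -> float:
    return sum(item["qty"] for item in inventory) * price_per_unit


inventory = [{"sku": "A-1", "qty": 12}, {"sku": "B-2", "qty": 3}]
call_args, call_kwargs = build_call_arguments(
    (), {"price_per_unit": 7.25}, inventory, use_as_kwarg=True, kwarg_name="inventory"
)
value = stock_value(*call_args, **call_kwargs)
print(value)  # 108.75
```

Keyword injection is the safer choice when the downstream function already takes positional arguments, because the result cannot land in the wrong slot.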
Process
Process(tasks: list[Task]) # validates types, names, deps, cycles
process.run(parallel: bool | None = None, max_workers: int = 4) -> ProcessResult
- Raises DependencyNotFoundError, CircularDependencyError, TypeError, ValueError on construction if the workflow is malformed.
- parallel=None auto-parallelises when len(tasks) >= 10; max_workers=1 is always sequential.
- Use as a context manager — it cleans up FileHandlers on exit.
ProcessResult
result.passed_tasks_results # dict[str, TaskResult] — name → TaskResult for every task that succeeded
result.failed_tasks # set[str] — all tasks that did not produce a result (errored + skipped)
result.errored_tasks # set[str] — tasks whose function actually raised
result.skipped_tasks # set[str] — tasks skipped because an upstream dependency failed
TaskResult(worked: bool, result: Any, exception: Exception | None)
SMTPConfig
SMTPConfig(
    mailhost, # (host, port)
    fromaddr,
    toaddrs, # list[str]
    credentials=None, # (username, password) | None
    secure=None, # () = STARTTLS; omit for no encryption
    timeout=5,
)
HTMLEmailStyle
HTMLEmailStyle(
    style="modern", # classic | modern | compact
    palette="neutral", # neutral | catppuccin | neobones | slate
    language="en", # en | es | pt | fr | de | it
    traced_vars_frame_filter=None, # substring to pick the traced frame | None
)
All fields are optional — omit HTMLEmailStyle entirely to use the defaults.
Traced Variables
On failure, the email body includes the local variables of the outermost
user frame in the traceback — i.e. the last frame that is not inside
site-packages or your virtualenv. A file:line reference next to the
section shows exactly where those values were captured.
traced_vars_frame_filter lets you point this at a different frame: set it
to a path substring (e.g. one of your own package or module names) to
capture locals from the outermost frame whose filename contains that
substring instead. This is useful for deep-debugging code that runs through
several layers of internal libraries or wrappers, where the default
outermost-user-frame would land too high up the call stack.
Fault-tolerance rules in detail
When a task raises:
- The exception is caught and stored in TaskResult.exception; the task name goes into failed_tasks and errored_tasks.
- Every task that depends on it (directly or indirectly) is skipped — added to failed_tasks and skipped_tasks without running.
- Every other independent part of the workflow keeps running. With parallel=True they keep running concurrently on the worker pool.
- After run() returns, ProcessResult.errored_tasks and ProcessResult.skipped_tasks let you distinguish root failures from cascade skips for triage or alerting.
When a task has retries >= 1, a failure matching retry_on triggers another attempt before the task is declared failed and its dependants are skipped. This gives transient errors (network blips, connection resets) a chance to resolve without aborting downstream work.
This makes the library a good fit for fan-out / fan-in pipelines, "best-effort" notifications, and any workflow where one broken step should not blackhole the rest.
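The cascade rule (skip everything that depends, directly or indirectly, on an errored task) can be sketched as a fixed-point computation over the dependency map. cascade_skips and the four task names are hypothetical; this illustrates the rule and is not the library's scheduler.

```python
def cascade_skips(dependencies: dict[str, set[str]], errored: set[str]) -> set[str]:
    """Return every task that depends, directly or indirectly, on an errored task."""
    skipped: set[str] = set()
    changed = True
    while changed:  # repeat until no new task gets marked
        changed = False
        for task, deps in dependencies.items():
            if task in errored or task in skipped:
                continue
            if deps & (errored | skipped):  # some upstream did not produce a result
                skipped.add(task)
                changed = True
    return skipped


# Hypothetical workflow: task name -> names of the tasks it depends on.
dependencies = {
    "fetch": set(),
    "transform": {"fetch"},
    "report": {"transform"},
    "audit": set(),  # independent branch
}
errored = {"fetch"}
skipped = cascade_skips(dependencies, errored)
failed = errored | skipped  # mirrors failed_tasks = errored + skipped

print(sorted(skipped))  # ['report', 'transform']
print(sorted(failed))   # ['fetch', 'report', 'transform']
```

The independent audit task appears in neither set, which is the fault isolation the rules describe.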
Comparison with other libraries
| | Processes | Airflow | Celery | Luigi |
|---|---|---|---|---|
| External dependencies | None | many | broker (Redis/RabbitMQ) | few |
| Setup cost | pip install | cluster | broker + workers | task + config |
| Parallelism | built-in | via executors | via workers | via workers |
| Per-task file logs | yes (built-in) | via handlers | via signals | partial |
| HTML email on failure | yes (built-in) | via callbacks | via signals | manual |
| DAG validation at construction | yes | yes (DAG file) | n/a | partial |
| Strict typing (mypy --strict) | yes | partial | partial | no |
Processes is not a distributed scheduler — there are no workers on remote machines, no SLA monitoring, no web UI. If you need any of those, you need Airflow or a similar orchestrator. If you want a small, fast, dependency-aware pipeline that just runs in a single process, this is it.
Advanced configuration
- Shared log file — pass the same log_path to every Task for a single combined run.log; pass distinct paths for per-task isolation.
- Auto-parallel — Process.run() with no argument runs sequentially for small workflows and switches to parallel for len(tasks) >= 10. Pass parallel=True or parallel=False to force the mode.
- Result inspection — iterate result.passed_tasks_results.items() to log or post-process every successful task; iterate result.failed_tasks for triage.
- Re-raising — the library itself does not raise on partial failure, so if you need a non-zero exit code, check result.failed_tasks after process.run() and raise or exit yourself.
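Since run() returns normally even when some tasks fail, turning a partial failure into a non-zero exit code takes one explicit check. The sketch below uses a SimpleNamespace as a stand-in for a ProcessResult so that it runs without the library installed; exit_code_for is a hypothetical helper.

```python
from types import SimpleNamespace


def exit_code_for(result) -> int:
    """Map a run result to a shell exit code: 1 if anything failed, else 0."""
    return 1 if result.failed_tasks else 0


# Stand-ins for ProcessResult objects; only failed_tasks is consulted.
partial_failure = SimpleNamespace(failed_tasks={"notify_slack"})
clean_run = SimpleNamespace(failed_tasks=set())

code_on_failure = exit_code_for(partial_failure)
code_on_success = exit_code_for(clean_run)
```

In a real script the value would typically be passed to sys.exit() after process.run() returns.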
📦 Installation
From PyPI:
pip install processes
Or straight from the repository (pure Python, no build step):
pip install git+https://github.com/oliverm91/processes.git
Requires Python 3.10+.
📄 License & contributing
Released under the MIT License — see docs for full API details.
Contributions welcome — see CONTRIBUTING.md for the workflow, style, and commit-message conventions used by this project.