A Python library for managing and executing dependent tasks in parallel or sequential order with automatic dependency resolution and topological sorting
🚀 Processes: Robust Routines Management
Processes is a lightweight, high-performance Python library designed to execute complex task graphs. It resolves dependencies, runs tasks in parallel, and keeps independent branches running when a task fails, all without any external libraries.
File logging and email notifications are supported.
✨ Features
- 🐍 Pure Python: Zero external dependencies. Built entirely on the Python Standard Library.
- ⚡ Parallel Execution: Built-in support for parallelization to maximize throughput.
- 🔗 Dependency Resolution: Automatically sorts and executes tasks based on their requirements, regardless of input order.
- 📝 Shared Logging: Multiple tasks can write to the same logfile or maintain separate ones seamlessly.
- 📧 Email Notifications: Integrated SMTP support (including HTML) to alert you the moment an exception occurs.
⚙️ Core Concepts
The library operates on two main primitives:
- Task: The atomic unit of work. It encapsulates a function, its parameters, its specific logfile, and its relationship with other tasks.
- Process: The orchestrator. It builds the execution graph, validates dependencies, and manages the lifecycle of the entire workflow.
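To make the idea of building and validating an execution graph concrete, here is a minimal sketch of dependency ordering using the standard library's `graphlib` (Python 3.9+). It is an illustration of topological sorting in general, not the library's internal implementation; the task names and the `graph` dictionary are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task graph: each key maps to the set of tasks it depends on.
graph = {
    "summary": {"enrich"},
    "enrich": {"load"},
    "load": set(),
    "cleanup": set(),
}

# static_order() yields each task only after all of its dependencies,
# regardless of the order in which the tasks were declared.
order = list(TopologicalSorter(graph).static_order())
print(order)

# A circular dependency cannot be ordered and is reported as an error.
cycle_detected = False
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
except CycleError:
    cycle_detected = True
print("cycle detected:", cycle_detected)
```

The relative position of independent tasks (such as `cleanup` here) is not fixed by the sort; only the dependency constraints are guaranteed.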
🛠️ Use Cases
- ETL Pipelines: Fetch data from an API, transform it, and load it into a database as separate, dependent tasks.
- System Maintenance: Run parallel cleanup scripts, check server health, and receive email alerts if a specific check fails.
- Automated Reporting: Generate multiple data parts in parallel, aggregate them into a final report, and distribute via SMTP.
💻 Quick Start
Four short examples — read top to bottom.
1. One task
from processes import Process, Task

def hello():
    return 42

tasks = [Task("greet", "run.log", hello)]

with Process(tasks) as process:
    result = process.run()
    print(result.passed_tasks_results["greet"].result)  # 42
2. args and kwargs
args and kwargs are forwarded to your function — like func(*args, **kwargs).
from processes import Process, Task

def fetch(source, *, limit=100, dry_run=False):
    return ["row1", "row2"]

tasks = [
    Task(
        "fetch_orders", "run.log", fetch,
        args=("orders_api",),
        kwargs={"limit": 500, "dry_run": True},
    ),
]

with Process(tasks) as process:
    process.run()
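The forwarding rule can be checked in plain Python without the library. The sketch below uses a hypothetical helper, `call_task`, that calls a function as `func(*args, **kwargs)`; it only mirrors the behavior described above and is not part of the `processes` API.

```python
def call_task(func, args=(), kwargs=None):
    """Hypothetical helper: forward args and kwargs the way the text describes."""
    return func(*args, **(kwargs or {}))

def fetch(source, *, limit=100, dry_run=False):
    # Echo the received parameters so the forwarding is visible.
    return {"source": source, "limit": limit, "dry_run": dry_run}

received = call_task(
    fetch,
    args=("orders_api",),
    kwargs={"limit": 500, "dry_run": True},
)
print(received)
# {'source': 'orders_api', 'limit': 500, 'dry_run': True}
```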
3. Dependencies + result injection
TaskDependency orders tasks. To also pipe the upstream result into the downstream function, pick one:
- use_result_as_additional_args=True: the result is appended as the next positional argument.
- use_result_as_additional_kwargs=True plus additional_kwarg_name="...": the result is passed as a keyword argument.
from processes import Process, Task, TaskDependency

def load_users():
    return [{"id": 1}, {"id": 2}, {"id": 3}]

def enrich(api_key, users):  # `users` injected positionally
    return [{**u, "name": f"user-{u['id']}"} for u in users]

def summarize(*, enriched, label="report"):  # `enriched` injected as kwarg
    return f"{label}: {len(enriched)} users"

tasks = [
    Task("load", "run.log", load_users),
    Task(
        "enrich", "run.log", enrich,
        args=("MY_API_KEY",),
        dependencies=[
            TaskDependency("load", use_result_as_additional_args=True),
        ],
    ),
    Task(
        "summary", "run.log", summarize,
        kwargs={"label": "daily"},
        dependencies=[
            TaskDependency(
                "enrich",
                use_result_as_additional_kwargs=True,
                additional_kwarg_name="enriched",
            ),
        ],
    ),
]

with Process(tasks) as process:
    result = process.run(parallel=True)
    print(result.passed_tasks_results["summary"].result)
    # "daily: 3 users"
Task order doesn't matter: Process sorts them. A failure only skips its own dependents; the rest keeps running.
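The two injection modes can be illustrated with ordinary function calls. This is a sketch under assumptions: `call_with_upstream` and its `as_arg` / `kwarg_name` parameters are hypothetical names that imitate the behavior described above, not the library's code.

```python
def call_with_upstream(func, args, kwargs, upstream, *, as_arg=False, kwarg_name=None):
    """Hypothetical helper mimicking the two result-injection modes."""
    args = tuple(args)
    kwargs = dict(kwargs)
    if as_arg:
        args += (upstream,)            # appended after the task's own args
    if kwarg_name is not None:
        kwargs[kwarg_name] = upstream  # passed under the chosen keyword
    return func(*args, **kwargs)

def load_users():
    return [{"id": 1}, {"id": 2}, {"id": 3}]

def enrich(api_key, users):
    return [{**u, "name": f"user-{u['id']}"} for u in users]

def summarize(*, enriched, label="report"):
    return f"{label}: {len(enriched)} users"

users = load_users()
enriched = call_with_upstream(enrich, ("MY_API_KEY",), {}, users, as_arg=True)
summary = call_with_upstream(
    summarize, (), {"label": "daily"}, enriched, kwarg_name="enriched"
)
print(summary)  # daily: 3 users
```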
4. Email alerts on failure
Attach an HTMLSMTPHandler to any task. If the task raises, an HTML email is sent.
from processes import HTMLSMTPHandler, Process, Task

smtp = HTMLSMTPHandler(
    mailhost=("smtp.example.com", 587),
    fromaddr="alerts@example.com",
    toaddrs=["oncall@example.com"],
    credentials=("user", "pass"),
    secure=(),  # () = STARTTLS; omit for no encryption
)

tasks = [
    Task("risky_step", "run.log", lambda: 1 / 0, html_mail_handler=smtp),
]

with Process(tasks) as process:
    process.run()
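For readers curious what an HTML failure alert might contain, the following sketch builds such a message with the standard library's `email` package, without sending anything. It is an assumption about the general shape of an alert, not a description of what HTMLSMTPHandler actually emits; `build_failure_email` is a hypothetical helper.

```python
import traceback
from email.message import EmailMessage
from html import escape

def build_failure_email(task_name, exc, fromaddr, toaddrs):
    """Hypothetical helper: build a plain-text + HTML alert for a failed task."""
    tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    msg = EmailMessage()
    msg["Subject"] = f"Task failed: {task_name}"
    msg["From"] = fromaddr
    msg["To"] = ", ".join(toaddrs)
    msg.set_content(tb)  # plain-text fallback
    msg.add_alternative(
        f"<h3>Task <code>{escape(task_name)}</code> failed</h3>"
        f"<pre>{escape(tb)}</pre>",
        subtype="html",
    )
    return msg

try:
    1 / 0
except ZeroDivisionError as err:
    message = build_failure_email(
        "risky_step", err, "alerts@example.com", ["oncall@example.com"]
    )

print(message["Subject"])
print(message.get_content_type())
```

Sending the message would additionally require an SMTP connection (for example via `smtplib`), which is omitted here.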
🛡️ Fault Tolerance & Logs
Resilience by Design
If a Task raises an exception, the Process does not stop. It skips the tasks that depend on the failed one and keeps executing every independent branch of the workflow.
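The skip-on-failure behavior can be sketched in a few lines of sequential Python. This is a simplified model under stated assumptions, not the library's scheduler: `run_all` is a hypothetical function, it ignores parallelism, and it uses `graphlib` (Python 3.9+) for ordering.

```python
from graphlib import TopologicalSorter

def run_all(funcs, deps):
    """Hypothetical runner. funcs: name -> callable; deps: name -> set of upstream names."""
    passed, failed, skipped = {}, {}, set()
    for name in TopologicalSorter(deps).static_order():
        # Skip a task when any upstream task failed or was itself skipped.
        if any(d in failed or d in skipped for d in deps.get(name, ())):
            skipped.add(name)
            continue
        try:
            passed[name] = funcs[name]()
        except Exception as exc:
            failed[name] = exc  # record the error and keep going
    return passed, failed, skipped

funcs = {
    "extract": lambda: 1 / 0,          # fails
    "transform": lambda: "never runs",
    "load": lambda: "never runs",
    "healthcheck": lambda: "ok",       # independent branch
}
deps = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "healthcheck": set(),
}

passed, failed, skipped = run_all(funcs, deps)
print(passed)           # {'healthcheck': 'ok'}
print(sorted(failed))   # ['extract']
print(sorted(skipped))  # ['load', 'transform']
```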
Advanced Logging
All tasks record their execution flow to their assigned logfiles. You can share a single logfile across the whole process or isolate specific tasks for easier debugging.
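As an illustration of shared versus isolated logfiles, the sketch below uses only the standard `logging` module. The logger names, the `make_task_logger` helper, and the file names are hypothetical, and the example assumes a single process; it does not show how the library wires its own loggers.

```python
import logging
import os
import tempfile

def make_task_logger(task_name, logfile):
    """Hypothetical helper: one logger per task, writing to the given file."""
    logger = logging.getLogger(f"sketch.{task_name}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the output out of the root logger
    handler = logging.FileHandler(logfile, encoding="utf-8")
    handler.setFormatter(logging.Formatter("%(asctime)s [%(name)s] %(message)s"))
    logger.addHandler(handler)
    return logger

log_dir = tempfile.mkdtemp()
shared = os.path.join(log_dir, "run.log")
isolated = os.path.join(log_dir, "risky.log")

# Two tasks share run.log; a third writes to its own file.
make_task_logger("load", shared).info("loaded 3 rows")
make_task_logger("enrich", shared).info("enriched 3 rows")
make_task_logger("risky_step", isolated).info("starting")

logging.shutdown()  # flush and close all handlers

with open(shared, encoding="utf-8") as fh:
    shared_lines = fh.read().splitlines()
with open(isolated, encoding="utf-8") as fh:
    isolated_lines = fh.read().splitlines()

print(len(shared_lines), len(isolated_lines))  # 2 1
```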
📦 Installation
Available on PyPI: https://pypi.org/project/processes/
pip install processes
Also, since it's a pure Python library, you can install it directly from the repository:
pip install git+https://github.com/oliverm91/processes.git
File details
Details for the file processes-2.0.1.tar.gz.
File metadata
- Download URL: processes-2.0.1.tar.gz
- Upload date:
- Size: 248.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9527d18e2832074842452178c789fcc149b530e697be122a0fa35029bcb0122c |
| MD5 | ed64721bdeda818ff5560bb07f3ff98e |
| BLAKE2b-256 | 2afa2bde0b85b07fc1410d3cc1d31c8b0f0c4e93333f82518def7dcbd0b8888c |
Provenance
The following attestation bundles were made for processes-2.0.1.tar.gz:
Publisher: publish.yml on oliverm91/processes
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: processes-2.0.1.tar.gz
- Subject digest: 9527d18e2832074842452178c789fcc149b530e697be122a0fa35029bcb0122c
- Sigstore transparency entry: 1807546855
- Sigstore integration time:
- Permalink: oliverm91/processes@0dee20b864b374dbe0defbc4a0d69aab24fc98e2
- Branch / Tag: refs/tags/v2.0.1
- Owner: https://github.com/oliverm91
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@0dee20b864b374dbe0defbc4a0d69aab24fc98e2
- Trigger Event: release
File details
Details for the file processes-2.0.1-py3-none-any.whl.
File metadata
- Download URL: processes-2.0.1-py3-none-any.whl
- Upload date:
- Size: 26.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d4ed7ad968002c769b84619ed67e55df678bde42d9393de60a15576e32b669d8 |
| MD5 | ae2dee30a8f9cf4c3e8663bd731cc136 |
| BLAKE2b-256 | 2af51fbfe42ec12725d766a1177517c207a940e295c319e897503d612c270c9e |
Provenance
The following attestation bundles were made for processes-2.0.1-py3-none-any.whl:
Publisher: publish.yml on oliverm91/processes
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: processes-2.0.1-py3-none-any.whl
- Subject digest: d4ed7ad968002c769b84619ed67e55df678bde42d9393de60a15576e32b669d8
- Sigstore transparency entry: 1807546896
- Sigstore integration time:
- Permalink: oliverm91/processes@0dee20b864b374dbe0defbc4a0d69aab24fc98e2
- Branch / Tag: refs/tags/v2.0.1
- Owner: https://github.com/oliverm91
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@0dee20b864b374dbe0defbc4a0d69aab24fc98e2
- Trigger Event: release