
A Python library for managing and executing dependent tasks in parallel or sequential order with automatic dependency resolution and topological sorting


Processes - Smart Task Orchestration

🚀 Processes: Robust Routines Management


Processes is a lightweight, high-performance Python library for executing complex task graphs. It resolves dependencies between tasks, can run independent tasks in parallel, and keeps the rest of a workflow running when one task fails, all without any external libraries.

File logging and email notifications are supported.


✨ Features

  • 🐍 Pure Python: Zero external dependencies. Built entirely on the Python Standard Library.
  • ⚡ Parallel Execution: Built-in support for parallelization to maximize throughput.
  • 🔗 Dependency Resolution: Automatically sorts and executes tasks based on their requirements, regardless of input order.
  • 📝 Shared Logging: Multiple tasks can write to the same logfile or maintain separate ones seamlessly.
  • 📧 Email Notifications: Integrated SMTP support (including HTML) to alert you the moment an exception occurs.
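
To make the dependency-resolution feature concrete, here is a minimal sketch of topological sorting using the standard library's `graphlib` module. It illustrates the general technique only; whether Processes uses `graphlib` internally is not stated here, and the task names are placeholders.

```python
from graphlib import TopologicalSorter

# Hypothetical task names mapped to the names they depend on,
# deliberately listed out of execution order.
dependencies = {
    "summary": {"enrich"},
    "enrich": {"load"},
    "load": set(),
}

# static_order() yields each name only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['load', 'enrich', 'summary']
```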

⚙️ Core Concepts

The library operates on two main primitives:

  1. Task: The atomic unit of work. It encapsulates a function, its parameters, its specific logfile, and its relationship with other tasks.
  2. Process: The orchestrator. It builds the execution graph, validates dependencies, and manages the lifecycle of the entire workflow.
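
As an illustration of what "validates dependencies" can mean in practice, the sketch below checks a dependency mapping for unknown task names and for cycles. The `validate` helper and its messages are hypothetical and are not part of the Processes API; the library's own checks may differ.

```python
from graphlib import CycleError, TopologicalSorter


def validate(dependencies):
    """Return a list of problems found in a {task: set_of_dependencies} mapping."""
    known = set(dependencies)
    problems = []
    # A dependency must refer to a task that is actually defined.
    for name, deps in dependencies.items():
        for dep in sorted(deps - known):
            problems.append(f"{name!r} depends on unknown task {dep!r}")
    # prepare() raises CycleError when the graph cannot be ordered.
    try:
        TopologicalSorter(dependencies).prepare()
    except CycleError as exc:
        problems.append("cycle: " + " -> ".join(exc.args[1]))
    return problems


ok = validate({"load": set(), "enrich": {"load"}})
missing = validate({"enrich": {"load"}})
cyclic = validate({"a": {"b"}, "b": {"a"}})
```

An empty list means the graph can be ordered and every referenced task exists.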

🛠️ Use Cases

  • ETL Pipelines: Fetch data from an API, transform it, and load it into a database as separate, dependent tasks.

  • System Maintenance: Run parallel cleanup scripts, check server health, and receive email alerts if a specific check fails.

  • Automated Reporting: Generate multiple data parts in parallel, aggregate them into a final report, and distribute via SMTP.
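
The reporting use case (independent parts built in parallel, then aggregated) can be sketched with the standard library alone. This is an assumption-laden illustration of the scheduling idea, not the library's implementation: `run_parallel` is a hypothetical helper, and passing upstream results positionally in sorted-name order is a choice made only for this example.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_parallel(funcs, dependencies, max_workers=4):
    """Run each function as soon as all of its dependencies have finished."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    results, running = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every task whose dependencies are all done.
            for name in sorter.get_ready():
                upstream = [results[d] for d in sorted(dependencies.get(name, ()))]
                running[pool.submit(funcs[name], *upstream)] = name
            # Block until at least one running task completes.
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                results[name] = future.result()
                sorter.done(name)  # unlocks tasks waiting on this one
    return results


funcs = {
    "part_a": lambda: ["a1", "a2"],
    "part_b": lambda: ["b1"],
    "report": lambda part_a, part_b: len(part_a) + len(part_b),
}
dependencies = {"part_a": set(), "part_b": set(), "report": {"part_a", "part_b"}}

results = run_parallel(funcs, dependencies)
print(results["report"])  # 3
```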


💻 Quick Start

Four short examples — read top to bottom.

1. One task

from processes import Process, Task

def hello():
    return 42

tasks = [Task("greet", "run.log", hello)]

with Process(tasks) as process:
    result = process.run()

print(result.passed_tasks_results["greet"].result)   # 42

2. args and kwargs

args and kwargs are forwarded to your function — like func(*args, **kwargs).

from processes import Process, Task

def fetch(source, *, limit=100, dry_run=False):
    return ["row1", "row2"]

tasks = [
    Task(
        "fetch_orders", "run.log", fetch,
        args=("orders_api",),
        kwargs={"limit": 500, "dry_run": True},
    ),
]

with Process(tasks) as process:
    process.run()
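
For readers unfamiliar with argument forwarding, this plain-Python sketch shows what `func(*args, **kwargs)` does with the values from the example above. `call_task` is a hypothetical stand-in, not a function exported by Processes, and `fetch` returns its inputs here only so the forwarding is visible.

```python
def call_task(func, args=(), kwargs=None):
    """Call func with positional and keyword arguments unpacked."""
    return func(*args, **(kwargs or {}))


def fetch(source, *, limit=100, dry_run=False):
    # Echo the received arguments so the forwarding can be inspected.
    return {"source": source, "limit": limit, "dry_run": dry_run}


outcome = call_task(
    fetch,
    args=("orders_api",),
    kwargs={"limit": 500, "dry_run": True},
)
print(outcome)  # {'source': 'orders_api', 'limit': 500, 'dry_run': True}
```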

3. Dependencies + result injection

TaskDependency orders tasks. To also pipe the upstream result into the downstream function, pick one:

  • use_result_as_additional_args=True: the upstream result is appended as the next positional argument.
  • use_result_as_additional_kwargs=True + additional_kwarg_name="...": the upstream result is passed as a keyword argument under that name.

from processes import Process, Task, TaskDependency

def load_users():
    return [{"id": 1}, {"id": 2}, {"id": 3}]

def enrich(api_key, users):                # `users` injected positionally
    return [{**u, "name": f"user-{u['id']}"} for u in users]

def summarize(*, enriched, label="report"):  # `enriched` injected as kwarg
    return f"{label}: {len(enriched)} users"

tasks = [
    Task("load", "run.log", load_users),

    Task(
        "enrich", "run.log", enrich,
        args=("MY_API_KEY",),
        dependencies=[
            TaskDependency("load", use_result_as_additional_args=True),
        ],
    ),

    Task(
        "summary", "run.log", summarize,
        kwargs={"label": "daily"},
        dependencies=[
            TaskDependency(
                "enrich",
                use_result_as_additional_kwargs=True,
                additional_kwarg_name="enriched",
            ),
        ],
    ),
]

with Process(tasks) as process:
    result = process.run(parallel=True)

print(result.passed_tasks_results["summary"].result)
# "daily: 3 users"

The order of the tasks list doesn't matter: Process sorts the tasks by dependency. A failed task causes only its own dependents to be skipped; every other task still runs.
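
The two injection modes can be pictured as a small transformation of the call arguments. The sketch below is a rough mental model, not the library's code: `inject` and its parameters are hypothetical names, and the functions are copied from the example above.

```python
def inject(args, kwargs, upstream, *, as_arg=False, kwarg_name=None):
    """Return (args, kwargs) with the upstream result added in one of two ways."""
    args, kwargs = tuple(args), dict(kwargs)
    if as_arg:
        args += (upstream,)            # appended after the existing positionals
    if kwarg_name is not None:
        kwargs[kwarg_name] = upstream  # passed under the chosen keyword
    return args, kwargs


def load_users():
    return [{"id": 1}, {"id": 2}, {"id": 3}]


def enrich(api_key, users):
    return [{**u, "name": f"user-{u['id']}"} for u in users]


def summarize(*, enriched, label="report"):
    return f"{label}: {len(enriched)} users"


users = load_users()
args, kwargs = inject(("MY_API_KEY",), {}, users, as_arg=True)
enriched = enrich(*args, **kwargs)

args, kwargs = inject((), {"label": "daily"}, enriched, kwarg_name="enriched")
summary = summarize(*args, **kwargs)
print(summary)  # daily: 3 users
```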

4. Email alerts on failure

Attach an HTMLSMTPHandler to any task. If that task raises an exception, an HTML email is sent.

from processes import HTMLSMTPHandler, Process, Task

smtp = HTMLSMTPHandler(
    mailhost=("smtp.example.com", 587),
    fromaddr="alerts@example.com",
    toaddrs=["oncall@example.com"],
    credentials=("user", "pass"),
    secure=(),                             # () = STARTTLS; omit for no encryption
)

tasks = [
    Task("risky_step", "run.log", lambda: 1 / 0, html_mail_handler=smtp),
]

with Process(tasks) as process:
    process.run()
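
To give a sense of what an HTML failure alert might contain, the sketch below builds (but does not send) such a message with the standard library's `email` package. The subject line, layout, and the `build_failure_email` helper are assumptions made for illustration; the actual content produced by HTMLSMTPHandler may differ.

```python
import html
import traceback
from email.message import EmailMessage


def build_failure_email(task_name, exc, fromaddr, toaddrs):
    """Build a plain-text + HTML message describing a failed task."""
    tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    msg = EmailMessage()
    msg["Subject"] = f"Task failed: {task_name}"
    msg["From"] = fromaddr
    msg["To"] = ", ".join(toaddrs)
    msg.set_content(tb)  # plain-text fallback
    msg.add_alternative(
        f"<h2>Task <code>{html.escape(task_name)}</code> failed</h2>"
        f"<pre>{html.escape(tb)}</pre>",
        subtype="html",
    )
    return msg


try:
    1 / 0
except ZeroDivisionError as exc:
    message = build_failure_email(
        "risky_step", exc, "alerts@example.com", ["oncall@example.com"]
    )

print(message["Subject"])  # Task failed: risky_step
```

Sending the message (for example with `smtplib.SMTP.send_message`) is left out so the example runs without a mail server.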

🛡️ Fault Tolerance & Logs

Resilience by Design

If a Task raises an exception, the Process does not stop. It skips the tasks that depend on the failed one and continues to execute all other independent branches of your workflow.
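
The skip-on-failure behaviour can be sketched in a few lines of standard-library Python. This is a simplified sequential model under stated assumptions, not the library's implementation: `run_all` is a hypothetical helper, and it assumes that a skip also propagates to tasks further downstream.

```python
from graphlib import TopologicalSorter


def run_all(funcs, dependencies):
    """Run tasks in dependency order, skipping anything downstream of a failure."""
    passed, failed, skipped = {}, {}, set()
    for name in TopologicalSorter(dependencies).static_order():
        deps = dependencies.get(name, set())
        if any(d in failed or d in skipped for d in deps):
            skipped.add(name)   # an upstream task did not succeed
            continue
        try:
            passed[name] = funcs[name]()
        except Exception as exc:  # record the failure and keep going
            failed[name] = exc
    return passed, failed, skipped


funcs = {
    "load": lambda: [1, 2, 3],
    "broken": lambda: 1 / 0,
    "after_broken": lambda: "never runs",
    "independent": lambda: "ok",
}
dependencies = {
    "load": set(),
    "broken": set(),
    "after_broken": {"broken"},
    "independent": {"load"},
}

passed, failed, skipped = run_all(funcs, dependencies)
print(sorted(passed), sorted(failed), sorted(skipped))
# ['independent', 'load'] ['broken'] ['after_broken']
```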

Advanced Logging

All tasks record their execution flow to their assigned logfiles. You can share a single logfile across the whole process or isolate specific tasks for easier debugging.
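
The shared-versus-isolated logfile idea maps naturally onto the standard `logging` module. The sketch below is only an analogy built on that module; the logger names and file names are placeholders, and it does not show how Processes wires its own loggers.

```python
import logging
import tempfile
from pathlib import Path


def make_logger(name, handler):
    """Create a logger that writes only to the given handler."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    return logger


log_dir = Path(tempfile.mkdtemp())
shared = logging.FileHandler(log_dir / "run.log")      # used by two tasks
isolated = logging.FileHandler(log_dir / "risky.log")  # used by one task
for handler in (shared, isolated):
    handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))

make_logger("sketch.load", shared).info("loaded 3 rows")
make_logger("sketch.enrich", shared).info("enriched 3 rows")
make_logger("sketch.risky", isolated).info("starting risky step")

for handler in (shared, isolated):
    handler.close()  # flush to disk before reading back

shared_lines = (log_dir / "run.log").read_text().splitlines()
isolated_lines = (log_dir / "risky.log").read_text().splitlines()
print(shared_lines)
print(isolated_lines)
```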


📦 Installation

Available on PyPI: https://pypi.org/project/processes/

pip install processes

Because it is a pure Python library, you can also install it directly from the repository:

pip install git+https://github.com/oliverm91/processes.git
