Process Parallelization

PARPROC

A library for parallelizing scripts: script elements are grouped into processes, and each group can depend on the completion of one or more other groups.

Installation and Usage

Installation

pip install parproc

Usage

See examples/success.py for a full example. Different jobs can be defined, and dependencies established between them, e.g.:

import time

import parproc as pp

@pp.Proc(now=True)
def func0(context):
    time.sleep(1)


@pp.Proc(now=True)
def func1(context):
    time.sleep(3)


@pp.Proc(now=True, deps=['func0', 'func1'])
def func2(context):
    time.sleep(2)

The optional now=True argument kicks off each job as soon as it is defined. The example then waits for all jobs to finish:

pp.wait_for_all()

Argument choices and glob expansion

Protos can define allowed values for arguments via arg_choices. When set, only those values are accepted when creating procs, and you can use glob patterns (*, ?) in filled-out names to create multiple procs at once:

@pp.Proto(name='build::[env]', arg_choices={'env': ['dev', 'prod']})
def build(context: pp.ProcContext, env: str) -> str:
    return f'built_{env}'

# Create one proc
pp.create('build::dev')

# Create all matching procs: build::dev and build::prod
names = pp.create('build::*')
pp.start(*names)
pp.wait(*names)

Using * or ? in an argument when the proto has no arg_choices for that argument raises an error.
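
For instance, a sketch of the failure case (the 'fetch' proto is hypothetical):

@pp.Proto(name='fetch::[page]')   # no arg_choices declared for 'page'
def fetch(context: pp.ProcContext, page: str) -> None:
    ...

pp.create('fetch::*')   # error: '*' cannot be expanded without arg_choices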

Concepts

Proc and Proto

parproc has two main building blocks: Proc and Proto.

A Proc is a single named unit of work. Decorating a function with @pp.Proc registers it immediately as a concrete process that can be scheduled:

@pp.Proc(name='setup')
def setup(context: pp.ProcContext) -> str:
    return 'done'

A Proto (short for prototype) is a template for creating procs. Protos can be parameterized with [placeholder] patterns in their names. Each unique combination of parameters produces a distinct proc:

@pp.Proto(name='build::[env]')
def build(context: pp.ProcContext, env: str) -> str:
    return f'built_{env}'

The separator between placeholders defaults to :: and can be changed via pp.set_options(name_param_separator='...').
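
For example, a sketch with a custom separator (purely illustrative):

pp.set_options(name_param_separator='/')

@pp.Proto(name='build/[env]')
def build(context: pp.ProcContext, env: str) -> str:
    return f'built_{env}'

pp.create('build/prod')   # creates proc "build/prod"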

Creating procs from protos

Protos don't run directly -- you create concrete procs from them with pp.create:

names = pp.create('build::prod')   # creates one proc named "build::prod"
pp.start(*names)

Parameters are automatically extracted from the filled-out name and passed to the function. Type casting is performed based on the function's type annotations.
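
For example, a minimal sketch with an int-annotated parameter (the 'shard' proto is hypothetical):

@pp.Proto(name='shard::[n]')
def shard(context: pp.ProcContext, n: int) -> int:
    return n * 2               # n arrives as an int: 3 for 'shard::3'

names = pp.create('shard::3')  # creates proc "shard::3" with n=3
pp.start(*names)
pp.wait(*names)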

Proc vs Proto at a glance

                 Proc                      Proto
What it is       A concrete process        A template for processes
Parameterized    No                        Yes, via [placeholder] patterns
Created when     At decoration time        Procs are created later via pp.create
Typical use      One-off tasks             Repeated tasks with different parameters

Lifecycle and state machine

Every proc goes through a state machine:

IDLE  -->  WANTED  -->  RUNNING  -->  SUCCEEDED
                                 \->  FAILED
                                 \->  SKIPPED
                                 \->  UP_TO_DATE
  • IDLE -- defined but not yet scheduled. Will not run until something triggers it.
  • WANTED -- scheduled to run. Waiting for dependencies, locks, or wave ordering.
  • RUNNING -- currently executing in a subprocess (or in-process in single mode).
  • SUCCEEDED -- completed successfully.
  • FAILED -- completed with an error (exception, timeout, dep failure, etc.).
  • SKIPPED -- the proc raised ProcSkippedError; counts as success for downstream deps (see the sketch after this list).
  • UP_TO_DATE -- auto-skipped by the framework because all inputs and dependencies are unchanged since the last cached result. Counts as success for downstream deps. See Incremental builds below.
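
As an illustration of SKIPPED, a minimal sketch (assuming the exception is exposed as pp.ProcSkippedError, and using a hypothetical has_work() helper):

@pp.Proc(name='maybe-work')
def maybe_work(context: pp.ProcContext) -> None:
    if not has_work():                             # hypothetical check
        raise pp.ProcSkippedError('nothing to do')
    ...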

Scheduling and execution

There are several ways to move a proc from IDLE to WANTED:

# 1. now=True: auto-start at definition time
@pp.Proc(name='early', now=True)
def early(context): ...

# 2. Explicit start
pp.create('build::prod')
pp.start('build::prod')

# 3. Shorthand: create + start in one call
pp.run('build::prod')

# 4. Pulled in as a dependency of another proc (see below)

Once a proc is WANTED, the scheduler checks (in order):

  1. Wave ordering -- lower-wave procs run first.
  2. Locks -- named locks must be available.
  3. Dependencies -- all deps must be in a succeeded state.
  4. Special dependencies -- global conditions like pp.NO_FAILURES.
  5. Parallel limit -- the number of concurrently running procs must not exceed the limit.

When all conditions are met, the proc begins executing.

Waiting for completion

pp.wait_for_all()            # wait for everything
pp.wait('build::prod')       # wait for specific procs
results = pp.results()       # dict of proc_name -> return value

Dependencies (deps)

Dependencies define what must complete before a proc can run. They are specified with the deps parameter:

@pp.Proc(name='deploy', deps=['build', 'test'])
def deploy(context):
    # runs only after both 'build' and 'test' have succeeded
    ...

When a proc becomes WANTED, its dependencies are recursively scheduled too (transitioned from IDLE to WANTED if needed).
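
A sketch of this pull-in behavior, using only documented calls (proc names hypothetical):

@pp.Proc(name='build')
def build(context): ...

@pp.Proc(name='test', deps=['build'])
def test(context): ...

@pp.Proc(name='deploy', deps=['test'])
def deploy(context): ...

pp.start('deploy')   # 'test' and, transitively, 'build' are moved to WANTED as well
pp.wait('deploy')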

Dependency types

Dependencies can take several forms:

@pp.Proto(name='deploy::[env]', deps=[
    'build::[env]',                              # pattern -- [env] is filled from parent
    'lint::all',                                  # filled-out name -- created from proto
    lambda ctx, env: f'provision::{env}',         # lambda -- called at creation time
    pp.NO_FAILURES,                               # special dep -- global condition
])
def deploy(context, env): ...
  • String patterns ('build::[env]'): placeholders are filled with matching args from the parent proc.
  • Filled-out names ('lint::all'): matched against protos and auto-created if a proto matches.
  • Lambdas: called with a DepProcContext (or just the matching keyword args) and must return a proc name or list of names (see the sketch after this list).
  • Special deps (pp.NO_FAILURES): global conditions that must hold. NO_FAILURES is satisfied only when no proc in the run has failed.
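
For instance, a sketch of a lambda dep that returns a list of names (the 'package' and 'sign' procs are hypothetical):

@pp.Proto(name='package::[env]', deps=[
    lambda ctx, env: [f'build::{env}', f'sign::{env}'],   # returns a list of dep names
])
def package(context: pp.ProcContext, env: str) -> None: ...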

Reverse dependencies (rdeps)

Reverse dependencies flip the direction: instead of "I depend on X", an rdep says "I should be injected as a dependency of X". The declaring proc runs before the target.

@pp.Proc(name='setup', rdeps=['deploy::[env]'])
def setup(context):
    return 'initialized'

@pp.Proto(name='deploy::[env]')
def deploy(context, env):
    # 'setup' is automatically injected as a dep of deploy
    ...

When deploy::prod is scheduled, the framework scans all protos and procs for rdeps matching deploy::prod. It finds setup's rdep pattern matches, so setup is injected as a dependency of deploy::prod. The result: setup runs first.

Rdeps support the same [placeholder] pattern matching as proto names. Placeholders in the rdep pattern are matched against the target proc name.

Conditional reverse dependencies (WhenScheduled)

A WhenScheduled rdep is a conditional variant: it only activates when the declaring proc is itself scheduled. It also pulls in (schedules) the target.

@pp.Proto(name='migrate::[env]', rdeps=[pp.WhenScheduled('deploy::[env]')])
def migrate(context, env):
    return f'migrated_{env}'

@pp.Proto(name='deploy::[env]')
def deploy(context, env):
    return f'deployed_{env}'

The behavior:

  • Only deploy::prod is scheduled -- migrate is not involved. deploy::prod runs alone.
  • migrate::prod is scheduled (explicitly or as a dependency) -- migrate::prod is injected as a dep of deploy::prod, and deploy::prod is also scheduled. migrate runs first, then deploy.
  • Both are scheduled -- same as above: migrate before deploy.

If the target has already started running or completed before the declaring proc is scheduled, a UserError is raised -- the ordering contract cannot be satisfied.

rdeps vs WhenScheduled at a glance

                    rdeps (plain string)                     WhenScheduled
Trigger             Target is scheduled                      Declaring proc is scheduled
Direction           Target pulls in declarer                 Declarer pushes into target
Target scheduling   Target must be independently scheduled   Target is auto-scheduled
Execution order     Declarer before target                   Declarer before target

Combining deps and rdeps

Deps and rdeps can be mixed freely. A proc can have both regular deps and be injected via rdeps from other procs. Plain string rdeps and WhenScheduled entries can coexist in the same rdeps list:

@pp.Proc(name='init', rdeps=['app::[x]', pp.WhenScheduled('cleanup')])
def init(context): ...

Waves

Waves provide coarse-grained ordering across groups of procs. A proc with a lower wave number runs before procs with higher wave numbers, regardless of explicit dependencies:

@pp.Proc(name='A', wave=0, now=True)
def a(context): ...

@pp.Proc(name='B', wave=1, now=True)
def b(context): ...

@pp.Proc(name='C', wave=2, now=True)
def c(context): ...

All wave-0 procs complete before any wave-1 proc starts, and so on. Within the same wave, procs run in parallel (subject to dependency and lock constraints). A dependency from a lower wave to a higher wave is an error (would cause deadlock).
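
A sketch of the error case (names hypothetical):

@pp.Proc(name='late', wave=1)
def late(context): ...

@pp.Proc(name='early', wave=0, deps=['late'])   # error: a wave-0 proc cannot depend on wave 1
def early(context): ...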

Locks

Locks provide mutual exclusion. Only one proc holding a given lock name can run at a time:

@pp.Proto(name='deploy::[env]', locks=['deploy-lock'])
def deploy(context, env): ...

If deploy::dev and deploy::prod are both WANTED, only one runs at a time because they share the deploy-lock.

ProcContext

Every proc function receives a ProcContext as its first argument (a usage sketch follows this list). It provides:

  • context.results -- dict of completed proc results (name -> return value).
  • context.params -- global parameters set via pp.set_params(...).
  • context.args -- arguments specific to this proc (from the proto pattern).
  • context.proc_name -- the name of the current proc.
  • context.create(...) / context.start(...) / context.run(...) / context.wait(...) -- create, start, run, or wait for other procs from within a running proc.
  • context.deps_changed -- True if any dependency proc was freshly executed (not UP_TO_DATE or SKIPPED). Useful for procs that want to do their own "should I do work?" logic.
  • context.changed_deps -- list of dependency names that were freshly executed.
  • context.input_fingerprint -- dict mapping resolved input file paths to their mtime_ns values, or None if no inputs are declared. Available for manual inspection.
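
A usage sketch touching a few of these fields (the 'report' proto is hypothetical):

@pp.Proto(name='report::[env]', deps=['build::[env]'])
def report(context: pp.ProcContext, env: str) -> str:
    build_result = context.results[f'build::{env}']   # return value of the dep
    assert context.args['env'] == env                 # args parsed from the proc name
    return f'{context.proc_name}: {build_result}'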

Options

Global options are configured via pp.set_options(...):

pp.set_options(
    parallel=4,                   # max concurrent procs (default: 100)
    mode='mp',                    # 'mp' (multiprocess) or 'single' (in-process)
    dynamic=True,                 # live terminal output (default: True if TTY)
    watch=False,                  # enable watch() support when true
    watch_debounce_seconds=0.3,   # quiet-period debounce before watch re-runs (0 disables)
    name_param_separator='::',    # separator between proto pattern segments
    allow_missing_deps=True,      # don't error on deps that don't exist yet
    full_log_on_failure=True,     # show complete log output for failed procs
    task_db_path='build/.parproc.db',  # SQLite DB for run history and result cache
)

Setting task_db_path enables two features: progress bar time estimates (based on historical run durations) and incremental builds (result caching so unchanged procs can be auto-skipped). The DB is safe to delete at any time -- losing it just means everything runs once to repopulate.

By default, parproc prints focused log snippets for failures (keyword/context extraction). Set full_log_on_failure=True to always print the entire failing proc log instead.

Incremental builds

When task_db_path is set, parproc can automatically skip procs whose inputs haven't changed since their last successful run. This is the default behavior -- no extra configuration is needed.

How it works

After a proc completes successfully, its return value and input file fingerprints (modification times) are cached in the task DB. On the next run, before executing a proc, the framework checks:

  1. Does a cached result exist?
  2. Are all dependency procs UP_TO_DATE or SKIPPED (i.e., none produced fresh output)?
  3. If the proc declares inputs: are all input file modification times unchanged?
  4. If the proc declares outputs: do all output files still exist?

If all conditions are met, the proc is auto-skipped with state UP_TO_DATE and its cached return value is loaded into results so downstream procs can still access it.
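
A sketch of the effect across two invocations of the same script (assuming 'build::prod' declares inputs, as in the next section):

pp.set_options(task_db_path='build/.parproc.db')
pp.run('build::prod')   # first invocation: executes, caches result and fingerprints
                        # later invocations: auto-skipped as UP_TO_DATE while unchanged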

Declaring file dependencies

Use inputs and outputs to declare which files a proc reads and produces:

@pp.Proto(
    name='build::[target]',
    deps=['generate-config'],
    inputs=lambda ctx, target: [f'src/{target}/**/*.ts'],
    outputs=lambda ctx, target: [f'dist/{target}/bundle.js'],
)
def build(ctx: pp.ProcContext, target: str) -> dict:
    ...
  • inputs -- file paths, glob patterns, or callables that return lists of paths. Used for staleness detection: if any input file is newer than the cached result, the proc re-runs. Glob patterns (e.g. src/**/*.ts) are expanded at check time.
  • outputs -- file paths the proc is expected to produce. After a proc runs, the framework verifies that all declared output files exist. If any are missing, the proc is marked FAILED with error ERROR_OUTPUTS_NOT_REFRESHED.

Both accept a list of strings, a single callable, or a list mixing strings and callables. Callables receive the same DepProcContext and keyword args as dependency lambdas.
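
For example, a sketch mixing a literal path with a callable (all paths hypothetical):

@pp.Proto(
    name='docs::[lang]',
    inputs=['mkdocs.yml', lambda ctx, lang: [f'docs/{lang}/**/*.md']],
    outputs=lambda ctx, lang: [f'site/{lang}/index.html'],
)
def docs(ctx: pp.ProcContext, lang: str) -> None:
    ...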

Incremental run vs full run

pp.run() accepts a full keyword argument:

pp.run('build::prod')              # incremental: skip up-to-date procs
pp.run('build::prod', full=True)   # full: force all procs to run

Opting out of auto-skip

By default, all procs are eligible for auto-skipping. For side-effectful tasks (deployments, notifications, etc.), use no_skip=True:

@pp.Proto(name='deploy::[env]', deps=['build::[env]'], no_skip=True)
def deploy(ctx: pp.ProcContext, env: str) -> None:
    ...  # always executes when scheduled, even if build was UP_TO_DATE

Inspecting dependency changes from within a proc

A running proc can check whether its dependencies actually produced new output:

@pp.Proto(name='test', deps=['build'], no_skip=True)
def test(ctx: pp.ProcContext) -> str:
    if not ctx.deps_changed:
        # All deps were UP_TO_DATE or SKIPPED -- nothing new to test
        ...
    print(f'Changed deps: {ctx.changed_deps}')
    ...

Resilience

The task DB is purely an optimization. It is safe to delete it at any time. When the DB is missing or empty, every proc runs normally and the cache is rebuilt from scratch.

Watch mode

Watch mode combines incremental builds with file system monitoring. Call run first to schedule work; watch waits for that in-flight graph to finish, then watches declared inputs and re-executes affected procs and their downstream dependents. For a full initial build (no incremental skip), use run(..., full=True) before watch() -- watch does not take a full argument, so incremental behavior stays consistent with what run already set.

pp.set_options(watch=True, task_db_path='build/.parproc.db')
pp.run('build::prod')
pp.run('test::prod')
pp.watch()  # blocks until Ctrl+C

Or watch only specific procs:

pp.watch('build::prod', 'test::prod')

When explicit proc names are provided, watch mode also watches the declared inputs of their transitive dependencies (as resolved from the initial run graph). watch() raises UserError unless set_options(watch=True) was set before scheduling/running.

Watch mode uses the watchdog package to detect file changes (it is a core dependency of parproc). By default, watch mode waits for a 300ms quiet period (watch_debounce_seconds=0.3) before re-running, so save bursts are coalesced into one rebuild.

How watch mode handles re-runs

Each proc tracks a generation counter. When an input file changes:

  1. The affected proc and all its transitive dependents are marked dirty (generation incremented).
  2. Dirty procs are reset to IDLE and re-scheduled.
  3. If a proc is currently running when its inputs change, downstream procs wait for the next execution to complete, not the currently running (stale) one.

Contributing

Feel free to send me PRs

Setting up the development environment

  1. Install uv if you haven't already
  2. Clone the repository
  3. Initialize the development environment:
    uv sync --dev
    
    This will install all dependencies and development tools.

Running tests

Run all tests using the test script:

./scripts/test.sh

Or run specific test modules:

./scripts/test.sh tests.simple tests.proto

You can also run tests directly with uv:

uv run python -m unittest tests.simple tests.proto tests.errorformat

Running examples

Run any of the example scripts:

uv run python examples/success.py
uv run python examples/error.py
uv run python examples/failed_deps.py
uv run python examples/full_log_on_failure.py
