pyWorkflowy

A full workflow engine for async/parallelized Python tasks. Tasks, DAGs, retries, timeouts, three execution backends, persistence/resume, and a cron-like scheduler — all in one library with zero runtime dependencies.

Install

pip install pyworkflowy

or

uv add pyworkflowy

Core has no runtime dependencies. Process-pool support, threading, and asyncio are all stdlib.

Tasks

Decorator form

@task is the simplest entry point. Bare or parameterised, exactly like @hook in pyHooky:

from pyworkflowy import task, TaskRunner

@task
def square(x: int) -> int:
    return x * x

@task(name="checkout", retries=3, timeout=10.0, backend="thread")
def checkout(cart_id: int) -> dict:
    ...

The wrapped object is a Task instance. Use .submit(...) to enqueue it on the active runner; calling it directly bypasses the runner (useful in unit tests):

with TaskRunner() as runner:
    handle = square.submit(5)
    runner.run()
    assert handle.result() == 25

# direct call — no runner involved
assert square(5) == 25

The task name is auto-derived from module.qualname if you don't pass one. Lambdas and other anonymous functions get an id()-suffixed name, so several defined in the same scope don't collide.
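
The naming rule can be sketched in plain Python. This illustrates the rule as described, not pyWorkflowy's actual code: derive_name is a hypothetical helper, and the exact separator before the id() suffix is an assumption.

```python
def derive_name(fn) -> str:
    """Hypothetical helper: build a default task name for a callable."""
    base = f"{fn.__module__}.{fn.__qualname__}"
    # Every lambda has the name "<lambda>", so append id() to keep two
    # lambdas defined in the same scope distinct.
    if fn.__name__ == "<lambda>":
        return f"{base}-{id(fn)}"
    return base


def square(x):
    return x * x


first = lambda x: x + 1
second = lambda x: x + 2

print(derive_name(square))                         # e.g. __main__.square
print(derive_name(first) != derive_name(second))   # True
```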

Class form

When configuration-as-class reads better than configuration-as-kwargs, subclass TaskBase:

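from typing import Any

from pyworkflowy import TaskBase

class FetchUser(TaskBase):
    name = "fetch-user"
    backend = "thread"
    retries = 2
    timeout = 5.0

    def run(self, user_id: int) -> dict[str, Any]:
        # `http` stands in for whatever HTTP client your project uses
        return http.get(f"/users/{user_id}").json()

fetch_user = FetchUser()
handle = fetch_user.submit(42)  # enqueues on the active runner, as with the decorator form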

Instantiating FetchUser() returns a Task — the same type the decorator produces. run may be def or async def; pyWorkflowy auto-detects.

TaskBase constructors take no arguments — class-level attributes configure the task; runtime values are passed to submit(*args, **kwargs) and forwarded to run.

max_attempts vs retries

max_attempts=3 means "up to 3 attempts including the first" — sugar for retries=2. Pass whichever framing reads better; passing both raises ValueError.
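
The relationship between the two framings is simple arithmetic. The sketch below shows one way to normalise them; resolve_retries is a hypothetical helper, and both the zero-retry default and the check for max_attempts below 1 are assumptions rather than documented behaviour.

```python
from typing import Optional


def resolve_retries(retries: Optional[int] = None,
                    max_attempts: Optional[int] = None) -> int:
    """Hypothetical helper: normalise both framings to a retry count."""
    if retries is not None and max_attempts is not None:
        raise ValueError("pass either retries or max_attempts, not both")
    if max_attempts is not None:
        if max_attempts < 1:  # assumed validation
            raise ValueError("max_attempts must be at least 1")
        return max_attempts - 1  # the first attempt is not a retry
    return retries if retries is not None else 0  # assumed default


print(resolve_retries(max_attempts=3))  # 2
print(resolve_retries(retries=2))       # 2
```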

Backends

| Backend | Constraint | Cancellation |
| --- | --- | --- |
| asyncio | sync or async tasks | cooperative (CancelledError) |
| thread | sync tasks only | cooperative (cancel flag) |
| process | sync tasks; top-level/picklable functions | best-effort (future.cancel) |

@task(backend="thread")
def cpu_bound(x): ...

@task(backend="process")
def heavy(x): ...

The runner's backend= is the default for tasks that don't override it. The asyncio loop always orchestrates — runner.run() is just asyncio.run(runner.arun()) — so picking thread/process as the runner default just changes the default for plain-callable submissions.

Async tasks on non-asyncio backends are rejected at decoration time. Async needs the event loop; the thread/process pools can't run coroutines without one.
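
A decoration-time check of this kind can be written with the standard library's inspect module. This is a sketch only: check_backend is a hypothetical name, and the exception type pyWorkflowy actually raises is not stated here, so TypeError is an assumption.

```python
import inspect


def check_backend(fn, backend: str) -> None:
    """Hypothetical check: reject coroutine functions on pool backends."""
    if inspect.iscoroutinefunction(fn) and backend != "asyncio":
        raise TypeError(  # assumed exception type
            f"{fn.__name__} is async and needs backend='asyncio', got {backend!r}"
        )


async def fetch(url): ...


def crunch(x): ...


check_backend(crunch, "process")   # fine: sync function
check_backend(fetch, "asyncio")    # fine: coroutine on the event loop
try:
    check_backend(fetch, "thread")
except TypeError as exc:
    print(exc)
```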

DAG / dependencies

Pass depends_on=[other_handle, ...] when submitting. The runner topologically orders execution and gates each task on its deps reaching COMPLETED:

with TaskRunner() as runner:
    h_load = load_csv.submit("data.csv")
    h_clean = clean_rows.submit(depends_on=[h_load])
    h_write = write_db.submit(depends_on=[h_clean])
    runner.run()

Cycles are detected eagerly: runner.submit(t, depends_on=[h]) raises CycleError immediately if adding the edge would close a cycle.
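
To see what topological ordering and eager cycle detection mean in practice, here is a sketch using the standard library's graphlib. It does not show pyWorkflowy internals: would_close_cycle is a hypothetical helper, and graphlib.CycleError is the stdlib exception, not pyWorkflowy's CycleError.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on.
deps = {"clean": {"load"}, "write": {"clean"}}
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['load', 'clean', 'write']


def would_close_cycle(graph: dict, node: str, depends_on: set) -> bool:
    """Return True if giving `node` these dependencies creates a cycle."""
    trial = {name: set(parents) for name, parents in graph.items()}
    trial.setdefault(node, set()).update(depends_on)
    try:
        TopologicalSorter(trial).prepare()  # raises on any cycle
    except CycleError:
        return True
    return False


print(would_close_cycle(deps, "load", {"write"}))    # True
print(would_close_cycle(deps, "report", {"write"}))  # False
```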

On-dependency-failure policies

Per-task — set via @task(on_dep_failure=...):

| Policy | Behaviour when a dep ends non-COMPLETED |
| --- | --- |
| "fail" | This task is marked FAILED with a DependencyFailedError. (default) |
| "skip" | This task is marked SKIPPED; downstream sees the skip too. |
| "run-anyway" | Task runs as if the dep had succeeded. Args you passed are used as-is. |

Dependencies don't auto-thread their return values into the dependent task's args. If task B needs A's output, look it up after submit via h_a.result() inside B's body, or pass the value through your own closure.
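
The three policies amount to a small decision function. The sketch below is a reading of the policy table, not library code: resolve_on_dep_failure is a hypothetical helper and "RUN" is an illustrative label for "go ahead and execute".

```python
def resolve_on_dep_failure(policy: str, dep_states: list) -> str:
    """Hypothetical helper: decide a task's fate from its deps' final states."""
    if all(state == "COMPLETED" for state in dep_states):
        return "RUN"
    if policy == "fail":
        return "FAILED"    # surfaced as a DependencyFailedError
    if policy == "skip":
        return "SKIPPED"
    if policy == "run-anyway":
        return "RUN"
    raise ValueError(f"unknown policy: {policy!r}")


# A fails, B skips on failure, C uses the default policy and depends on B.
b_state = resolve_on_dep_failure("skip", ["FAILED"])
c_state = resolve_on_dep_failure("fail", [b_state])
print(b_state, c_state)  # SKIPPED FAILED
```

SKIPPED counts as non-COMPLETED here, which is how a skip stays visible to downstream tasks.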

Retries / timeouts / cancellation

@task(retries=3, backoff="exponential", backoff_base=1.0, backoff_max=30.0,
      retry_on=(TransientError,), timeout=15.0)
def fetch(url): ...

| Knob | Effect |
| --- | --- |
| retries=N | Up to N additional attempts after the initial one (max_attempts=N+1). |
| retry_on | Exception class or tuple. Only matches are retried; others fail immediately. Default: Exception. |
| backoff | "none", "linear", or "exponential". Delay is base * attempt (linear) or base * 2^(attempt-1) (exponential), capped at backoff_max. |
| timeout | Seconds per attempt. Exceeding it raises TaskTimeoutError, which is not retried (timeouts are terminal). |
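
The backoff formulas in the table can be checked with a few lines of Python. This is a sketch under assumptions: backoff_delay and call_with_retries are hypothetical helpers, attempt is taken to be 1-based, "none" is taken to mean no delay, and the loop re-raises the last exception instead of wrapping it in RetryExhaustedError.

```python
import time


def backoff_delay(strategy: str, attempt: int, base: float = 1.0,
                  cap: float = 30.0) -> float:
    """Seconds to wait after failed attempt number `attempt` (1-based)."""
    if strategy == "none":
        delay = 0.0
    elif strategy == "linear":
        delay = base * attempt
    elif strategy == "exponential":
        delay = base * 2 ** (attempt - 1)
    else:
        raise ValueError(f"unknown backoff strategy: {strategy!r}")
    return min(delay, cap)


def call_with_retries(fn, retries, retry_on=Exception, strategy="exponential",
                      base=1.0, cap=30.0, sleep=time.sleep):
    """Call fn(); retry matching exceptions up to `retries` extra times."""
    for attempt in range(1, retries + 2):
        try:
            return fn()
        except retry_on:
            if attempt > retries:
                raise  # attempts exhausted
            sleep(backoff_delay(strategy, attempt, base, cap))


print([backoff_delay("exponential", a) for a in range(1, 8)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Exceptions that do not match retry_on are not caught by the except clause, so they propagate on the first failure, matching the "others fail immediately" rule.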

Cancellation is per-handle: handle.cancel() requests stop. Cooperative for asyncio (CancelledError at the next await), cooperative for threads (your body must check current_task().cancel_event), best-effort for processes (the future is cancelled if not yet scheduled).

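from pyworkflowy import current_task, task

@task(backend="thread")
def long_loop(n):
    ctx = current_task()
    for i in range(n):
        # check the flag once per chunk so cancellation is noticed promptly
        if ctx.cancel_event.is_set():
            return "stopped early"
        do_chunk(i)  # do_chunk is your own unit of work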

runner.cancel_all() sets the cancel flag on every non-terminal handle.
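
The difference between a cooperative cancel flag and a best-effort future.cancel() can be reproduced with the standard library alone. The sketch below uses concurrent.futures and threading.Event, not pyWorkflowy; a single-worker thread pool stands in for any executor so that one future is running and one is still queued.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

cancel_event = threading.Event()
started = threading.Event()


def long_loop(n: int) -> str:
    started.set()
    for _ in range(n):
        if cancel_event.is_set():     # cooperative check, once per chunk
            return "stopped early"
        cancel_event.wait(0.01)       # stands in for one chunk of work
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(long_loop, 10_000)
    started.wait()                        # the single worker is now busy
    queued = pool.submit(long_loop, 1)    # still waiting for a worker

    queued_cancelled = queued.cancel()    # succeeds: it never started
    running_cancelled = running.cancel()  # fails: already running
    cancel_event.set()                    # so ask it to stop cooperatively
    result = running.result()

print(queued_cancelled, running_cancelled, result)  # True False stopped early
```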

Runner

runner = TaskRunner(
    max_workers=8,
    backend="asyncio",           # default for tasks that don't specify
    on_task_error="raise",       # "raise" | "log" | "continue"
    checkpoint_path="state.json",
    checkpoint_interval=5.0,
)

on_task_error chooses how task failures propagate out of run():

| Value | Behaviour |
| --- | --- |
| "raise" | The first failing task's exception aborts the runner. (default) |
| "log" | Failures logged via logging.getLogger("pyworkflowy"); run continues. |
| "continue" | Failures stored on handles; no log, no raise. |

Use as a context manager so the executor pools are torn down cleanly:

with TaskRunner() as runner:
    ...
    runner.run()

with TaskRunner(...) also binds the runner to a contextvar, so task.submit(...) finds it without explicit runner=. Outside the with, pass runner= or call runner.submit(task, ...) directly.
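
Binding an object to a contextvar for the duration of a with block is a standard-library pattern. The sketch below shows the mechanism with contextvars; MiniRunner and the module-level submit function are hypothetical stand-ins, and raising RuntimeError when no runner is bound is an assumption.

```python
from contextvars import ContextVar

_active_runner = ContextVar("active_runner", default=None)


class MiniRunner:
    """Hypothetical stand-in for a runner; it only records submissions."""

    def __init__(self) -> None:
        self.queue = []
        self._token = None

    def __enter__(self) -> "MiniRunner":
        self._token = _active_runner.set(self)  # bind for the with block
        return self

    def __exit__(self, *exc_info) -> None:
        _active_runner.reset(self._token)       # restore the previous binding

    def submit(self, fn, *args) -> None:
        self.queue.append((fn, args))


def submit(fn, *args, runner=None) -> None:
    """Use the explicit runner if given, else the one bound by `with`."""
    target = runner if runner is not None else _active_runner.get()
    if target is None:
        raise RuntimeError("no active runner; pass runner= explicitly")
    target.submit(fn, *args)


with MiniRunner() as r:
    submit(len, "abc")        # finds r through the contextvar
print(len(r.queue))           # 1
```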

Persistence / resume

Tell the runner where to write its state:

with TaskRunner(checkpoint_path="state.json", checkpoint_interval=5.0) as runner:
    ...
    runner.run()

The default JSONCheckpointer writes after each task completion (rate-limited by checkpoint_interval). To resume after a crash, call TaskRunner.resume:

runner = TaskRunner.resume("state.json")
# Re-submit the same tasks; previously-completed ones get their results
# primed and won't re-execute.

Resume uses each handle's persisted ID — so the runner needs to be re-built with the same submission order (or you have to inject IDs manually for now). Already-completed handles are primed with the persisted result on submit.

| Checkpointer | Trade-off |
| --- | --- |
| JSONCheckpointer | Default. Args/return values must JSON-serialise. Validated at submit. |
| PickleCheckpointer | Anything pickle accepts; standard pickle caveats apply. |
| custom | Subclass Checkpointer with save()/load(). |

Unserialisable args raise CheckpointError at submit time so you see the error where it originated.
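
Validating at submit time can be as simple as a trial json.dumps. The following is a sketch of the idea, not the library's implementation: validate_json_args is a hypothetical helper and the CheckpointError class defined here is a local stand-in for pyWorkflowy's.

```python
import json


class CheckpointError(Exception):
    """Local stand-in for the library's CheckpointError."""


def validate_json_args(args: tuple, kwargs: dict) -> None:
    """Raise CheckpointError if the arguments cannot be written as JSON."""
    try:
        json.dumps({"args": list(args), "kwargs": kwargs})
    except (TypeError, ValueError) as exc:
        raise CheckpointError(f"arguments are not JSON-serialisable: {exc}") from exc


validate_json_args(("data.csv", 3), {"strict": True})   # passes silently
try:
    validate_json_args(({1, 2, 3},), {})                 # a set is not JSON
except CheckpointError as exc:
    print(exc)
```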

Scheduling

from datetime import datetime

from pyworkflowy import TaskRunner, task
from pyworkflowy.schedule import Scheduler

@task
def cleanup():
    ...

with TaskRunner() as runner:
    sched = Scheduler(runner=runner)
    sched.every(60).do(cleanup)
    sched.cron("0 * * * *").do(cleanup)        # top of every hour
    sched.at(datetime(2026, 12, 31, 23, 59)).do(cleanup)  # one-shot

    sched.start()    # background thread
    # ... do other work ...
    sched.stop()

Or async, on your own loop:

sched = Scheduler(runner=runner)
sched.every(0.5).do(cleanup)
await sched.arun()

For tests, sched.tick() fires every due job once and returns the resulting handles — useful with a fake clock.
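
To illustrate why a tick-style method pairs well with a fake clock, here is a self-contained sketch. FakeClock and MiniScheduler are hypothetical classes written for this example; whether pyWorkflowy's Scheduler accepts an injected clock, and how, is not covered here.

```python
class FakeClock:
    """Manually advanced clock for deterministic tests."""

    def __init__(self, now: float = 0.0) -> None:
        self.now = now

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class MiniScheduler:
    """Hypothetical interval-only scheduler driven by an injected clock."""

    def __init__(self, clock) -> None:
        self.clock = clock
        self.jobs = []  # each job is [interval, next_due, fn]

    def every(self, seconds: float, fn) -> None:
        self.jobs.append([seconds, self.clock() + seconds, fn])

    def tick(self) -> list:
        """Fire each due job once and return the results."""
        now = self.clock()
        results = []
        for job in self.jobs:
            interval, next_due, fn = job
            if now >= next_due:
                results.append(fn())
                job[1] = now + interval  # reschedule from now, no backfill
        return results


clock = FakeClock()
sched = MiniScheduler(clock)
sched.every(60, lambda: "cleaned")

print(sched.tick())   # []
clock.advance(60)
print(sched.tick())   # ['cleaned']
clock.advance(300)
print(sched.tick())   # ['cleaned']
```

The last tick fires once even though five intervals have elapsed, mirroring the no-backfill rule for missed fires.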

Cron subset

m h dom mon dow — five fields, space-separated:

| Syntax | Example | Meaning |
| --- | --- | --- |
| * | * | every value in range |
| N | 5 | exactly 5 |
| a-b | 9-17 | inclusive range |
| a,b,c | 0,15,30 | list |
| */N | */15 | every N from the start of range |
| a-b/N | 9-17/2 | every N within range |

The dow (day-of-week) field uses cron-style 0=Sunday. No seconds field, no @hourly/@daily aliases, no L/W modifiers. Missed fires while the scheduler is stopped are not backfilled.
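
The field syntax in the table is small enough to expand by hand. The sketch below parses a single field into the set of matching integers; parse_field is a hypothetical helper, not pyWorkflowy's parser, and it also accepts ranges and steps as list items, which may be more permissive than the library.

```python
def parse_field(expr: str, lo: int, hi: int) -> set:
    """Expand one cron field into the matching integers within [lo, hi]."""
    values = set()
    for part in expr.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step < 1:
            raise ValueError(f"step must be positive: {part!r}")
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            a, b = rng.split("-", 1)
            start, end = int(a), int(b)
        else:
            start = end = int(rng)
        if not (lo <= start <= end <= hi):
            raise ValueError(f"{part!r} is outside {lo}-{hi}")
        values.update(range(start, end + 1, step))
    return values


print(sorted(parse_field("*/15", 0, 59)))    # [0, 15, 30, 45]
print(sorted(parse_field("9-17/2", 0, 23)))  # [9, 11, 13, 15, 17]
print(sorted(parse_field("0,15,30", 0, 59))) # [0, 15, 30]
```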

Async

Async tasks need the asyncio backend. The runner orchestrates on asyncio either way — run() just wraps asyncio.run(arun()).

import asyncio

import httpx
from pyworkflowy import TaskRunner, task

@task
async def fetch(url):
    async with httpx.AsyncClient() as c:
        return (await c.get(url)).json()

async def main():
    runner = TaskRunner()
    h = runner.submit(fetch, "https://example.com")
    await runner.arun()
    print(await h)   # handles are awaitable
    runner.shutdown()

asyncio.run(main())

Handles are awaitable; awaiting yields the value or raises the failure. Sync handles also work — handle.result(timeout=...) blocks.

Errors

| Exception | Raised when |
| --- | --- |
| TaskError | Base class. Catch this to swallow any pyWorkflowy-raised error. |
| TaskTimeoutError | A task exceeds its timeout= budget. Not retried. |
| TaskCancelledError | A task was cancelled via handle.cancel() / runner.cancel_all(). |
| CycleError | A submission would close a cycle in the dependency graph. |
| DependencyFailedError | A task with on_dep_failure="fail" had a failed dep. |
| RetryExhaustedError | All attempts failed; wraps the last exception via __cause__. |
| CheckpointError | Serialisation or I/O failed in a Checkpointer. |

Inside a task body, you can read current_task() for the current TaskContext (name, attempt number, cancel event).

Threads / Multiprocessing notes

  • The thread pool is created lazily on first use, shut down when the runner is shut down. current_task() does work in the thread backend because pyWorkflowy binds the contextvar on entry.
  • The process backend's workers run in separate interpreters — module-level state is reimported, current_task() returns None, and the function reference is serialised via pickle. Top-level functions only; no lambdas, no nested defs.
  • On Windows and macOS the default start method is spawn, and since Python 3.14 the default on other POSIX platforms is forkserver rather than fork. The same caveat applies either way: every worker re-imports your code.
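
The "top-level functions only" rule follows from how pickle serialises functions: by importable name, not by value. A quick way to check a candidate before sending it to a process pool is a trial pickle.dumps, sketched below with a hypothetical picklable helper.

```python
import json
import pickle


def picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps, as a process pool requires."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def make_nested():
    def nested(x):
        return x
    return nested


print(picklable(json.dumps))      # True: importable as json.dumps
print(picklable(lambda x: x))     # False: no importable name
print(picklable(make_nested()))   # False: defined inside another function
```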

Development

uv sync
uv run pytest
uv run ruff check .
uv run pyrefly check
