Lightweight distributed task orchestrator with absolute process isolation and SQLite WAL persistence.

snap — Lightweight, Zero-Dependency Distributed Task Orchestrator

Executive Overview

snap is a lightweight, embedded, production-ready distributed task orchestrator built entirely on the Python Standard Library and SQLite. It requires zero external dependencies—no Redis, no RabbitMQ, no Celery, no message brokers. Designed for high-performance engineering teams who need reliable background task execution without infrastructure overhead, snap provides:

  • Process-level isolation via the multiprocessing spawn start method, protecting the orchestrator from user-code memory leaks, segmentation faults, and hard crashes.
  • Atomic task scheduling using SQLite Write-Ahead Logging (WAL) mode with BEGIN IMMEDIATE transactions to eliminate race conditions and double-claiming.
  • Directed Acyclic Graph (DAG) support with built-in topological cycle detection for complex dependency chains.
  • Automatic crash recovery with persistent worker heartbeats, orphaned task reclamation, and checkpoint/restart capabilities.

snap is ideal for microservices, data pipelines, CI/CD workflows, and any Python application requiring robust background processing without operational complexity.

Architectural & Technical Highlights

Zero-Dependency Infrastructure

snap is powered exclusively by the Python Standard Library and SQLite. There are no third-party packages, message brokers, or external services to install, configure, or maintain. The orchestrator runs in a single process with thread-safe SQLite connections and launches an isolated worker subprocess for each task.

Absolute Memory & Process Isolation

Every task executes in its own dedicated OS process using the multiprocessing spawn start method (PEP 703 compatible). This guarantees:

  • Complete memory isolation between the orchestrator and user code.
  • Protection against segmentation faults and hard crashes in task functions.
  • Deterministic cleanup of resources via context managers (__enter__/__exit__).

Atomic Task Claiming with Write-Locking

snap eliminates race conditions through:

  • BEGIN IMMEDIATE transactions that acquire SQLite's write lock up front, before any task row is read or claimed.
  • Exponential backoff retry logic for lock contention scenarios.
  • Filtered claims that respect DAG dependency constraints within the same transaction.
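
The claiming pattern listed above can be sketched with the standard sqlite3 module. This is a minimal illustration under stated assumptions, not snap's actual implementation: the `tasks` table, its columns, and the `open_db` and `claim_next` helpers are hypothetical names chosen for the example.

```python
import random
import sqlite3
import time


def open_db(path):
    """Open a connection in autocommit mode so transactions are explicit."""
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    # Hypothetical, simplified schema for illustration only.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks ("
        "id INTEGER PRIMARY KEY, status TEXT, priority INTEGER, worker_id TEXT)"
    )
    return conn


def claim_next(conn, worker_id, max_attempts=5, base_delay=0.01):
    """Claim the highest-priority PENDING task and return its id, or None."""
    for attempt in range(max_attempts):
        try:
            # Take the write lock before reading, so no other writer can
            # claim the same row between the SELECT and the UPDATE.
            conn.execute("BEGIN IMMEDIATE")
        except sqlite3.OperationalError:
            # Database is locked: back off exponentially, with jitter.
            time.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))
            continue
        try:
            row = conn.execute(
                "SELECT id FROM tasks WHERE status = 'PENDING' "
                "ORDER BY priority DESC, id LIMIT 1"
            ).fetchone()
            if row is not None:
                conn.execute(
                    "UPDATE tasks SET status = 'RUNNING', worker_id = ? "
                    "WHERE id = ?",
                    (worker_id, row[0]),
                )
            conn.execute("COMMIT")
            return row[0] if row else None
        except Exception:
            conn.execute("ROLLBACK")
            raise
    return None
```

Because the lock is taken before the SELECT, two concurrent callers cannot both observe the same PENDING row; the second one either waits or retries after backing off.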

Built-in Directed Acyclic Graph (DAG) Engine

The dependency system is mapped relationally in a dedicated snap_task_dependencies table with:

  • Native topological cycle detection via graphlib.TopologicalSorter before enqueueing.
  • Automatic filtering of blocked tasks during claiming (tasks wait until all parents complete).
  • Efficient indexing for child-parent lookups.
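
Cycle detection with graphlib.TopologicalSorter can be sketched as follows. The `validate_dag` helper and the shape of its input (a mapping from each task ID to its parent task IDs) are assumptions made for illustration; how snap loads the graph from snap_task_dependencies is not shown here.

```python
from graphlib import CycleError, TopologicalSorter


def validate_dag(dependencies):
    """Return a valid execution order, or raise ValueError on a cycle.

    `dependencies` maps each task ID to an iterable of its parent task IDs.
    """
    sorter = TopologicalSorter(dependencies)
    try:
        # static_order() yields parents before the tasks that depend on them.
        return list(sorter.static_order())
    except CycleError as exc:
        # The second element of exc.args lists the nodes forming the cycle.
        raise ValueError(f"dependency cycle detected: {exc.args[1]}") from exc
```

Running this check before a task is written to the database means a cyclic graph is rejected up front instead of leaving tasks that can never become eligible.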

Fail-Safe Robustness

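  • WAL mode lets readers proceed while a write is in progress, and lets a write proceed while reads are in progress; writes themselves are still serialized by SQLite.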
  • Persistent heartbeats from workers enable detection of stalled or dead processes.
  • Automatic reclamation of tasks assigned to dead workers, with retry count incrementation.
  • Complete error traces captured and persisted for failed tasks.
  • Progress checkpoints allow long-running tasks to resume from the last successful step.

Installation Guide

Prerequisites

  • Python 3.9 or higher
  • No third-party packages required

Installation

Install snap in editable mode for development, or directly for deployment:

# From the project root directory: editable install for development
pip install -e .

# Or a regular install for deployment
pip install .

This installs the package and makes the snap-scheduler CLI entry point available. Verify installation:

python -c "import snap; print(snap.__version__)"

No additional configuration is needed. snap is ready to use immediately.

Quick Start & Practical Usage

Defining and Enqueueing Tasks

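Define your tasks in a separate, importable module rather than in the script you run. Worker processes resolve a task by its fully qualified name, and a function defined in the script itself would be registered under __main__, which a worker cannot import: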

tasks.py:

import time
from snap.interface.decorators import task

@task(max_retries=3, priority=10)
def send_email(recipient: str, subject: str, body: str):
    """Simulate sending an email."""
    print(f"Sending email to {recipient}...")
    time.sleep(2)  # Simulate work
    print(f"Email sent to {recipient}")
    return {"status": "sent", "recipient": recipient}

@task(max_retries=5, priority=5)
def process_image(image_path: str, format: str = "webp"):
    """Simulate image processing."""
    print(f"Processing {image_path} to {format}...")
    time.sleep(3)
    print(f"Processed {image_path}")
    return {"status": "processed", "path": image_path}

@task(max_retries=2, priority=1)
def generate_report(data_id: str):
    """Simulate report generation."""
    print(f"Generating report for {data_id}...")
    time.sleep(1)
    print(f"Report generated for {data_id}")
    return {"status": "generated", "report_id": data_id}

run.py:

from tasks import send_email, process_image, generate_report
from snap.orchestrator.engine import Engine


def main():
    # Initialize the engine (this also sets the global DB path)
    engine = Engine(db_path="/tmp/snap.db", max_workers=4)

    # Enqueue tasks using the .delay() API; each call returns a task ID
    email_task_id = send_email.delay(
        "user@example.com",
        "Welcome!",
        "Thank you for signing up."
    )

    image_task_id = process_image.delay(
        "/images/photo.jpg",
        format="avif"
    )

    report_task_id = generate_report.delay("REPORT-2024-001")

    print("Enqueued tasks:")
    print(f"  Email task:  {email_task_id}")
    print(f"  Image task:  {image_task_id}")
    print(f"  Report task: {report_task_id}")

    # Start the engine to process tasks
    engine.start()
    try:
        # Block until each task finishes; wait_for_completion returns True
        # only if the task reached COMPLETED
        results = [
            engine.wait_for_completion(task_id, timeout=60)
            for task_id in (email_task_id, image_task_id, report_task_id)
        ]
    finally:
        engine.stop()

    if all(results):
        print("All tasks completed successfully.")
    else:
        print("At least one task did not complete.")


# The spawn start method re-imports this module in each worker process,
# so the entry point must be guarded.
if __name__ == "__main__":
    main()

Handling Task Dependencies (DAG)

Tasks can declare explicit dependencies on other tasks. The orchestrator ensures a task only runs after all its parent tasks have completed:

pipeline.py:

from tasks import process_image, generate_report
from snap.orchestrator.engine import Engine


def main():
    engine = Engine(db_path="/tmp/snap.db", max_workers=2)
    engine.start()
    try:
        # Step 1: Enqueue image processing (no dependencies)
        image_task = process_image.delay("/images/photo.jpg")

        # Step 2: Enqueue report generation that depends on the image task.
        # This task will not execute until 'image_task' is COMPLETED.
        report_task = generate_report.delay(
            "REPORT-2024-001",
            depends_on=[image_task]  # <-- DAG dependency
        )

        print(f"Image task:  {image_task}")
        print(f"Report task: {report_task} (waiting for image to complete)")

        # The report depends on the image task, so waiting on it covers both
        completed = engine.wait_for_completion(report_task, timeout=120)
    finally:
        engine.stop()

    if completed:
        print("Pipeline completed successfully.")
    else:
        print("Pipeline did not complete.")


# The spawn start method re-imports this module in each worker process,
# so the entry point must be guarded.
if __name__ == "__main__":
    main()

The orchestrator automatically:

  1. Validates the dependency graph for cycles before enqueueing.
  2. Treats a task as READY only when all of its parent tasks are COMPLETED.
  3. Blocks claiming of tasks with uncompleted dependencies.
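
The dependency filter can be expressed as a single SQL condition. The sketch below is an illustration only: it uses simplified `tasks` and `task_dependencies` tables with hypothetical column names (`child_id`, `parent_id`), which may differ from snap's real schema.

```python
import sqlite3

# A PENDING task is ready when none of its parents is still uncompleted.
READY_QUERY = """
SELECT t.id
FROM tasks AS t
WHERE t.status = 'PENDING'
  AND NOT EXISTS (
      SELECT 1
      FROM task_dependencies AS d
      JOIN tasks AS p ON p.id = d.parent_id
      WHERE d.child_id = t.id
        AND p.status != 'COMPLETED'
  )
ORDER BY t.id
"""


def make_demo_db():
    """Build an in-memory database with one parent task and one child task."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)")
    conn.execute(
        "CREATE TABLE task_dependencies (child_id TEXT, parent_id TEXT)"
    )
    conn.executemany(
        "INSERT INTO tasks VALUES (?, 'PENDING')", [("image",), ("report",)]
    )
    conn.execute("INSERT INTO task_dependencies VALUES ('report', 'image')")
    return conn


def ready_task_ids(conn):
    """Return the IDs of tasks whose parents have all completed."""
    return [row[0] for row in conn.execute(READY_QUERY)]
```

With the demo data, only `image` is returned at first; once its status is set to COMPLETED, `report` becomes eligible.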

Using Progress Checkpoints

For long-running tasks, you can persist intermediate state to enable recovery from failures:

long_task.py:

import time
from snap.interface.decorators import task
from snap.worker.process import checkpoint

@task(max_retries=3, priority=5)
def process_large_dataset(dataset_id: str):
    """Process a large dataset with checkpoint support."""
    
    # Phase 1: Data loading
    print(f"[{dataset_id}] Loading data...")
    time.sleep(10)
    checkpoint("data_loaded", {
        "dataset_id": dataset_id,
        "loaded_rows": 1000000,
        "phase": "loading_complete"
    })
    
    # Phase 2: Transformation
    print(f"[{dataset_id}] Transforming data...")
    time.sleep(15)
    checkpoint("transformation_complete", {
        "dataset_id": dataset_id,
        "transformed_rows": 950000,
        "phase": "transformation_complete"
    })
    
    # Phase 3: Export
    print(f"[{dataset_id}] Exporting results...")
    time.sleep(5)
    
    return {"status": "complete", "dataset": dataset_id}

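If the worker crashes during Phase 2, the retry proceeds as follows:

  1. The orchestrator detects the most recent checkpoint (here, the one saved after Phase 1).
  2. The checkpoint state is passed as the first positional argument to the function.
  3. The function inspects that state and skips Phase 1, resuming at Phase 2. Skipping is the task's responsibility: the function must accept the state argument and branch on it, which the simplified example above does not yet do.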
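
One way a task body can make use of a restored checkpoint is sketched below. It is a standalone illustration that does not import snap: the `run_pipeline` function, its optional `state` parameter, and the `save_checkpoint` callback are hypothetical stand-ins, and the exact calling convention snap uses on a first run versus a retry is an assumption.

```python
def run_pipeline(dataset_id, state=None, save_checkpoint=lambda step, data: None):
    """Run the phases that the last checkpoint has not already covered.

    `state` is the most recent checkpoint dict, or None on a first run.
    Returns the list of phases that were actually executed.
    """
    completed = (state or {}).get("phase")
    executed = []

    if completed is None:
        executed.append("load")  # Phase 1: data loading
        save_checkpoint(
            "data_loaded",
            {"dataset_id": dataset_id, "phase": "loading_complete"},
        )
        completed = "loading_complete"

    if completed == "loading_complete":
        executed.append("transform")  # Phase 2: transformation
        save_checkpoint(
            "transformation_complete",
            {"dataset_id": dataset_id, "phase": "transformation_complete"},
        )
        completed = "transformation_complete"

    executed.append("export")  # Phase 3 always runs last
    return executed
```

A retry that receives `{"phase": "loading_complete"}` skips loading and runs only the transformation and export phases.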

Running the Orchestrator (The Daemon Loop)

Starting the Scheduler

After installing snap, start the orchestrator daemon using the CLI entry point:

# Start the scheduler with default settings
snap-scheduler

# Or specify a custom database path
export SNAP_ACTIVE_DB_PATH=/custom/path/snap.db
snap-scheduler

What Happens Behind the Scenes

When snap-scheduler starts, it:

  1. Initializes the database schema (snap_tasks, snap_workers, snap_checkpoints, snap_task_dependencies tables) if not present.
  2. Sets the global database path via DBManager.set_db_path() so that .delay() calls on @task-decorated functions resolve to the correct database.
  3. Begins an orchestration loop that:
    • Updates the engine's heartbeat to avoid self-reclamation.
    • Reclaims orphaned tasks assigned to dead workers.
    • Calculates available worker slots.
    • Atomically claims the next batch of ready tasks (with DAG dependency filtering).
    • Spawns isolated WorkerProcess instances for each claimed task.
  4. Monitors worker health via persistent heartbeat updates.
  5. Collects telemetry from workers via a unidirectional multiprocessing.Pipe.

The loop sleeps for loop_interval seconds between cycles (0.01 by default), which caps it at roughly 100 cycles per second.

Graceful Shutdown

The scheduler handles SIGINT (Ctrl+C) and SIGTERM gracefully:

  1. Sets the stop flag to prevent new task claims.
  2. Terminates all active workers (graceful terminate() first, then kill() if needed).
  3. Joins the orchestration thread.
  4. Restores signal handlers and cleans up resources.
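
The stop-flag and handler-restoration steps above can be sketched with the standard signal and threading modules. The helper names `install_stop_handlers` and `restore_handlers` are hypothetical and only illustrate the pattern; worker termination and thread joining are left out.

```python
import signal
import threading


def install_stop_handlers(stop_event):
    """Route SIGINT and SIGTERM to stop_event; return the previous handlers.

    Must be called from the main thread, as required by signal.signal().
    """
    def _handler(signum, frame):
        # Only set a flag here; the main loop checks it and shuts down.
        stop_event.set()

    previous = {}
    for sig in (signal.SIGINT, signal.SIGTERM):
        previous[sig] = signal.signal(sig, _handler)
    return previous


def restore_handlers(previous):
    """Reinstall the handlers that were active before install_stop_handlers."""
    for sig, handler in previous.items():
        # A handler of None means it was not installed from Python
        # and cannot be passed back to signal.signal().
        if handler is not None:
            signal.signal(sig, handler)
```

Keeping the handler down to setting a flag avoids doing database or process work inside a signal handler; the orchestration loop can observe the event on its next cycle and stop claiming tasks.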

Core API Reference & Technical Specification

@task(max_retries=3, priority=0)

The primary decorator for converting a Python function into a snap task.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_retries | int | 3 | Maximum number of automatic retries when a task fails. |
| priority | int | 0 | Scheduling priority. Higher values are dequeued first by the orchestrator. |

Behavior: The decorator wraps the function into a TaskDefinition object that preserves the original function's metadata. The task's fully qualified name (e.g., mymodule.myfunc) is automatically resolved.

Returns: A TaskDefinition instance with a .delay() method.
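
The general shape of such a decorator can be sketched as follows. This is a hypothetical reconstruction for illustration, not snap's TaskDefinition: the attribute names and the in-memory `queue` list (standing in for the SQLite insert) are assumptions.

```python
import functools
import uuid


class TaskDefinition:
    """Wraps a function and records its scheduling options."""

    def __init__(self, func, max_retries, priority):
        # Copy __name__, __doc__, __module__, etc. from the original function.
        functools.update_wrapper(self, func)
        self.func = func
        self.max_retries = max_retries
        self.priority = priority
        # Fully qualified name, e.g. "mymodule.myfunc".
        self.name = f"{func.__module__}.{func.__qualname__}"
        self.queue = []  # stand-in for the task table

    def __call__(self, *args, **kwargs):
        # Calling the task directly runs it synchronously.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # Record the call for later execution and return a string UUID.
        task_id = str(uuid.uuid4())
        self.queue.append((task_id, args, kwargs))
        return task_id


def task(max_retries=3, priority=0):
    """Decorator factory returning a TaskDefinition for the function."""
    def decorator(func):
        return TaskDefinition(func, max_retries, priority)
    return decorator
```

Storing the qualified name rather than the function object is what lets a separate worker process import the module and look the function up again.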


task.delay(*args, **kwargs)

Enqueue a task for asynchronous execution by the orchestrator.

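| Parameter | Type | Description |
| --- | --- | --- |
| *args | tuple | Positional arguments passed to the task function. |
| **kwargs | dict | Keyword arguments passed to the task function. If only **kwargs are provided, they are automatically wrapped into a single positional argument for convenience. The depends_on keyword is reserved for declaring DAG dependencies, as in the pipeline example. |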

Returns: A str UUID representing the task ID. This ID can be used for:

  • Waiting for completion via engine.wait_for_completion(task_id).
  • Defining DAG dependencies via depends_on=[task_id].

Side Effects:

  1. Sensitive data (values whose keys match patterns like password, secret, token) is automatically masked before serialization via SecurityEngine.
  2. Positional and keyword arguments are pickled into binary BLOBs for SQLite storage.
  3. The database path is resolved dynamically at execution time from the SNAP_ACTIVE_DB_PATH environment variable or the global path set by the Engine.
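
The masking and serialization steps can be illustrated with a short sketch. The `mask_sensitive` and `serialize_call` helpers, the marker list, and the `"***"` placeholder are assumptions for the example; SecurityEngine's actual matching rules are not documented here.

```python
import pickle

# Assumed key fragments that mark a value as sensitive.
SENSITIVE_MARKERS = ("password", "secret", "token")


def mask_sensitive(kwargs):
    """Return a copy of kwargs with sensitive values replaced by '***'."""
    return {
        key: "***" if any(m in key.lower() for m in SENSITIVE_MARKERS) else value
        for key, value in kwargs.items()
    }


def serialize_call(args, kwargs):
    """Pickle positional and (masked) keyword arguments into two BLOBs."""
    return pickle.dumps(args), pickle.dumps(mask_sensitive(kwargs))
```

Since masking happens before pickling, the original secret never reaches the database; a task that needs the real value would have to obtain it some other way at run time.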

checkpoint(step_name, state_dict)

Persist intermediate computation state for a running task. Enables recovery from failures.

| Parameter | Type | Description |
| --- | --- | --- |
| step_name | str | A human-readable label for this checkpoint step (e.g., "data_loaded", "transformation_complete"). |
| state_dict | dict | Arbitrary application state to persist. Must be picklable. |

Raises: RuntimeError if called outside a snap worker process (i.e., SNAP_TASK_ID environment variable not set).

Behavior:

  1. Serializes the state dictionary with pickle.
  2. Opens a dedicated SQLite connection and performs INSERT OR REPLACE into snap_checkpoints.
  3. The checkpoint ID is constructed as {task_id}_{step_name}.

Recovery: When a failed task is retried, the worker loads the most recent checkpoint and passes it as the first positional argument to the task function. The task can then inspect this state to skip already-completed phases.
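
The storage side of this behavior can be sketched with sqlite3 and pickle. The sketch is simplified and hypothetical: it takes an existing connection instead of opening a dedicated one, and the `checkpoints` table and its columns are assumptions rather than the real snap_checkpoints schema.

```python
import os
import pickle
import sqlite3


def make_checkpoint_db():
    """Create an in-memory database with a simplified checkpoint table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE checkpoints ("
        "id TEXT PRIMARY KEY, task_id TEXT, step_name TEXT, state BLOB)"
    )
    return conn


def save_checkpoint(conn, step_name, state_dict):
    """Persist state under the ID '{task_id}_{step_name}'."""
    task_id = os.environ.get("SNAP_TASK_ID")
    if task_id is None:
        raise RuntimeError("checkpoint() called outside a worker process")
    # INSERT OR REPLACE overwrites an earlier checkpoint for the same step.
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints (id, task_id, step_name, state) "
        "VALUES (?, ?, ?, ?)",
        (f"{task_id}_{step_name}", task_id, step_name, pickle.dumps(state_dict)),
    )
    conn.commit()


def load_latest(conn, task_id):
    """Return the most recently written state for a task, or None."""
    row = conn.execute(
        "SELECT state FROM checkpoints WHERE task_id = ? "
        "ORDER BY rowid DESC LIMIT 1",
        (task_id,),
    ).fetchone()
    return pickle.loads(row[0]) if row else None
```

Because the ID combines the task and the step name, saving the same step twice keeps a single row per step rather than accumulating history.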


Engine(db_path, max_workers=10, loop_interval=0.01, heartbeat_timeout=10.0, auto_initialize=True)

The central orchestrator class.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| db_path | str | Required | Path to the SQLite3 database file. |
| max_workers | int | 10 | Maximum number of concurrent worker processes. |
| loop_interval | float | 0.01 | Sleep interval (seconds) between orchestration cycles. |
| heartbeat_timeout | float | 10.0 | Seconds without heartbeat before a worker is considered dead. |
| auto_initialize | bool | True | If True, automatically creates the database schema on construction. |

Key Methods:

| Method | Description |
| --- | --- |
| start() | Start the orchestration loop and telemetry collector. |
| stop(timeout=5.0) | Gracefully stop the engine, terminate workers, and clean up resources. |
| wait_for_completion(task_id, poll_interval=0.1, timeout=30.0) | Block until a task reaches COMPLETED or FAILED status. Returns True if completed. |
| active_worker_count | Property returning the current number of alive worker processes. |
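
A polling wait of the kind wait_for_completion describes can be sketched as follows. The `wait_for_status` helper and its `get_status` callback are hypothetical, and returning False when the timeout expires is an assumption; the real method's timeout behavior is not specified above.

```python
import time


def wait_for_status(get_status, poll_interval=0.1, timeout=30.0):
    """Poll get_status() until it reports a terminal state or time runs out.

    Returns True only if the final status is COMPLETED.
    """
    # A monotonic clock is unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in ("COMPLETED", "FAILED"):
            return status == "COMPLETED"
        if time.monotonic() >= deadline:
            return False  # assumed behavior on timeout
        time.sleep(poll_interval)
```

In a real engine `get_status` would be a small query against the task table for one task ID.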

Database State Machine Lifecycle

Each task in snap progresses through a well-defined state machine:

                    ┌─────────────┐
                    │   PENDING   │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │   READY *   │ (DAG dependencies satisfied)
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
               ┌───▶│   RUNNING   │◀───┐
               │    └──────┬──────┘    │
               │           │           │
               │    ┌──────▼──────┐    │
               │    │  COMPLETED  │    │
               │    └─────────────┘    │
               │                       │
               │    ┌─────────────┐    │
               ├───▶│   FAILED    │    │
               │    └─────────────┘    │
               │                       │
               │    ┌─────────────┐    │
               └───▶│ RECOVERING  │────┘ (retries < max_retries)
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │  ║ DEAD ║   │ (max retries exhausted)
                    └─────────────┘

State Descriptions

| State | Description |
| --- | --- |
| PENDING | Task has been enqueued but is not yet eligible for execution. It may be waiting for DAG dependencies. |
| READY * | A virtual state (not persisted) indicating the task has no uncompleted parent dependencies. Tasks in this state are eligible for claiming. |
| RUNNING | Task has been claimed by a worker and is currently executing. The worker_id field is set. |
| COMPLETED | Task executed successfully without exceptions. The result is not persisted (future feature). |
| FAILED | Task raised an exception during execution. The error_trace field contains the full traceback. |
| RECOVERING | A previously RUNNING task whose worker was detected as dead (heartbeat timeout). The task will be retried if retries < max_retries. |
| DEAD | Task has exhausted its maximum number of retries. No further execution attempts will be made. |

Transition Triggers

| Transition | Trigger |
| --- | --- |
| PENDING → RUNNING | Orchestrator claims the task and assigns it to a worker (DAG dependencies must be satisfied) |
| RUNNING → COMPLETED | Task function returns without raising an exception |
| RUNNING → FAILED | Task function raises an exception |
| RUNNING → RECOVERING | Orchestrator detects worker heartbeat timeout |
| RECOVERING → PENDING | Orchestrator resets task to pending state |
| PENDING → FAILED | Maximum retries exceeded (future enhancement) |
| RECOVERING → FAILED | Maximum retries exceeded (future enhancement) |

Heartbeat & Reclamation

snap's recovery mechanism operates as follows:

  1. Workers update their heartbeat in snap_workers every 500ms.
  2. The orchestrator's main loop checks the last_heartbeat of all workers.
  3. Any worker with last_heartbeat older than heartbeat_timeout is marked as DEAD.
  4. All RUNNING tasks assigned to that worker are reverted to RECOVERING with retries += 1.
  5. On the next cycle, these RECOVERING tasks become eligible for claiming by a new worker.
  6. If retries >= max_retries, the task transitions to FAILED.
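
The reclamation steps above can be sketched as one transaction. This is an illustration under assumptions: the simplified `workers` and `tasks` tables, their column names, and the `reclaim_orphans` helper are hypothetical, and steps 4 and 6 are folded into a single UPDATE for brevity.

```python
import sqlite3
import time


def make_db():
    """Create an in-memory database with simplified worker and task tables."""
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE workers (id TEXT PRIMARY KEY, status TEXT, "
        "last_heartbeat REAL)"
    )
    conn.execute(
        "CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, worker_id TEXT, "
        "retries INTEGER, max_retries INTEGER)"
    )
    return conn


def reclaim_orphans(conn, heartbeat_timeout, now=None):
    """Mark stale workers DEAD and release their RUNNING tasks.

    Returns the IDs of the workers that were newly marked DEAD.
    """
    now = time.time() if now is None else now
    cutoff = now - heartbeat_timeout
    conn.execute("BEGIN IMMEDIATE")
    try:
        dead = [
            row[0]
            for row in conn.execute(
                "SELECT id FROM workers "
                "WHERE status != 'DEAD' AND last_heartbeat < ? ORDER BY id",
                (cutoff,),
            )
        ]
        for worker_id in dead:
            conn.execute(
                "UPDATE workers SET status = 'DEAD' WHERE id = ?", (worker_id,)
            )
            # Column references on the right-hand side use the old values,
            # so "retries + 1" is the count after this reclamation.
            conn.execute(
                "UPDATE tasks SET "
                "status = CASE WHEN retries + 1 >= max_retries "
                "THEN 'FAILED' ELSE 'RECOVERING' END, "
                "retries = retries + 1, worker_id = NULL "
                "WHERE worker_id = ? AND status = 'RUNNING'",
                (worker_id,),
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return dead
```

Doing the worker update and the task update in the same transaction means a task is never left pointing at a worker that has already been declared dead.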

snap is engineered for teams who demand reliability, performance, and simplicity. With zero dependencies, process-level isolation, and automatic crash recovery, it provides enterprise-grade task orchestration without the operational burden of traditional message brokers.
