Lightweight distributed task orchestrator with absolute process isolation and SQLite WAL persistence.
snap — Lightweight, Zero-Dependency Distributed Task Orchestrator
Executive Overview
snap is a lightweight, embedded, production-ready distributed task orchestrator built entirely on the Python Standard Library and SQLite. It requires zero external dependencies—no Redis, no RabbitMQ, no Celery, no message brokers. Designed for high-performance engineering teams who need reliable background task execution without infrastructure overhead, snap provides:
- Process-level isolation via the multiprocessing spawn start method, protecting the orchestrator from user-code memory leaks, segmentation faults, and hard crashes.
- Atomic task scheduling using SQLite Write-Ahead Logging (WAL) mode with BEGIN IMMEDIATE transactions to eliminate race conditions and double-claiming.
- Directed Acyclic Graph (DAG) support with built-in topological cycle detection for complex dependency chains.
- Automatic crash recovery with persistent worker heartbeats, orphaned task reclamation, and checkpoint/restart capabilities.
snap is ideal for microservices, data pipelines, CI/CD workflows, and any Python application requiring robust background processing without operational complexity.
Architectural & Technical Highlights
Zero-Dependency Infrastructure
snap is powered exclusively by the Python Standard Library and SQLite. There are no third-party packages, message brokers, or external services to install, configure, or maintain. The orchestrator itself runs in a single process with thread-safe SQLite connections, and it executes tasks in isolated worker subprocesses.
Absolute Memory & Process Isolation
Every task executes in its own dedicated OS process using the multiprocessing spawn start method (PEP 703 compatible). This guarantees:
- Complete memory isolation between the orchestrator and user code.
- Protection against segmentation faults and hard crashes in task functions.
- Deterministic cleanup of resources via context managers (__enter__/__exit__).
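The isolation guarantees above can be illustrated with the standard library alone. The following is a minimal sketch, not snap's actual worker code: describe_exit and crashing_task are hypothetical names introduced here, and the abrupt exit stands in for a hard crash in user code.

```python
import multiprocessing as mp
import os


def describe_exit(exitcode):
    """Translate a Process.exitcode into a short human-readable outcome."""
    if exitcode is None:
        return "still running"
    if exitcode == 0:
        return "completed"
    if exitcode < 0:
        # multiprocessing reports death-by-signal as the negated signal number.
        return f"killed by signal {-exitcode}"
    return f"exited with code {exitcode}"


def crashing_task():
    """Hypothetical user code that dies abruptly, skipping Python cleanup."""
    os._exit(42)


if __name__ == "__main__":
    # "spawn" starts a fresh interpreter, so the child shares no memory
    # with the parent and cannot corrupt its state.
    ctx = mp.get_context("spawn")
    proc = ctx.Process(target=crashing_task)
    proc.start()
    proc.join(timeout=30)
    # The parent survives the child's exit and can record what happened.
    print(describe_exit(proc.exitcode))
```

The main guard matters here: under spawn the child re-imports the launching module, so unguarded top-level code would run again in every child.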
Atomic Task Claiming with Write-Locking
snap eliminates race conditions through:
- BEGIN IMMEDIATE transactions that acquire an exclusive write lock before claiming tasks.
- Exponential backoff retry logic for lock contention scenarios.
- Filtered claims that respect DAG dependency constraints within the same transaction.
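As a rough illustration of the claiming pattern described above, the sketch below uses sqlite3 directly. The tasks table, its columns, and the claim_next helper are assumptions made for this example and are not snap's real schema or API; dependency filtering is left out for brevity.

```python
import os
import sqlite3
import tempfile
import time


def open_db(path):
    # isolation_level=None puts sqlite3 in autocommit mode, so the explicit
    # BEGIN IMMEDIATE below is the only transaction control in play.
    conn = sqlite3.connect(path, timeout=0.1, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


def claim_next(conn, worker_id, attempts=5, base_delay=0.01):
    """Claim the highest-priority PENDING task, or return None if none exist."""
    for attempt in range(attempts):
        try:
            conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
        except sqlite3.OperationalError:
            time.sleep(base_delay * 2 ** attempt)  # exponential backoff
            continue
        try:
            row = conn.execute(
                "SELECT id FROM tasks WHERE status = 'PENDING' "
                "ORDER BY priority DESC, id LIMIT 1"
            ).fetchone()
            if row is not None:
                conn.execute(
                    "UPDATE tasks SET status = 'RUNNING', worker_id = ? WHERE id = ?",
                    (worker_id, row[0]),
                )
            conn.execute("COMMIT")
        except BaseException:
            conn.execute("ROLLBACK")
            raise
        return row[0] if row is not None else None
    raise TimeoutError("could not acquire the write lock")


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn = open_db(db_path)
conn.execute(
    "CREATE TABLE tasks (id TEXT PRIMARY KEY, priority INTEGER, "
    "status TEXT DEFAULT 'PENDING', worker_id TEXT)"
)
conn.executemany(
    "INSERT INTO tasks (id, priority) VALUES (?, ?)",
    [("email", 10), ("report", 1)],
)
first = claim_next(conn, "worker-1")
second = claim_next(conn, "worker-2")
third = claim_next(conn, "worker-3")
print(first, second, third)
```

Because the SELECT and UPDATE run inside one write transaction, two claimers cannot both observe the same task as PENDING.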
Built-in Directed Acyclic Graph (DAG) Engine
The dependency system is mapped relationally in a dedicated snap_task_dependencies table with:
- Native topological cycle detection via graphlib.TopologicalSorter before enqueueing.
- Automatic filtering of blocked tasks during claiming (tasks wait until all parents complete).
- Efficient indexing for child-parent lookups.
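Cycle detection with graphlib.TopologicalSorter can be sketched as follows. The validate_dependencies helper and the task IDs are hypothetical; only the graphlib calls are standard library API.

```python
from graphlib import CycleError, TopologicalSorter


def validate_dependencies(graph):
    """Return a valid execution order, or raise ValueError on a cycle.

    `graph` maps each task ID to the set of task IDs it depends on.
    """
    sorter = TopologicalSorter(graph)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # args[1] holds the list of nodes forming the detected cycle.
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc


order = validate_dependencies({"report": {"image"}, "image": set()})
print(order)
```

Running the check before enqueueing means a cyclic graph is rejected up front instead of leaving tasks blocked forever.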
Fail-Safe Robustness
- WAL mode lets readers proceed while a write is in progress, so reads and writes do not block each other (SQLite still allows only one writer at a time).
- Persistent heartbeats from workers enable detection of stalled or dead processes.
- Automatic reclamation of tasks assigned to dead workers, with retry count incrementation.
- Complete error traces captured and persisted for failed tasks.
- Progress checkpoints allow long-running tasks to resume from the last successful step.
Installation Guide
Prerequisites
- Python 3.9 or higher
- No third-party packages required
Installation
Install snap in editable mode for development, or directly for deployment:
# From the project root directory
pip install -e .
This installs the package and makes the snap-scheduler CLI entry point available. Verify installation:
python -c "import snap; print(snap.__version__)"
No additional configuration is needed. snap is ready to use immediately.
Quick Start & Practical Usage
Defining and Enqueueing Tasks
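Define your tasks in a separate, importable module rather than in the script you run. With the spawn start method, each worker process re-imports the main module, so tasks defined in __main__ run into Python's import guard issues: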
tasks.py:
import time

from snap.interface.decorators import task


@task(max_retries=3, priority=10)
def send_email(recipient: str, subject: str, body: str):
    """Simulate sending an email."""
    print(f"Sending email to {recipient}...")
    time.sleep(2)  # Simulate work
    print(f"Email sent to {recipient}")
    return {"status": "sent", "recipient": recipient}


@task(max_retries=5, priority=5)
def process_image(image_path: str, format: str = "webp"):
    """Simulate image processing."""
    print(f"Processing {image_path} to {format}...")
    time.sleep(3)
    print(f"Processed {image_path}")
    return {"status": "processed", "path": image_path}


@task(max_retries=2, priority=1)
def generate_report(data_id: str):
    """Simulate report generation."""
    print(f"Generating report for {data_id}...")
    time.sleep(1)
    print(f"Report generated for {data_id}")
    return {"status": "generated", "report_id": data_id}
run.py:
from tasks import send_email, process_image, generate_report
from snap.orchestrator.engine import Engine
# Initialize the engine (this also sets the global DB path)
engine = Engine(db_path="/tmp/snap.db", max_workers=4)
# Enqueue tasks using the .delay() API
email_task_id = send_email.delay(
"user@example.com",
"Welcome!",
"Thank you for signing up."
)
image_task_id = process_image.delay(
"/images/photo.jpg",
format="avif"
)
report_task_id = generate_report.delay("REPORT-2024-001")
print(f"Enqueued tasks:")
print(f" Email task: {email_task_id}")
print(f" Image task: {image_task_id}")
print(f" Report task: {report_task_id}")
# Start the engine to process tasks
engine.start()
# Block until tasks complete
engine.wait_for_completion(email_task_id, timeout=60)
engine.wait_for_completion(image_task_id, timeout=60)
engine.wait_for_completion(report_task_id, timeout=60)
engine.stop()
print("All tasks completed successfully.")
Handling Task Dependencies (DAG)
Tasks can declare explicit dependencies on other tasks. The orchestrator ensures a task only runs after all its parent tasks have completed:
pipeline.py:
from tasks import process_image, generate_report
from snap.orchestrator.engine import Engine
engine = Engine(db_path="/tmp/snap.db", max_workers=2)
engine.start()
# Step 1: Enqueue image processing (no dependencies)
image_task = process_image.delay("/images/photo.jpg")
# Step 2: Enqueue report generation that depends on the image task
# This task will not execute until 'image_task' is COMPLETED
report_task = generate_report.delay(
"REPORT-2024-001",
depends_on=[image_task] # <-- DAG dependency
)
print(f"Image task: {image_task}")
print(f"Report task: {report_task} (waiting for image to complete)")
# Wait for both tasks
engine.wait_for_completion(report_task, timeout=120)
engine.stop()
print("Pipeline completed successfully.")
The orchestrator automatically:
- Validates the dependency graph for cycles before enqueueing.
- Marks tasks as READY only when all parent tasks are COMPLETED.
- Blocks claiming of tasks with uncompleted dependencies.
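The readiness rule above can be expressed as a single SQL filter. This is a sketch under assumptions: the table names come from this document, but the column names (id, status, child_id, parent_id) and the ready_tasks helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE snap_tasks (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE snap_task_dependencies (
        child_id TEXT NOT NULL,
        parent_id TEXT NOT NULL,
        PRIMARY KEY (child_id, parent_id)
    );
    INSERT INTO snap_tasks VALUES ('image', 'PENDING'), ('report', 'PENDING');
    INSERT INTO snap_task_dependencies VALUES ('report', 'image');
    """
)

# A task is ready when it is PENDING and has no parent that is not COMPLETED.
READY_QUERY = """
    SELECT t.id FROM snap_tasks AS t
    WHERE t.status = 'PENDING'
      AND NOT EXISTS (
          SELECT 1
          FROM snap_task_dependencies AS d
          JOIN snap_tasks AS p ON p.id = d.parent_id
          WHERE d.child_id = t.id AND p.status != 'COMPLETED'
      )
    ORDER BY t.id
"""


def ready_tasks(conn):
    return [row[0] for row in conn.execute(READY_QUERY)]


before = ready_tasks(conn)
conn.execute("UPDATE snap_tasks SET status = 'COMPLETED' WHERE id = 'image'")
after = ready_tasks(conn)
print(before, after)
```

Since READY is derived by the query rather than stored, there is no separate state to keep in sync when a parent completes.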
Using Progress Checkpoints
For long-running tasks, you can persist intermediate state to enable recovery from failures:
long_task.py:
import time

from snap.interface.decorators import task
from snap.worker.process import checkpoint


@task(max_retries=3, priority=5)
def process_large_dataset(dataset_id: str):
    """Process a large dataset with checkpoint support."""
    # Phase 1: Data loading
    print(f"[{dataset_id}] Loading data...")
    time.sleep(10)
    checkpoint("data_loaded", {
        "dataset_id": dataset_id,
        "loaded_rows": 1000000,
        "phase": "loading_complete"
    })

    # Phase 2: Transformation
    print(f"[{dataset_id}] Transforming data...")
    time.sleep(15)
    checkpoint("transformation_complete", {
        "dataset_id": dataset_id,
        "transformed_rows": 950000,
        "phase": "transformation_complete"
    })

    # Phase 3: Export
    print(f"[{dataset_id}] Exporting results...")
    time.sleep(5)
    return {"status": "complete", "dataset": dataset_id}
If the worker crashes during Phase 2, upon retry, the orchestrator will:
- Detect the checkpoint from Phase 1.
- Pass the checkpoint state as the first argument to the function.
- Resume from Phase 2, avoiding re-execution of Phase 1.
Running the Orchestrator (The Daemon Loop)
Starting the Scheduler
After installing snap, start the orchestrator daemon using the CLI entry point:
# Start the scheduler with default settings
snap-scheduler
# Or specify a custom database path
export SNAP_ACTIVE_DB_PATH=/custom/path/snap.db
snap-scheduler
What Happens Behind the Scenes
When snap-scheduler starts, it:
- Initializes the database schema (snap_tasks, snap_workers, snap_checkpoints, snap_task_dependencies tables) if not present.
- Sets the global database path via DBManager.set_db_path() so all @task.delay() calls resolve to the correct database.
- Begins an orchestration loop that:
  - Updates the engine's heartbeat to avoid self-reclamation.
  - Reclaims orphaned tasks assigned to dead workers.
  - Calculates available worker slots.
  - Atomically claims the next batch of ready tasks (with DAG dependency filtering).
  - Spawns isolated WorkerProcess instances for each claimed task.
- Monitors worker health via persistent heartbeat updates.
- Collects telemetry from workers via a unidirectional multiprocessing.Pipe.
With the default loop_interval of 0.01 seconds, the loop runs at up to roughly 100 cycles per second, providing high throughput while maintaining resource efficiency.
Graceful Shutdown
The scheduler handles SIGINT (Ctrl+C) and SIGTERM gracefully:
- Sets the stop flag to prevent new task claims.
- Terminates all active workers (graceful terminate() first, then kill() if needed).
- Joins the orchestration thread.
- Restores signal handlers and cleans up resources.
Core API Reference & Technical Specification
@task(max_retries=3, priority=0)
The primary decorator for converting a Python function into a snap task.
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_retries | int | 3 | Maximum number of automatic retries when a task fails. |
| priority | int | 0 | Scheduling priority. Higher values are dequeued first by the orchestrator. |
Behavior: The decorator wraps the function into a TaskDefinition object that preserves the original function's metadata. The task's fully qualified name (e.g., mymodule.myfunc) is automatically resolved.
Returns: A TaskDefinition instance with a .delay() method.
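To make the decorator's behavior concrete, here is a simplified stand-in, not snap's implementation. The TaskDefinition class below is a hypothetical reduction: its attributes are assumptions, and an in-memory list replaces the SQLite-backed queue.

```python
import functools
import uuid


class TaskDefinition:
    """Hypothetical stand-in for snap's TaskDefinition."""

    def __init__(self, func, max_retries, priority):
        functools.update_wrapper(self, func)  # keep __name__, __doc__, etc.
        self.func = func
        self.max_retries = max_retries
        self.priority = priority
        # Fully qualified name, e.g. "mymodule.myfunc".
        self.name = f"{func.__module__}.{func.__qualname__}"
        self.queue = []  # stands in for the SQLite-backed queue

    def __call__(self, *args, **kwargs):
        # Calling the task directly still runs the original function.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # Record the call for later execution and hand back its ID.
        task_id = str(uuid.uuid4())
        self.queue.append((task_id, args, kwargs))
        return task_id


def task(max_retries=3, priority=0):
    def decorator(func):
        return TaskDefinition(func, max_retries, priority)
    return decorator


@task(max_retries=2, priority=1)
def generate_report(data_id):
    """Simulate report generation."""
    return {"status": "generated", "report_id": data_id}


task_id = generate_report.delay("REPORT-2024-001")
print(generate_report.name, task_id)
```

Storing the qualified name rather than the function object is what allows a separate worker process to import and run the task later.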
task.delay(*args, **kwargs)
Enqueue a task for asynchronous execution by the orchestrator.
| Parameter | Type | Description |
|---|---|---|
| *args | tuple | Positional arguments passed to the task function. |
| **kwargs | dict | Keyword arguments passed to the task function. If only **kwargs are provided, they are automatically wrapped into a single positional argument for convenience. |
Returns: A str UUID representing the task ID. This ID can be used for:
- Waiting for completion via engine.wait_for_completion(task_id).
- Defining DAG dependencies via depends_on=[task_id].
Side Effects:
- Sensitive values (under keys matching patterns like password, secret, token) are automatically masked before serialization via SecurityEngine.
- Arguments and keyword arguments are pickled into binary BLOBs for SQLite storage.
- The database path is resolved dynamically at execution time from the SNAP_ACTIVE_DB_PATH environment variable or the global path set by the Engine.
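The masking and pickling steps might look roughly like the sketch below. SecurityEngine's real interface is not shown in this document, so mask_sensitive, serialize_call, and the "***" placeholder are all assumptions for illustration.

```python
import pickle

SENSITIVE_PATTERNS = ("password", "secret", "token")  # patterns named above


def _is_sensitive(key):
    return isinstance(key, str) and any(p in key.lower() for p in SENSITIVE_PATTERNS)


def mask_sensitive(value):
    """Return a copy with values under sensitive-looking keys replaced."""
    if isinstance(value, dict):
        return {
            key: "***" if _is_sensitive(key) else mask_sensitive(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return type(value)(mask_sensitive(item) for item in value)
    return value


def serialize_call(args, kwargs):
    """Mask, then pickle, producing the two BLOBs that would be stored."""
    return pickle.dumps(mask_sensitive(args)), pickle.dumps(mask_sensitive(kwargs))


args_blob, kwargs_blob = serialize_call(
    ("user@example.com",), {"subject": "Welcome!", "api_token": "abc123"}
)
print(pickle.loads(kwargs_blob))
```

Masking before pickling keeps the secret out of the database file, which also means the stored task never sees the original value.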
checkpoint(step_name, state_dict)
Persist intermediate computation state for a running task. Enables recovery from failures.
| Parameter | Type | Description |
|---|---|---|
| step_name | str | A human-readable label for this checkpoint step (e.g., "data_loaded", "transformation_complete"). |
| state_dict | dict | Arbitrary application state to persist. Must be picklable. |
Raises: RuntimeError if called outside a snap worker process (i.e., SNAP_TASK_ID environment variable not set).
Behavior:
- Serializes the state dictionary with pickle.
- Opens a dedicated SQLite connection and performs INSERT OR REPLACE into snap_checkpoints.
- The checkpoint ID is constructed as {task_id}_{step_name}.
Recovery: When a failed task is retried, the worker loads the most recent checkpoint and passes it as the first positional argument to the task function. The task can then inspect this state to skip already-completed phases.
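The persistence and recovery behavior described above can be approximated with plain sqlite3 and pickle. In this sketch the snap_checkpoints column layout and both helper functions are assumptions; only the table name, the INSERT OR REPLACE statement, and the {task_id}_{step_name} ID format come from this document.

```python
import pickle
import sqlite3
import time


def save_checkpoint(conn, task_id, step_name, state):
    """Upsert one checkpoint row keyed by {task_id}_{step_name}."""
    conn.execute(
        "INSERT OR REPLACE INTO snap_checkpoints "
        "(id, task_id, step_name, state, created_at) VALUES (?, ?, ?, ?, ?)",
        (f"{task_id}_{step_name}", task_id, step_name,
         pickle.dumps(state), time.time()),
    )
    conn.commit()


def latest_checkpoint(conn, task_id):
    """Return (step_name, state) for the newest checkpoint, or None."""
    row = conn.execute(
        "SELECT step_name, state FROM snap_checkpoints "
        "WHERE task_id = ? ORDER BY created_at DESC, rowid DESC LIMIT 1",
        (task_id,),
    ).fetchone()
    return None if row is None else (row[0], pickle.loads(row[1]))


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE snap_checkpoints (id TEXT PRIMARY KEY, task_id TEXT, "
    "step_name TEXT, state BLOB, created_at REAL)"
)
save_checkpoint(conn, "task-1", "data_loaded", {"loaded_rows": 10})
save_checkpoint(conn, "task-1", "data_loaded", {"loaded_rows": 1000000})
save_checkpoint(conn, "task-1", "transformation_complete", {"transformed_rows": 950000})
latest = latest_checkpoint(conn, "task-1")
print(latest)
```

Keying the row on task ID plus step name means repeating a step overwrites its earlier checkpoint instead of accumulating rows.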
Engine(db_path, max_workers=10, loop_interval=0.01, heartbeat_timeout=10.0, auto_initialize=True)
The central orchestrator class.
| Parameter | Type | Default | Description |
|---|---|---|---|
| db_path | str | Required | Path to the SQLite3 database file. |
| max_workers | int | 10 | Maximum number of concurrent worker processes. |
| loop_interval | float | 0.01 | Sleep interval (seconds) between orchestration cycles. |
| heartbeat_timeout | float | 10.0 | Seconds without heartbeat before a worker is considered dead. |
| auto_initialize | bool | True | If True, automatically creates the database schema on construction. |
Key Methods:
| Method | Description |
|---|---|
| start() | Start the orchestration loop and telemetry collector. |
| stop(timeout=5.0) | Gracefully stop the engine, terminate workers, and clean up resources. |
| wait_for_completion(task_id, poll_interval=0.1, timeout=30.0) | Block until a task reaches COMPLETED or FAILED status. Returns True if completed. |
| active_worker_count | Property returning the current number of alive worker processes. |
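The polling semantics of wait_for_completion can be sketched independently of the engine. The version below takes a get_status callable in place of a database lookup, and returning False on timeout is an assumption, since the table above only states the result for the completed case.

```python
import time

TERMINAL_STATES = {"COMPLETED", "FAILED"}


def wait_for_completion(get_status, task_id, poll_interval=0.1, timeout=30.0):
    """Poll until the task is COMPLETED or FAILED; True only if COMPLETED."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(task_id)
        if status in TERMINAL_STATES:
            return status == "COMPLETED"
        if time.monotonic() >= deadline:
            return False  # assumption: a timeout is reported as "not completed"
        time.sleep(poll_interval)


# A fake status source that finishes on the third poll.
statuses = iter(["PENDING", "RUNNING", "COMPLETED"])
done = wait_for_completion(
    lambda _task_id: next(statuses), "demo-task", poll_interval=0.01
)
print(done)
```

Using time.monotonic() for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.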
Database State Machine Lifecycle
Each task in snap progresses through a well-defined state machine:
         ┌─────────────┐
         │   PENDING   │
         └──────┬──────┘
                │
         ┌──────▼──────┐
         │   READY *   │  (DAG dependencies satisfied)
         └──────┬──────┘
                │
         ┌──────▼──────┐
    ┌───▶│   RUNNING   │◀───┐
    │    └──────┬──────┘    │
    │           │           │
    │    ┌──────▼──────┐    │
    │    │  COMPLETED  │    │
    │    └─────────────┘    │
    │                       │
    │    ┌─────────────┐    │
    ├───▶│   FAILED    │    │
    │    └─────────────┘    │
    │                       │
    │    ┌─────────────┐    │
    └───▶│ RECOVERING  │────┘  (retries < max_retries)
         └─────────────┘
                │
         ┌──────▼──────┐
         │    DEAD     │  (max retries exhausted)
         └─────────────┘
State Descriptions
| State | Description |
|---|---|
| PENDING | Task has been enqueued but is not yet eligible for execution. It may be waiting for DAG dependencies. |
| READY * | A virtual state (not persisted) indicating the task has no uncompleted parent dependencies. Tasks in this state are eligible for claiming. |
| RUNNING | Task has been claimed by a worker and is currently executing. The worker_id field is set. |
| COMPLETED | Task executed successfully without exceptions. The result is not persisted (future feature). |
| FAILED | Task raised an exception during execution. The error_trace field contains the full traceback. |
| RECOVERING | A previously RUNNING task whose worker was detected as dead (heartbeat timeout). The task will be retried if retries < max_retries. |
| DEAD | Task has exhausted its maximum number of retries. No further execution attempts will be made. |
Transition Triggers
| Transition | Trigger |
|---|---|
| PENDING → RUNNING | Worker claims the task (DAG dependencies must be satisfied) |
| RUNNING → COMPLETED | Task function returns without raising an exception |
| RUNNING → FAILED | Task function raises an exception |
| RUNNING → RECOVERING | Orchestrator detects worker heartbeat timeout |
| RECOVERING → PENDING | Orchestrator resets task to pending state |
| PENDING → FAILED | Maximum retries exceeded (future enhancement) |
| RECOVERING → FAILED | Maximum retries exceeded (future enhancement) |
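The transition table above can be encoded as a small lookup that rejects anything not listed. This is an illustrative sketch; the State enum and transition function are hypothetical, and only the persisted states that appear in the table are modeled.

```python
from enum import Enum


class State(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    RECOVERING = "RECOVERING"


# One entry per row of the transition table.
ALLOWED_TRANSITIONS = {
    State.PENDING: {State.RUNNING, State.FAILED},
    State.RUNNING: {State.COMPLETED, State.FAILED, State.RECOVERING},
    State.RECOVERING: {State.PENDING, State.FAILED},
}


def transition(current, target):
    """Return the target state, or raise ValueError if the move is not listed."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


state = transition(State.PENDING, State.RUNNING)
state = transition(state, State.RECOVERING)
state = transition(state, State.PENDING)
print(state.value)
```

COMPLETED and FAILED have no outgoing entries, so any attempt to move a finished task is rejected.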
Heartbeat & Reclamation
snap's recovery mechanism operates as follows:
- Workers update their heartbeat in snap_workers every 500 ms.
- The orchestrator's main loop checks the last_heartbeat of all workers.
- Any worker with last_heartbeat older than heartbeat_timeout is marked as DEAD.
- All RUNNING tasks assigned to that worker are reverted to RECOVERING with retries += 1.
- On the next cycle, these RECOVERING tasks become eligible for claiming by a new worker.
- If retries >= max_retries, the task transitions to FAILED.
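The reclamation steps above could be implemented along these lines. This is a sketch under assumptions: the column names and the reclaim_orphans helper are hypothetical, and the retry-limit check is folded into the same UPDATE rather than run as a separate step.

```python
import sqlite3


def reclaim_orphans(conn, now, heartbeat_timeout=10.0):
    """Mark stale workers DEAD and requeue or fail their RUNNING tasks."""
    cutoff = now - heartbeat_timeout
    with conn:  # one transaction: commit on success, roll back on error
        dead = [row[0] for row in conn.execute(
            "SELECT id FROM snap_workers "
            "WHERE status != 'DEAD' AND last_heartbeat < ?", (cutoff,)
        )]
        for worker_id in dead:
            conn.execute(
                "UPDATE snap_workers SET status = 'DEAD' WHERE id = ?",
                (worker_id,),
            )
            # Right-hand sides see the pre-update row, so "retries + 1" in
            # the CASE is the value the row will hold after this statement.
            conn.execute(
                "UPDATE snap_tasks SET retries = retries + 1, worker_id = NULL, "
                "status = CASE WHEN retries + 1 >= max_retries "
                "THEN 'FAILED' ELSE 'RECOVERING' END "
                "WHERE worker_id = ? AND status = 'RUNNING'",
                (worker_id,),
            )
    return dead


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE snap_workers (id TEXT PRIMARY KEY, status TEXT, last_heartbeat REAL);
    CREATE TABLE snap_tasks (id TEXT PRIMARY KEY, status TEXT, worker_id TEXT,
                             retries INTEGER, max_retries INTEGER);
    INSERT INTO snap_workers VALUES ('w1', 'ALIVE', 100.0), ('w2', 'ALIVE', 0.0);
    INSERT INTO snap_tasks VALUES ('a', 'RUNNING', 'w1', 0, 3),
                                  ('b', 'RUNNING', 'w2', 0, 3),
                                  ('c', 'RUNNING', 'w2', 2, 3);
    """
)
dead_workers = reclaim_orphans(conn, now=105.0)
result = {r[0]: (r[1], r[2]) for r in conn.execute(
    "SELECT id, status, retries FROM snap_tasks")}
print(dead_workers, result)
```

Passing now in explicitly keeps the function deterministic in tests; a real loop would supply the current time on each cycle.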
snap is engineered for teams who demand reliability, performance, and simplicity. With zero dependencies, process-level isolation, and automatic crash recovery, it provides enterprise-grade task orchestration without the operational burden of traditional message brokers.