Simple Sync Manager
JobSync - Distributed Task Coordination
A Python library for coordinating distributed task processing across multiple worker nodes with automatic load balancing, failover, and zero-downtime deployments.
What It Does
JobSync coordinates multiple worker processes so each task is processed by exactly one worker, even as workers join and leave. It uses PostgreSQL for state management and provides:
- Automatic load balancing - Tasks distributed evenly across all active workers
- Zero-downtime deployments - Add/remove workers without stopping processing
- Automatic failover - When workers die, their tasks are automatically reassigned
- Task pinning - Lock specific tasks to specific workers (e.g., GPU tasks to GPU nodes)
- Health monitoring - Built-in health checks for load balancers
Quick Start
Installation
```bash
pip install jobsync
```
Basic Example
```python
from jobsync import Job, Task, CoordinationConfig

# Configure coordination
coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',  # Prefix for database tables
)

# Each worker runs this code; get_pending_items() and process_item()
# are placeholders for your own application functions
with Job('worker-01', coordination_config=coord_config) as job:
    for item_id in get_pending_items():
        task = Task(item_id)
        # Only process if this task belongs to this worker
        if job.can_claim_task(task):
            job.add_task(task)
            process_item(item_id)
```
Long-Running Task Example
```python
import logging
import threading
import time

from jobsync import Job, CoordinationConfig

logger = logging.getLogger(__name__)
shutdown = threading.Event()  # call shutdown.set() (e.g. from a signal handler) to stop

# For subscriptions, WebSockets, or continuous processing
def on_rebalance():
    """Called when cluster membership changes.

    Re-evaluate which tasks this node should process and update
    long-running connections or subscriptions accordingly.
    """
    current_tokens = job.my_tokens
    # Update your subscriptions/connections based on token ownership
    logger.info(f'Rebalance: now own {len(current_tokens)} tokens')

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',
)

with Job('worker-01', coordination_config=coord_config,
         on_rebalance=on_rebalance) as job:
    # The callback notifies you of cluster changes; keep the process alive
    while not shutdown.is_set():
        time.sleep(1)
```
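One way to act on `on_rebalance` for long-lived connections is to reconcile the set of subscriptions this node holds against the set it should now own. The sketch below is hypothetical (`SubscriptionSet` is not part of jobsync) and takes ownership as a predicate so it runs without a database; inside the callback you might pass `owns=lambda item: job.can_claim_task(Task(item))`.

```python
from typing import Callable, Iterable


class SubscriptionSet:
    """Keep long-lived subscriptions aligned with current task ownership (sketch)."""

    def __init__(self, subscribe: Callable[[str], None],
                 unsubscribe: Callable[[str], None]) -> None:
        self._subscribe = subscribe
        self._unsubscribe = unsubscribe
        self.active: set[str] = set()

    def reconcile(self, items: Iterable[str], owns: Callable[[str], bool]) -> None:
        wanted = {item for item in items if owns(item)}
        # Release items this node lost before taking on newly owned ones
        for item in sorted(self.active - wanted):
            self._unsubscribe(item)
        for item in sorted(wanted - self.active):
            self._subscribe(item)
        self.active = wanted


# Demo with print callbacks and a fake ownership predicate
subs = SubscriptionSet(lambda s: print('subscribe', s),
                       lambda s: print('unsubscribe', s))
subs.reconcile(['AAPL', 'MSFT', 'NVDA'], owns=lambda s: s in {'AAPL', 'MSFT'})
subs.reconcile(['AAPL', 'MSFT', 'NVDA'], owns=lambda s: s in {'MSFT', 'NVDA'})
```

Unsubscribing first means this node stops handling items it lost before it starts on new ones.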
Run multiple workers, each passing a unique node name to Job(); they coordinate automatically:
```bash
# Terminal 1
python worker.py --node-name worker-01
# Terminal 2
python worker.py --node-name worker-02
# Terminal 3
python worker.py --node-name worker-03
```
Each worker processes a different subset of the tasks, so no work is duplicated.
How It Works
Token-Based Distribution: JobSync divides a pool of tokens (10,000 by default, configurable via total_tokens) evenly across all active workers. Each task hashes consistently to exactly one token, so each worker knows which tasks it owns.
Leader-Based Coordination: The oldest worker becomes the leader and monitors cluster health. When workers join or leave, the leader redistributes tokens to maintain balance.
Consistent Hashing: Tasks always hash to the same token, so the same task always goes to the same worker (until the cluster changes).
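Here is a minimal sketch of the token idea in plain Python, not jobsync's actual code. It assumes a task id is reduced to a token by taking a SHA-256 digest modulo the pool size, and that tokens are dealt round-robin over the sorted worker names; `task_to_token`, `distribute`, and `owner_of` are hypothetical helpers.

```python
import hashlib
from collections import Counter

TOTAL_TOKENS = 10_000  # the README's default pool size


def task_to_token(task_id: str, total_tokens: int = TOTAL_TOKENS) -> int:
    # Assumption: SHA-256 digest interpreted as an integer, reduced mod pool size
    digest = hashlib.sha256(task_id.encode('utf-8')).digest()
    return int.from_bytes(digest, 'big') % total_tokens


def distribute(workers: list[str], total_tokens: int = TOTAL_TOKENS) -> dict[int, str]:
    # Round-robin over sorted names: per-worker token counts differ by at most one
    ordered = sorted(workers)
    return {token: ordered[token % len(ordered)] for token in range(total_tokens)}


def owner_of(task_id: str, assignment: dict[int, str]) -> str:
    # Same task id -> same token -> same owner, on every node
    return assignment[task_to_token(task_id, len(assignment))]


assignment = distribute(['worker-03', 'worker-01', 'worker-02'])
print(Counter(assignment.values()))
print(owner_of('order-42', assignment))
```

Because the mapping depends only on the task id, the pool size, and the worker list, every worker computes the same owner without talking to the others. Round-robin dealing is chosen for brevity; it reshuffles many tokens when membership changes, so it is not the minimal-movement rebalancing described under Key Features.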
Lifecycle:
- Workers register and send a heartbeat every 5 seconds
- Leader is elected (oldest worker by registration time)
- Leader distributes tokens evenly across all workers
- Workers claim only tasks that hash to their tokens
- Leader monitors for dead workers and triggers rebalancing
- When leader dies, next oldest worker becomes leader
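A minimal sketch of the liveness and election rules in the lifecycle above, assuming a node counts as alive while its last heartbeat is within 15 seconds (three 5-second intervals, the same window the SQL health check under Monitoring uses) and that ties in registration time are broken by node name. `Node`, `alive`, and `elect_leader` are hypothetical; jobsync keeps this state in PostgreSQL rather than in memory.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

HEARTBEAT_INTERVAL = timedelta(seconds=5)
HEARTBEAT_TIMEOUT = 3 * HEARTBEAT_INTERVAL  # assumption: 15 s liveness window


@dataclass(frozen=True)
class Node:
    name: str
    registered_at: datetime
    last_heartbeat: datetime


def alive(nodes: list[Node], now: datetime) -> list[Node]:
    """Nodes whose last heartbeat falls within the timeout window."""
    return [n for n in nodes if now - n.last_heartbeat <= HEARTBEAT_TIMEOUT]


def elect_leader(nodes: list[Node], now: datetime) -> Optional[str]:
    """Oldest live node by registration time; name breaks ties deterministically."""
    live = alive(nodes, now)
    if not live:
        return None
    return min(live, key=lambda n: (n.registered_at, n.name)).name


now = datetime.now(timezone.utc)
t0 = now - timedelta(hours=1)
nodes = [
    Node('worker-01', t0, now - timedelta(seconds=20)),  # silent too long: dead
    Node('worker-02', t0 + timedelta(seconds=1), now - timedelta(seconds=2)),
    Node('worker-03', t0 + timedelta(seconds=2), now),
]
print(elect_leader(nodes, now))  # worker-02: worker-01 is older but dead
```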
Configuration
Configure database connection and coordination settings using CoordinationConfig:
```python
from jobsync import CoordinationConfig

coord_config = CoordinationConfig(
    host='localhost',
    port=5432,
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',  # Prefix for database tables (e.g., myapp_node, myapp_token)
    total_tokens=10000,  # Optional: customize token pool size
    hash_function='double_sha256',  # Optional: 'md5', 'sha256', or 'double_sha256' (default, recommended)
)
```
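The `hash_function` option names suggest the variants below. This sketch assumes `double_sha256` means SHA-256 applied to a SHA-256 digest and that the digest is reduced modulo `total_tokens`; it is illustrative only, and `digest` and `token_for` are hypothetical helpers, not jobsync functions.

```python
import hashlib


def digest(value: str, hash_function: str = 'double_sha256') -> bytes:
    data = value.encode('utf-8')
    if hash_function == 'md5':
        return hashlib.md5(data).digest()
    if hash_function == 'sha256':
        return hashlib.sha256(data).digest()
    if hash_function == 'double_sha256':
        # Assumption: SHA-256 applied to the SHA-256 digest
        return hashlib.sha256(hashlib.sha256(data).digest()).digest()
    raise ValueError(f'unknown hash_function: {hash_function!r}')


def token_for(task_id: str, total_tokens: int = 10_000,
              hash_function: str = 'double_sha256') -> int:
    # Assumption: digest read as a big-endian integer, reduced mod pool size
    return int.from_bytes(digest(task_id, hash_function), 'big') % total_tokens


for fn in ('md5', 'sha256', 'double_sha256'):
    print(fn, token_for('order-42', hash_function=fn))
```

Under this model, changing either `hash_function` or `total_tokens` maps the same task id to a different token, so every worker sharing an `appname` should use identical values.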
See Usage Guide for detailed configuration options and Operator Guide for tuning parameters.
Usage Examples
Basic Task Processing
```python
from jobsync import Job, Task, CoordinationConfig

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',
)

with Job('worker-01', coordination_config=coord_config) as job:
    for item_id in get_pending_items():
        task = Task(item_id)
        if job.can_claim_task(task):
            job.add_task(task)
            process_item(item_id)
    job.write_audit()
```
Task Pinning (Lock GPU tasks to GPU workers)
```python
from jobsync import Job, Task, CoordinationConfig

def register_gpu_locks(job):
    gpu_tasks = get_gpu_task_ids()
    locks = [(task_id, '%gpu%', 'requires_gpu') for task_id in gpu_tasks]
    job.register_locks_bulk(locks)

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',
)

with Job('worker-gpu-01', coordination_config=coord_config,
         lock_provider=register_gpu_locks) as job:
    for task_id in get_all_tasks():
        if job.can_claim_task(Task(task_id)):
            process_gpu_task(task_id)
```
See Usage Guide for complete examples including WebSocket subscriptions, Kafka consumers, ETL pipelines, and more.
Key Features
Automatic Load Balancing and Failover
- Token-based distribution - 10,000 tokens evenly distributed across all active workers
- Consistent hashing - Each task always hashes to the same token
- Automatic rebalancing - When workers join/leave, tokens are redistributed with minimal movement
- Leader-based coordination - Oldest worker manages cluster state and triggers rebalancing
- Zero-downtime deployments - Add new workers, wait for them to receive tokens, then stop old workers
Task Locking and Pinning
Lock specific tasks to specific workers using SQL LIKE patterns with ordered fallback support. Locks are registered by task_id and stored in the database. During distribution, each task_id is hashed to a token_id, and lock patterns determine which node receives that token.
```python
from jobsync import Job, CoordinationConfig

def register_locks(job):
    gpu_tasks = get_gpu_task_ids()
    # Ordered fallback: try primary GPU first, then any GPU node
    locks = [
        (task_id, ['gpu-primary-01', '%-gpu'], 'requires_gpu')
        for task_id in gpu_tasks
    ]
    job.register_locks_bulk(locks)

coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',
)

with Job('worker-gpu-01', coordination_config=coord_config,
         lock_provider=register_locks,
         clear_existing_locks=True) as job:
    process_tasks(job)
```
Use cases:
- Pin GPU tasks to GPU-enabled workers
- Route high-memory tasks to large-memory nodes
- Ensure data locality (tasks process data in same region)
- Resource-aware scheduling
See Usage Guide for complete lock API including pattern matching, fallback chains, expiration, and lifecycle management.
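To illustrate how a pattern list with ordered fallback could resolve to a node, here is a sketch that translates SQL `LIKE` wildcards (`%` for any run of characters, `_` for exactly one) into a regular expression and tries each pattern in order. It ignores `LIKE` escape sequences, and choosing the alphabetically first match among several live nodes is an assumption: the README does not say how jobsync breaks such ties or what happens when no pattern matches (here, `None`). All names are hypothetical.

```python
import re
from typing import Iterable, Optional, Sequence, Union


def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern into a regex (no ESCAPE handling)."""
    parts = []
    for ch in pattern:
        if ch == '%':
            parts.append('.*')  # any run of characters, including none
        elif ch == '_':
            parts.append('.')   # exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile(''.join(parts), re.DOTALL)


def like(value: str, pattern: str) -> bool:
    # fullmatch mirrors LIKE, which must match the whole (case-sensitive) string
    return like_to_regex(pattern).fullmatch(value) is not None


def pick_node(patterns: Union[str, Sequence[str]],
              live_nodes: Iterable[str]) -> Optional[str]:
    """Try patterns in order; return the first live node matching the earliest one."""
    if isinstance(patterns, str):
        patterns = [patterns]
    nodes = sorted(live_nodes)
    for pattern in patterns:
        for node in nodes:
            if like(node, pattern):
                return node
    return None


live = ['worker-01', 'node-b-gpu', 'node-a-gpu']
print(pick_node(['gpu-primary-01', '%-gpu'], live))  # node-a-gpu (primary is down)
```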
Health Monitoring and Custom Configuration
Health checks for load balancers:
```python
from flask import Flask, jsonify
from jobsync import Job, CoordinationConfig

app = Flask(__name__)

# `job` is the Job instance running in this process
@app.route('/health/ready')
def health():
    if job.am_i_healthy():
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not_ready'}), 503
```
Fine-tune coordination:
```python
coord_config = CoordinationConfig(
    host='localhost',
    dbname='jobsync',
    user='postgres',
    password='postgres',
    appname='myapp_',
    total_tokens=50000,  # More tokens for large clusters
    minimum_nodes=2,  # Wait for 2 nodes before distribution
    heartbeat_interval_sec=3,  # Faster heartbeat
    heartbeat_timeout_sec=9,  # Quicker failure detection
    rebalance_check_interval_sec=15,  # More responsive rebalancing
)
```
See Usage Guide and Operator Guide for tuning recommendations.
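Some quick arithmetic on the tuned values above, as a sketch: tokens per worker follows from the even split, and the timeout-to-interval ratio is how many heartbeat intervals pass before a node is treated as dead. The `worst_case_failover_sec` estimate is an assumption (failure detected after the timeout, then up to one rebalance check interval before the leader reacts) and ignores distribution time. `Tuning` and `summarize` are hypothetical, and their defaults mirror the fine-tuned values above, not jobsync's own defaults.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tuning:
    # Mirrors the fine-tuned CoordinationConfig values above
    total_tokens: int = 50_000
    heartbeat_interval_sec: float = 3
    heartbeat_timeout_sec: float = 9
    rebalance_check_interval_sec: float = 15


def summarize(t: Tuning, workers: int) -> dict[str, float]:
    return {
        # Even split of the pool across workers
        'tokens_per_worker': t.total_tokens / workers,
        # Heartbeat intervals that elapse before a node is considered dead
        'heartbeat_intervals_per_timeout': t.heartbeat_timeout_sec / t.heartbeat_interval_sec,
        # Assumption: timeout, then up to one rebalance check before reassignment starts
        'worst_case_failover_sec': t.heartbeat_timeout_sec + t.rebalance_check_interval_sec,
    }


print(summarize(Tuning(), workers=10))
```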
Documentation
- Usage Guide - Developer reference with examples, configuration, and patterns
- Operator Guide - Deployment, monitoring, and troubleshooting
- SQL Cheatsheet - Common monitoring queries
Monitoring
Check cluster health:
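```sql
-- Quick health check: nodes with a heartbeat in the last 15 seconds.
-- Tables are prefixed with your appname; sync_node corresponds to appname='sync_'
-- (with appname='myapp_', query myapp_node instead).
SELECT COUNT(*) AS active_nodes FROM sync_node
WHERE last_heartbeat > NOW() - INTERVAL '15 seconds';
```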
See Operator Guide for metrics, alerts, and troubleshooting.
Performance
- Memory: <1 MB per worker
- Database load: <10 queries/minute per worker
- Scalability: Tested with 2-50 workers
- Rebalancing: Only ~14% of tokens move when a worker fails
- Distribution speed: 100-500ms for 10,000 tokens
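The rebalancing figure is consistent with a minimal-movement scheme in which only the failed worker's tokens change hands: with seven workers that share is about 1/7, or roughly 14%. The sketch below is one greedy way to get that behavior (survivors keep tokens up to their fair share, orphaned tokens fill the most under-filled workers). It is an illustration, not jobsync's algorithm, and the seven-worker cluster is hypothetical.

```python
from collections import Counter


def rebalance(assignment: dict[int, str], workers: list[str]) -> dict[int, str]:
    """Greedy minimal-movement rebalance (a sketch, not jobsync's algorithm)."""
    ordered = sorted(workers)
    base, extra = divmod(len(assignment), len(ordered))
    # Fair-share targets: the first `extra` workers take one additional token
    target = {w: base + (1 if i < extra else 0) for i, w in enumerate(ordered)}
    new: dict[int, str] = {}
    load: Counter = Counter()
    pending: list[int] = []
    for token in sorted(assignment):
        owner = assignment[token]
        if owner in target and load[owner] < target[owner]:
            new[token] = owner  # keep: owner is alive and under its target
            load[owner] += 1
        else:
            pending.append(token)  # owner is gone, or already at its share
    for token in pending:
        # Give each orphaned token to the most under-filled worker
        dest = min(ordered, key=lambda w: load[w] - target[w])
        new[token] = dest
        load[dest] += 1
    return new


def moved_fraction(old: dict[int, str], new: dict[int, str]) -> float:
    return sum(old[t] != new[t] for t in old) / len(old)


workers = [f'worker-{i:02d}' for i in range(1, 8)]
initial = {t: workers[t % len(workers)] for t in range(10_000)}
after = rebalance(initial, [w for w in workers if w != 'worker-04'])
print(f'{moved_fraction(initial, after):.1%}')  # 14.3%: only worker-04's tokens moved
```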
Testing
```bash
# Install test dependencies
pip install pytest pytest-cov

# Run all tests
pytest tests/

# Run with coverage
pytest --cov=jobsync tests/

# Run specific test suite
pytest tests/test_coordination.py -v
```
Requirements
- Python 3.10+
- PostgreSQL 12+
- Required packages: psycopg (psycopg3), sqlalchemy (installed automatically)
License
MIT
Support
- Issues: https://github.com/bissli/jobsync/issues
- Documentation: See the docs/ folder
- Examples: See the tests/ folder
Download files
File details
Details for the file jobsync-1.3.3.tar.gz.
File metadata
- Download URL: jobsync-1.3.3.tar.gz
- Upload date:
- Size: 28.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.14.3 Linux/6.8.0-1050-aws
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 24b259bc743ae2be292c79137231b66047fcb475952dca6d61c80eeb520ed383 |
| MD5 | 3c2fb515bed4aa3098c2be5126cd348f |
| BLAKE2b-256 | b37e9a9da694bc64a6c47c8406b7e0b44f2a123e422481b3b6227381ea7d599d |
File details
Details for the file jobsync-1.3.3-py3-none-any.whl.
File metadata
- Download URL: jobsync-1.3.3-py3-none-any.whl
- Upload date:
- Size: 27.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.14.3 Linux/6.8.0-1050-aws
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0ba4ce051d1e14e3473ea71a5b6211cf2cc51080fc6d3ea199e0a55b8f7dc4c0 |
| MD5 | 00473e435ff5ab9ee5bfe748bff50c5c |
| BLAKE2b-256 | 67282e0c5be5e8cedf31762458913aada19a57fb7e2bcf2211971c22574cfa1e |