A portable queue system for SyftBox with relative path support
Project description
SyftQueue
A portable queue system for SyftBox that manages jobs across datasites with built-in support for relative paths and pipeline progression.
Features
- Simple API: Create and manage job queues with just
q("queue_name") - Portable Jobs: Jobs are self-contained folders with
run.shas the entry point - Relative Path Support: Jobs maintain file references when moved between stages
- Pipeline Progression: Built-in API for advancing jobs through lifecycle stages
- Native Integration: Uses syft-objects for storage and permissions
- Cross-Datasite: Submit jobs between different SyftBox datasites
Installation
pip install syft-queue
Or install from source:
git clone https://github.com/OpenMined/syft-queue.git
cd syft-queue
pip install -e .
Quick Start
Creating a Queue and Job
from syft_queue import q
# Create or get a queue
queue = q("analysis_queue")
# Create a job
job = queue.create_job(
name="data_analysis",
requester_email="researcher@university.edu",
target_email="data_owner@hospital.org",
code_folder="./my_analysis_code"
)
Job Structure
Jobs are simple folders containing:
run.sh- The entry point script- Any supporting files/folders the script needs
Example job structure:
my_analysis/
├── run.sh # #!/bin/bash
│ # python analyze.py data.csv
├── analyze.py
├── requirements.txt
└── data.csv
Pipeline Progression
from syft_queue import approve, start, complete
# Progress job through stages
approve(job, approver="data_owner@hospital.org")
start(job, runner="compute-node-1")
# After execution
complete(job,
output_path="results/",
metrics={"runtime": 3600, "success": True}
)
Batch Operations
from syft_queue import process_queue
# Auto-process queue with rules
results = process_queue(
queue,
auto_approve=lambda job: job.requester_email.endswith("@trusted.org"),
auto_reject=lambda job: "No code" if not job.code_folder else None
)
Core Concepts
Job Lifecycle
Jobs progress through these statuses:
- inbox - Newly submitted, awaiting review
- approved - Approved, waiting for resources
- running - Currently executing
- completed - Successfully finished
- failed - Execution failed
- rejected - Denied by reviewer
- timedout - Exceeded time limit
Relative Path Support
Jobs automatically maintain relative paths when moved:
# Create job with relative path support (default)
job = queue.create_job(
name="portable_job",
code_folder="./code",
use_relative_paths=True # Default
)
# Job can be moved between stages/systems and paths still work
Execution Environment
When jobs run, they receive these environment variables:
JOB_UID- Unique job identifierJOB_NAME- Human-readable job nameJOB_DIR- Job's working directoryCODE_PATH- Path to code folderOUTPUT_PATH- Where to write outputs
Advanced Usage
Custom Approval Logic
def review_job(job):
"""Custom job review logic"""
if not job.code_folder:
return reject(job, "Missing code")
if "sensitive" in job.description:
return reject(job, "Contains sensitive keywords")
if job.requester_email.endswith("@university.edu"):
return approve(job, approver="auto-system")
# Needs manual review
return None
# Apply to all inbox jobs
for job in queue.list_jobs(JobStatus.inbox):
review_job(job)
Pipeline Builder (Extended API)
from syft_queue import PipelineBuilder
pipeline = (PipelineBuilder("ml_pipeline")
.stage("validation", JobStatus.inbox)
.stage("preprocessing", JobStatus.running)
.stage("training", JobStatus.running)
.stage("deployment", JobStatus.completed)
.transition("validation", "preprocessing",
condition=lambda j: validate_data(j))
.transition("preprocessing", "training")
.transition("training", "deployment",
condition=lambda j: check_accuracy(j) > 0.9)
.build()
)
# Process job through pipeline
pipeline.advance(job)
API Reference
Queue Management
q(name)- Create or get a queuelist_queues()- List all queuesget_queue(name)- Get existing queue
Job Operations
queue.create_job(...)- Create new jobqueue.get_job(uid)- Get job by IDqueue.list_jobs(status)- List jobs by status
Job Progression
approve(job, approver, notes)- Approve jobreject(job, reason, reviewer)- Reject jobstart(job, runner)- Start executioncomplete(job, output_path, metrics)- Mark completefail(job, error, exit_code)- Mark failedadvance(job, to_status)- Progress to next/specific status
Batch Operations
approve_all(jobs, condition)- Approve multiple jobsprocess_queue(queue, auto_approve, auto_reject)- Process with rules
Examples
See the examples/ directory for:
basic_usage.py- Simple queue operationspipeline_progression.py- Using the progression APIportable_jobs.py- Job portability demosbatch_processing.py- Processing multiple jobs
Development
Setup Development Environment
git clone https://github.com/OpenMined/syft-queue.git
cd syft-queue
pip install -e ".[dev]"
Run Tests
pytest
Code Style
black src/
ruff check src/
mypy src/
License
Apache License 2.0 - see LICENSE for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
- 📧 Email: contact@openmined.org
- 💬 Slack: Join #syftbox channel
- 🐛 Issues: GitHub Issues
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file syft_queue-0.4.81.tar.gz.
File metadata
- Download URL: syft_queue-0.4.81.tar.gz
- Upload date:
- Size: 174.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.4.20
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2e4609999808766335a223ee9ccf5665aa7b65d278678e012f6f7bc3c74dd1b6
|
|
| MD5 |
7f100cc5852aebf622feaec2f9e835cd
|
|
| BLAKE2b-256 |
f5c759264afe3638ad6f624cffc7bf5af242ddeecde19098a00457c29959ffdd
|
File details
Details for the file syft_queue-0.4.81-py3-none-any.whl.
File metadata
- Download URL: syft_queue-0.4.81-py3-none-any.whl
- Upload date:
- Size: 56.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.4.20
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a3970e02abc588d20636172ef9801be94d01ad95e636ecac3d01616f421f2ec3
|
|
| MD5 |
ba956ab16c7f5e82ad1f10b7b188ce5b
|
|
| BLAKE2b-256 |
1bbdbfd2a62cb03a4597e5b9163010bf1709e00aad5132e320c4cb6d646250a7
|