SyftQueue

A portable queue system for SyftBox that manages jobs across datasites with built-in support for relative paths and pipeline progression.

Features

  • Simple API: Create and manage job queues with just q("queue_name")
  • Portable Jobs: Jobs are self-contained folders with run.sh as the entry point
  • Relative Path Support: Jobs maintain file references when moved between stages
  • Pipeline Progression: Built-in API for advancing jobs through lifecycle stages
  • Native Integration: Uses syft-objects for storage and permissions
  • Cross-Datasite: Submit jobs between different SyftBox datasites

Installation

pip install syft-queue

Or install from source:

git clone https://github.com/OpenMined/syft-queue.git
cd syft-queue
pip install -e .

Quick Start

Creating a Queue and Job

from syft_queue import q

# Create or get a queue
queue = q("analysis_queue")

# Create a job
job = queue.create_job(
    name="data_analysis",
    requester_email="researcher@university.edu",
    target_email="data_owner@hospital.org",
    code_folder="./my_analysis_code"
)

Job Structure

Jobs are simple folders containing:

  • run.sh - The entry point script
  • Any supporting files/folders the script needs

Example job structure:

my_analysis/
├── run.sh          # #!/bin/bash
│                   # python analyze.py data.csv
├── analyze.py
├── requirements.txt
└── data.csv
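A folder with this layout can be assembled with plain standard-library calls before handing it to `create_job`. This is an illustrative sketch (the file contents are placeholders matching the tree above, not required by SyftQueue beyond `run.sh`):

```python
from pathlib import Path

def make_job_folder(root: str) -> Path:
    """Assemble a minimal job folder with run.sh as the entry point."""
    job_dir = Path(root) / "my_analysis"
    job_dir.mkdir(parents=True, exist_ok=True)
    # run.sh is the only file SyftQueue requires; everything else is payload.
    (job_dir / "run.sh").write_text("#!/bin/bash\npython analyze.py data.csv\n")
    (job_dir / "analyze.py").write_text("print('analysis placeholder')\n")
    (job_dir / "requirements.txt").write_text("pandas\n")
    return job_dir
```

The returned path can then be passed as `code_folder=str(job_dir)` when creating the job.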

Pipeline Progression

from syft_queue import approve, start, complete

# Progress job through stages
approve(job, approver="data_owner@hospital.org")
start(job, runner="compute-node-1")

# After execution
complete(job, 
    output_path="results/",
    metrics={"runtime": 3600, "success": True}
)

Batch Operations

from syft_queue import process_queue

# Auto-process queue with rules
results = process_queue(
    queue,
    auto_approve=lambda job: job.requester_email.endswith("@trusted.org"),
    auto_reject=lambda job: "No code" if not job.code_folder else None
)

Core Concepts

Job Lifecycle

Jobs progress through these statuses:

  1. inbox - Newly submitted, awaiting review
  2. approved - Approved, waiting for resources
  3. running - Currently executing
  4. completed - Successfully finished
  5. failed - Execution failed
  6. rejected - Denied by reviewer
  7. timedout - Exceeded time limit
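The lifecycle above can be pictured as a small state machine. The transition map below is an illustrative sketch, not SyftQueue's internal rules; note that the terminal statuses (`completed`, `failed`, `rejected`, `timedout`) have no outgoing edges:

```python
# Illustrative status graph; the library's actual transition rules may differ.
TRANSITIONS = {
    "inbox":    {"approved", "rejected"},
    "approved": {"running", "timedout"},
    "running":  {"completed", "failed", "timedout"},
}

def can_transition(current: str, target: str) -> bool:
    """Return True if a job may move from `current` to `target`."""
    return target in TRANSITIONS.get(current, set())
```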

Relative Path Support

Jobs automatically maintain relative paths when moved:

# Create job with relative path support (default)
job = queue.create_job(
    name="portable_job",
    code_folder="./code",
    use_relative_paths=True  # Default
)

# Job can be moved between stages/systems and paths still work

Execution Environment

When jobs run, they receive these environment variables:

  • JOB_UID - Unique job identifier
  • JOB_NAME - Human-readable job name
  • JOB_DIR - Job's working directory
  • CODE_PATH - Path to code folder
  • OUTPUT_PATH - Where to write outputs
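Inside `run.sh`, or a Python script it launches, these variables can be read with `os.environ`. A minimal sketch with fallbacks so the script also runs outside the queue (the fallback values are assumptions for local testing, not SyftQueue defaults):

```python
import os
from pathlib import Path

def get_job_context() -> dict:
    """Collect the SyftQueue-provided environment variables, with local-test fallbacks."""
    return {
        "uid": os.environ.get("JOB_UID", "local-test"),
        "name": os.environ.get("JOB_NAME", "untitled"),
        "job_dir": Path(os.environ.get("JOB_DIR", ".")),
        "code_path": Path(os.environ.get("CODE_PATH", ".")),
        "output_path": Path(os.environ.get("OUTPUT_PATH", "./output")),
    }
```

A script would typically read inputs relative to `code_path` and write results under `output_path`.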

Advanced Usage

Custom Approval Logic

from syft_queue import approve, reject, JobStatus

def review_job(job):
    """Custom job review logic"""
    if not job.code_folder:
        return reject(job, "Missing code")
    
    if "sensitive" in job.description:
        return reject(job, "Contains sensitive keywords")
    
    if job.requester_email.endswith("@university.edu"):
        return approve(job, approver="auto-system")
    
    # Needs manual review
    return None

# Apply to all inbox jobs
for job in queue.list_jobs(JobStatus.inbox):
    review_job(job)

Pipeline Builder (Extended API)

from syft_queue import PipelineBuilder, JobStatus

# validate_data and check_accuracy are user-supplied helpers (not shown)

pipeline = (PipelineBuilder("ml_pipeline")
    .stage("validation", JobStatus.inbox)
    .stage("preprocessing", JobStatus.running)
    .stage("training", JobStatus.running)
    .stage("deployment", JobStatus.completed)
    .transition("validation", "preprocessing",
               condition=lambda j: validate_data(j))
    .transition("preprocessing", "training")
    .transition("training", "deployment",
               condition=lambda j: check_accuracy(j) > 0.9)
    .build()
)

# Process job through pipeline
pipeline.advance(job)

API Reference

Queue Management

  • q(name) - Create or get a queue
  • list_queues() - List all queues
  • get_queue(name) - Get existing queue

Job Operations

  • queue.create_job(...) - Create new job
  • queue.get_job(uid) - Get job by ID
  • queue.list_jobs(status) - List jobs by status

Job Progression

  • approve(job, approver, notes) - Approve job
  • reject(job, reason, reviewer) - Reject job
  • start(job, runner) - Start execution
  • complete(job, output_path, metrics) - Mark complete
  • fail(job, error, exit_code) - Mark failed
  • advance(job, to_status) - Advance to the next status, or to a specific target status

Batch Operations

  • approve_all(jobs, condition) - Approve multiple jobs
  • process_queue(queue, auto_approve, auto_reject) - Process with rules
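The `condition` callable given to approve_all, like the `auto_approve`/`auto_reject` lambdas shown earlier, is just a predicate over the job object. An illustrative pair, assuming jobs expose `requester_email` and `code_folder` as in the examples above:

```python
def from_trusted_domain(job) -> bool:
    """Approve only jobs whose requester belongs to a trusted domain."""
    return job.requester_email.endswith("@trusted.org")

def rejection_reason(job):
    """Return a rejection reason string, or None to leave the job for manual review."""
    if not job.code_folder:
        return "No code folder attached"
    return None
```

These can be passed directly, e.g. `process_queue(queue, auto_approve=from_trusted_domain, auto_reject=rejection_reason)`.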

Examples

See the examples/ directory for:

  • basic_usage.py - Simple queue operations
  • pipeline_progression.py - Using the progression API
  • portable_jobs.py - Job portability demos
  • batch_processing.py - Processing multiple jobs

Development

Setup Development Environment

git clone https://github.com/OpenMined/syft-queue.git
cd syft-queue
pip install -e ".[dev]"

Run Tests

pytest

Code Style

black src/
ruff check src/
mypy src/

License

Apache License 2.0 - see LICENSE for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
