
FastAPI job queue with SQLite-backed persistence for ZnDraw


ZnDraw Job Management Library

A self-contained FastAPI package for distributed job/task management with SQL persistence. Provides a pluggable router, ORM models, a client SDK with auto-serve, provider-based data reads, and server-side taskiq workers.

Integration into your app

from fastapi import FastAPI
from zndraw_auth import current_active_user, current_superuser
from zndraw_auth.db import get_session_maker
from zndraw_joblib.router import router
from zndraw_joblib.exceptions import ProblemException, problem_exception_handler
from zndraw_joblib.sweeper import run_sweeper
from zndraw_joblib.settings import JobLibSettings

app = FastAPI()

# 1. Override session maker dependency at auth level
#    All database access (from auth and joblib) flows through this
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
engine = create_async_engine("sqlite+aiosqlite:///./app.db")
my_session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app.dependency_overrides[get_session_maker] = lambda: my_session_maker

# 2. Override auth dependencies (from zndraw_auth)
# app.dependency_overrides[current_active_user] = my_get_current_user
# app.dependency_overrides[current_superuser] = my_get_superuser

# 3. Register exception handler and router
app.add_exception_handler(ProblemException, problem_exception_handler)
app.include_router(router)

# 4. Start background sweeper
async def get_session():
    async with my_session_maker() as session:
        yield session

settings = JobLibSettings()
# asyncio.create_task(run_sweeper(get_session=get_session, settings=settings))

Dependency Architecture

All database access flows through zndraw_auth.db.get_session_maker:

get_session_maker (from zndraw_auth)  <- override this one dependency
  +- SessionDep (regular endpoints)
  +- SessionMakerDep (long-polling endpoints)

Dependency            Override?            Purpose
get_session_maker     Yes                  Single source of truth for all DB sessions (from zndraw_auth)
current_active_user   Yes                  Authenticated user identity (from zndraw_auth)
current_superuser     Yes                  Superuser access control (from zndraw_auth)
verify_writable_room  Optional             Room writability guard for register_job and submit_task
get_tsio              Optional             Socket.IO server for real-time events
get_result_backend    Yes (for providers)  Result caching backend for provider reads
get_settings          Optional             Override JobLibSettings defaults

Note: SQLite locking is handled by the host application. For SQLite databases, wrap the session maker with a lock in your app's lifespan context.
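
The lock-wrapping mentioned in the note could be sketched as follows. This is a minimal illustration using a stand-in session maker, not the package's own code; where the lock lives (here, created in the lifespan) and how sessions are wrapped is up to the host app:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def locked_session(session_maker, lock: asyncio.Lock):
    """Serialize all session use behind one asyncio.Lock so SQLite's
    single-writer limitation never surfaces as "database is locked"."""
    async with lock:
        async with session_maker() as session:
            yield session

# Stand-in session maker for the demonstration:
@asynccontextmanager
async def fake_session_maker():
    yield "session"

async def main():
    lock = asyncio.Lock()  # create in the app's lifespan, one per DB file
    async with locked_session(fake_session_maker, lock) as session:
        return session

result = asyncio.run(main())
print(result)  # session
```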

Room Writability Guard

The verify_writable_room dependency guards write endpoints (register_job, submit_task). By default it only validates the room_id format. Host apps can override it to add lock checks:

from fastapi import HTTPException, Path
from zndraw_joblib import verify_writable_room, validate_room_id

# SessionDep, CurrentUserDep, RedisDep, and verify_room are host-app
# dependencies; substitute your own session, user, and room lookups.
async def get_writable_room(
    session: SessionDep,
    current_user: CurrentUserDep,
    redis: RedisDep,
    room_id: str = Path(),
) -> str:
    validate_room_id(room_id)  # format validation (@ and : checks)
    room = await verify_room(session, room_id)
    if room.locked and not current_user.is_superuser:
        raise HTTPException(status_code=423, detail="Room is locked")
    return room_id

app.dependency_overrides[verify_writable_room] = get_writable_room

Read endpoints and existing task/worker operations (updates, heartbeats, disconnects) are not affected by this guard.

Configuration

Settings via environment variables with ZNDRAW_JOBLIB_ prefix:

Variable Default Purpose
ZNDRAW_JOBLIB_ALLOWED_CATEGORIES ["modifiers", "selections", "analysis"] Valid job categories
ZNDRAW_JOBLIB_WORKER_TIMEOUT_SECONDS 60 Stale heartbeat threshold
ZNDRAW_JOBLIB_SWEEPER_INTERVAL_SECONDS 30 Sweeper cycle interval
ZNDRAW_JOBLIB_LONG_POLL_MAX_WAIT_SECONDS 60 Max long-poll wait
ZNDRAW_JOBLIB_CLAIM_MAX_ATTEMPTS 10 Retries for concurrent claim contention
ZNDRAW_JOBLIB_CLAIM_BASE_DELAY_SECONDS 0.01 Exponential backoff base delay
ZNDRAW_JOBLIB_INTERNAL_TASK_TIMEOUT_SECONDS 3600 Timeout for stuck @internal tasks
ZNDRAW_JOBLIB_ALLOWED_PROVIDER_CATEGORIES None (unrestricted) Valid provider categories
ZNDRAW_JOBLIB_PROVIDER_RESULT_TTL_SECONDS 300 Cached provider result lifetime
ZNDRAW_JOBLIB_PROVIDER_INFLIGHT_TTL_SECONDS 30 Inflight lock lifetime

Job Naming Convention

Jobs use the format: <room_id>:<category>:<name>

  • @global:modifiers:Rotate - global job available to all rooms
  • room_123:modifiers:Rotate - private job for room_123 only
  • @internal:modifiers:Rotate - server-side job executed via taskiq

Validation rules:

  • room_id cannot contain @ (reserved for @global/@internal) or : (delimiter)
  • category must be in settings.allowed_categories
  • Same job name in same room: schema must match (409 Conflict otherwise)
  • Different rooms can have same job name with different schemas
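
The naming and validation rules can be mirrored in a few lines of plain Python. The helpers below are hypothetical illustrations of the rules above, not the package's own validators (which live in zndraw_joblib and may differ in detail):

```python
# Hypothetical mirror of the naming rules; categories match the
# ZNDRAW_JOBLIB_ALLOWED_CATEGORIES default.
ALLOWED_CATEGORIES = {"modifiers", "selections", "analysis"}
RESERVED_ROOMS = {"@global", "@internal"}

def validate_room_id(room_id: str) -> None:
    """Reject @ (reserved) and : (delimiter) in ordinary room ids."""
    if room_id not in RESERVED_ROOMS and ("@" in room_id or ":" in room_id):
        raise ValueError(f"invalid room_id: {room_id!r}")

def split_full_name(full_name: str) -> tuple[str, str, str]:
    """Split '<room_id>:<category>:<name>' into its three parts."""
    room_id, category, name = full_name.split(":", 2)
    validate_room_id(room_id)
    if category not in ALLOWED_CATEGORIES:
        raise ValueError(f"invalid category: {category!r}")
    return room_id, category, name

print(split_full_name("@global:modifiers:Rotate"))
# ('@global', 'modifiers', 'Rotate')
```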

Client SDK

The JobManager is the main entry point for Python workers. It handles job registration, task claiming, provider dispatch, and background lifecycle management.

Basic Usage

from typing import ClassVar
from zndraw_joblib import JobManager, Extension, Category

# Auto-serve mode: background threads claim and execute tasks
with JobManager(api, tsio=tsio, execute=my_execute) as manager:
    @manager.register
    class Rotate(Extension):
        category: ClassVar[Category] = Category.MODIFIER
        angle: float = 0.0

        def run(self, vis, **kwargs):
            # modify vis based on self.angle
            pass

    manager.wait()  # blocks until SIGINT/SIGTERM or disconnect()
# disconnect() called automatically: threads joined, worker deleted

Extension Classes

Extensions are Pydantic models with a category ClassVar and a run() method:

from typing import ClassVar, Any
from zndraw_joblib import Extension, Category

class Rotate(Extension):
    category: ClassVar[Category] = Category.MODIFIER  # or SELECTION, ANALYSIS
    angle: float = 0.0
    axis: str = "z"

    def run(self, vis: Any, **kwargs: Any) -> None:
        # Implementation here
        pass

The JSON Schema is auto-generated from Pydantic fields and sent to the server on registration.
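
What gets sent can be previewed with a plain Pydantic v2 model; the sketch below uses a bare BaseModel rather than the package's Extension base, so treat it as an approximation of the registered schema:

```python
from typing import ClassVar
from pydantic import BaseModel

class Rotate(BaseModel):
    category: ClassVar[str] = "modifiers"  # ClassVar is excluded from the schema
    angle: float = 0.0
    axis: str = "z"

schema = Rotate.model_json_schema()
print(sorted(schema["properties"]))  # ['angle', 'axis']
```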

Auto-Serve Mode

When an execute callback is provided, JobManager runs background threads that automatically claim and execute tasks:

from zndraw_joblib import JobManager, ClaimedTask

def execute(task: ClaimedTask) -> None:
    """Called for each claimed task."""
    task.extension.run(vis)

manager = JobManager(
    api,
    tsio=tsio,
    execute=execute,
    polling_interval=2.0,      # how often to poll for tasks (seconds)
    heartbeat_interval=30.0,   # heartbeat frequency (seconds)
)

Background threads start on the first register() or register_provider() call:

  • Heartbeat thread - periodic keep-alives to prevent sweeper cleanup
  • Claim loop thread - polls for tasks, calls execute, marks completed/failed

The lifecycle is fully managed: start() is called before execute, complete() or fail() after. Exceptions in execute mark the task as failed with the error message.
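
The managed lifecycle reduces to a simple try/except pattern. The stand-in below sketches that pattern only; the real JobManager wires the same calls to REST endpoints and runs them in the claim-loop thread:

```python
class FakeManager:
    """Records lifecycle calls instead of hitting the server."""
    def __init__(self):
        self.calls = []
    def start(self, task):
        self.calls.append(("start", task))
    def complete(self, task):
        self.calls.append(("complete", task))
    def fail(self, task, error):
        self.calls.append(("fail", task, error))

def run_one(manager, task, execute):
    """start() before execute; complete() on success, fail() with the
    error message if execute raises."""
    manager.start(task)
    try:
        execute(task)
    except Exception as exc:
        manager.fail(task, str(exc))
    else:
        manager.complete(task)

def boom(task):
    raise RuntimeError("boom")

m = FakeManager()
run_one(m, "t1", lambda task: None)
run_one(m, "t2", boom)
print(m.calls)
# [('start', 't1'), ('complete', 't1'), ('start', 't2'), ('fail', 't2', 'boom')]
```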

Manual Mode

Without execute, tasks must be claimed and processed manually:

manager = JobManager(api, tsio=tsio)

@manager.register
class Rotate(Extension):
    category: ClassVar[Category] = Category.MODIFIER
    angle: float = 0.0
    def run(self, vis, **kwargs): ...

# Manual claim-execute loop
for task in manager.listen(polling_interval=2.0):
    manager.start(task)
    try:
        task.extension.run(vis)
        manager.complete(task)
    except Exception as e:
        manager.fail(task, str(e))

Task Submission

task_id = manager.submit(
    Rotate(angle=90.0),
    room="room_123",
    job_room="@global",  # room where the job is registered
)

Provider Registration

Providers handle server-dispatched read requests (see Providers):

from zndraw_joblib import Provider

class FilesystemRead(Provider):
    category: ClassVar[str] = "filesystem"
    path: str = "/"

    def read(self, handler):
        return handler.ls(self.path, detail=True)

# Binary provider (e.g. msgpack, arrow, parquet)
class AtomsProvider(Provider):
    category: ClassVar[str] = "atoms"
    content_type: ClassVar[str] = "application/x-msgpack"
    index: int = 0

    def read(self, handler) -> bytes:
        return handler.get_atoms_msgpack(self.index)

manager.register_provider(
    FilesystemRead,
    name="local",
    handler=fsspec.filesystem("file"),
    room="@global",
)

# Access handlers during job execution
print(manager.handlers)  # {"@global:filesystem:local": <LocalFileSystem>}

Lifecycle Management

# Context manager (recommended)
with JobManager(api, execute=execute) as manager:
    # ... register jobs/providers ...
    manager.wait()  # blocks until disconnect

# Manual lifecycle
manager = JobManager(api, execute=execute)
# ... register jobs/providers ...
manager.disconnect()  # idempotent, safe to call multiple times

disconnect() is idempotent and handles:

  1. Signaling background threads to stop
  2. Joining all threads (waits for in-flight tasks to finish)
  3. Emitting LeaveJobRoom/LeaveProviderRoom events
  4. Calling DELETE /workers/{id} for server-side cleanup

Signal handlers (SIGINT/SIGTERM) call disconnect() automatically.

REST Endpoints

All endpoints prefixed with /v1/joblib.

Workers

POST   /workers                         # Create worker (201)
GET    /workers                         # List workers (paginated)
PATCH  /workers/{worker_id}             # Heartbeat
DELETE /workers/{worker_id}             # Delete + cascade cleanup (204)

Jobs

PUT    /rooms/{room_id}/jobs            # Register job (idempotent, 201/200)
GET    /rooms/{room_id}/jobs            # List jobs (room + @global, paginated)
GET    /rooms/{room_id}/jobs/{job_name} # Job details
GET    /rooms/{room_id}/jobs/{job_name}/tasks  # Tasks for job (paginated)

Tasks

POST   /rooms/{room_id}/tasks/{job_name}  # Submit task (202 Accepted)
POST   /tasks/claim                        # Claim oldest pending (FIFO)
GET    /tasks/{task_id}                    # Status (supports Prefer: wait=N)
PATCH  /tasks/{task_id}                    # Update status
GET    /rooms/{room_id}/tasks              # List room tasks (paginated)

Task Lifecycle

PENDING -> CLAIMED -> RUNNING -> COMPLETED
                              -> FAILED
                   -> CANCELLED
         -> CANCELLED

Claiming uses optimistic locking with exponential backoff for concurrent safety.
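
With the defaults from the Configuration table (claim_max_attempts=10, claim_base_delay_seconds=0.01), the retry schedule might look like the sketch below. A doubling schedule is assumed here; the package's exact backoff formula and any jitter are not specified above:

```python
CLAIM_BASE_DELAY_SECONDS = 0.01
CLAIM_MAX_ATTEMPTS = 10

def backoff_delays(base=CLAIM_BASE_DELAY_SECONDS, attempts=CLAIM_MAX_ATTEMPTS):
    """Exponential backoff: double the delay after each contended claim."""
    return [base * 2**n for n in range(attempts)]

print(backoff_delays()[:4])  # [0.01, 0.02, 0.04, 0.08]
```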

Long-polling: GET /tasks/{id} with Prefer: wait=N header (max long_poll_max_wait_seconds). Returns immediately on terminal states.
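
Server-side, the Prefer: wait=N behaviour amounts to "return now if terminal, else wait for a change or the deadline". The sketch below illustrates that logic with an asyncio.Event standing in for the task-update notification; it is an assumption about the implementation, not the package's actual handler:

```python
import asyncio

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}

async def wait_for_task(get_status, changed: asyncio.Event, max_wait: float):
    """Return immediately on a terminal state; otherwise wait until a
    status-change event fires or max_wait elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        remaining = deadline - loop.time()
        if remaining <= 0:
            return status  # timed out: return current (non-terminal) status
        try:
            await asyncio.wait_for(changed.wait(), timeout=remaining)
            changed.clear()
        except asyncio.TimeoutError:
            return get_status()

# Demo: a task that completes shortly after the poll begins.
async def demo():
    state = {"status": "RUNNING"}
    changed = asyncio.Event()
    async def finish():
        await asyncio.sleep(0.01)
        state["status"] = "COMPLETED"
        changed.set()
    asyncio.create_task(finish())
    return await wait_for_task(lambda: state["status"], changed, max_wait=1.0)

status = asyncio.run(demo())
print(status)  # COMPLETED
```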

Providers

Providers are a generic abstraction for connected Python clients to serve data on demand. While jobs are user-initiated computation (workers pull tasks), providers handle server-dispatched read requests with result caching.

            Jobs                              Providers
Purpose     User-initiated computation        Remote resource access
Dispatch    Workers pull/claim (FIFO)         Server pushes to specific provider
Results     Side effects (modify room state)  Data returned to caller (cached)
Formats     JSON payloads                     JSON or binary (msgpack, arrow, etc.)
HTTP        POST (creates task)               GET (reads resource) -> 200 or 202

Content Types

Each provider declares its response format via content_type: ClassVar[str] (defaults to "application/json"). This is stored on the ProviderRecord at registration and used as the response media_type when returning cached results. Providers with content_type != "application/json" must return bytes from read().

The result upload endpoint stores raw request bytes as-is; no parsing or re-serialization is performed. The X-Request-Hash header identifies the request.

Provider Endpoints

PUT    /rooms/{room_id}/providers                        # Register (201/200)
GET    /rooms/{room_id}/providers                        # List (paginated)
GET    /rooms/{room_id}/providers/{name}/info             # Schema + metadata
GET    /rooms/{room_id}/providers/{name}?params           # Read (200 cached / 202 dispatched)
POST   /providers/{provider_id}/results                   # Upload result (204, X-Request-Hash header)
DELETE /providers/{provider_id}                            # Unregister (204)

Read Request Flow

1. Frontend: GET /rooms/room-42/providers/@global:filesystem:local?path=/data
2. Server:   check cache -> HIT: return 200
                         -> MISS: acquire inflight, emit ProviderRequest -> return 202
3. Client:   receives ProviderRequest via Socket.IO
             calls provider.read(handler)
             POST /providers/{id}/results (raw body + X-Request-Hash header)
4. Server:   store raw bytes in ResultBackend, emit ProviderResultReady
5. Frontend: receives ProviderResultReady, re-fetches -> 200 (content_type from provider)
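
The server's cache-or-dispatch decision in step 2 can be sketched with an in-memory cache and inflight set. This is a simplified, synchronous illustration; the real implementation uses the ResultBackend protocol below and a hashing scheme that is not specified here:

```python
import hashlib
import json

def request_hash(provider: str, params: dict) -> str:
    """Stable hash identifying one (provider, params) read request."""
    payload = json.dumps([provider, params], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def handle_read(cache: dict, inflight: set, provider: str, params: dict):
    """Cache hit -> 200 with body; miss -> acquire inflight and dispatch
    (202); a second miss while inflight just returns 202 (coalescing)."""
    key = request_hash(provider, params)
    if key in cache:
        return 200, cache[key]
    if key not in inflight:
        inflight.add(key)  # first miss: emit ProviderRequest here
    return 202, None

cache, inflight = {}, set()
print(handle_read(cache, inflight, "@global:filesystem:local", {"path": "/data"}))
# (202, None)
cache[request_hash("@global:filesystem:local", {"path": "/data"})] = b"result"
print(handle_read(cache, inflight, "@global:filesystem:local", {"path": "/data"}))
# (200, b'result')
```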

Result Backend

Provider reads require a ResultBackend for caching and inflight coalescing. The host app must override get_result_backend:

from zndraw_joblib.dependencies import get_result_backend

class RedisResultBackend:
    def __init__(self, redis):
        self._redis = redis

    async def store(self, key: str, data: bytes, ttl: int) -> None:
        await self._redis.set(key, data, ex=ttl)

    async def get(self, key: str) -> bytes | None:
        return await self._redis.get(key)

    async def delete(self, key: str) -> None:
        await self._redis.delete(key)

    async def acquire_inflight(self, key: str, ttl: int) -> bool:
        return await self._redis.set(key, b"1", nx=True, ex=ttl)

    async def release_inflight(self, key: str) -> None:
        await self._redis.delete(key)

app.dependency_overrides[get_result_backend] = lambda: RedisResultBackend(redis)

Internal TaskIQ Workers

For server-side jobs that should execute without an external Python client, use the @internal room with taskiq:

from zndraw_joblib import register_internal_jobs

# In your FastAPI app lifespan:
await register_internal_jobs(
    app=app,
    broker=redis_broker,
    extensions=[MyServerSideJob],
    executor=my_executor,
    session_factory=my_session_maker,
)

This registers extensions as taskiq tasks, creates @internal:category:name job rows in the database, and stores the InternalRegistry on app.state.internal_registry.

For external taskiq worker processes (no FastAPI app):

from zndraw_joblib import register_internal_tasks

registry = register_internal_tasks(
    broker=redis_broker,
    extensions=[MyServerSideJob],
    executor=my_executor,
)

Internal tasks that exceed internal_task_timeout_seconds are automatically failed by the sweeper.

Socket.IO Real-Time Events

The package emits real-time events via zndraw-socketio. The host app provides its AsyncServerWrapper through dependency injection:

from zndraw_socketio import wrap
from zndraw_joblib.dependencies import get_tsio

tsio = wrap(socketio.AsyncServer(async_mode="asgi"))
app.dependency_overrides[get_tsio] = lambda: tsio

When get_tsio returns None (default), all event emissions are skipped.

Event Models

All models are frozen Pydantic BaseModels (hashable for set-based deduplication).

Event Payload Room Target Trigger
JobsInvalidate (none) room:{room_id} Job registered/deleted, worker connected/disconnected
TaskAvailable job_name, room_id, task_id jobs:{full_name} Task submitted (non-@internal only)
TaskStatusEvent id, name, room_id, status, timestamps, worker_id, error room:{room_id} Any task status transition
ProvidersInvalidate (none) room:{room_id} Provider registered/deleted, worker disconnected
ProviderRequest request_id, provider_name, params providers:{full_name} Server dispatches read to provider
ProviderResultReady provider_name, request_hash room:{room_id} Provider result cached
JoinJobRoom job_name, worker_id (client -> server) Worker joins notification room
LeaveJobRoom job_name, worker_id (client -> server) Worker leaves notification room
JoinProviderRoom provider_name, worker_id (client -> server) Client joins provider dispatch room
LeaveProviderRoom provider_name, worker_id (client -> server) Client leaves provider dispatch room

Emission Deduplication

Internally, emissions are Emission(NamedTuple) pairs of (event, room). Functions that modify state return set[Emission], and callers emit after commit. Frozen models ensure duplicate events are deduplicated automatically.
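
The deduplication mechanism can be demonstrated with stdlib types alone; frozen dataclasses stand in for the frozen Pydantic event models here:

```python
from dataclasses import dataclass
from typing import NamedTuple

@dataclass(frozen=True)  # frozen -> hashable, like the frozen Pydantic models
class JobsInvalidate:
    pass

class Emission(NamedTuple):
    event: object
    room: str

emissions = {
    Emission(JobsInvalidate(), "room:abc"),
    Emission(JobsInvalidate(), "room:abc"),  # duplicate, collapsed by the set
    Emission(JobsInvalidate(), "room:xyz"),
}
print(len(emissions))  # 2
```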

Worker Notification Pattern

# 1. Register job via REST
client.put("/v1/joblib/rooms/@global/jobs", json={...})

# 2. Join the job's socketio room
await sio.emit(JoinJobRoom(job_name="@global:modifiers:Rotate", worker_id="..."))

# 3. Receive TaskAvailable when tasks are submitted
@sio.on(TaskAvailable)
async def on_task_available(sid: str, data: TaskAvailable):
    await worker.claim_and_run(data.job_name)

Server-Side Disconnect Cleanup

When a worker's Socket.IO connection drops, the host app can immediately clean up:

from zndraw_joblib import cleanup_worker, emit

@tsio.on("disconnect")
async def on_disconnect(sid: str, reason: str):
    session = await tsio.get_session(sid)
    worker_id = session.get("worker_id")
    if worker_id:
        async with get_session() as db:
            worker = await db.get(Worker, UUID(worker_id))
            if worker:
                emissions = await cleanup_worker(db, worker)
                await db.commit()
                await emit(tsio, emissions)

Disconnect Scenario                Handler
Network drop / process kill        Server-side SIO disconnect (immediate)
Graceful shutdown (with manager:)  Client disconnect() emits leave events + DELETE /workers
REST-only workers (no SIO)         Background sweeper heartbeat timeout

Background Sweeper

Host app starts explicitly:

from zndraw_joblib import run_sweeper

asyncio.create_task(
    run_sweeper(get_session=my_session_factory, settings=settings, tsio=tsio)
)

The sweeper runs periodically (sweeper_interval_seconds) and:

  1. Finds workers with stale last_heartbeat (beyond worker_timeout_seconds)
  2. Marks their running/claimed tasks as FAILED
  3. Removes orphan jobs (no workers, no pending tasks, not @internal)
  4. Cleans up @internal tasks stuck beyond internal_task_timeout_seconds
  5. Emits TaskStatusEvent and JobsInvalidate events after each cleanup cycle
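
Step 1's staleness test is simple datetime arithmetic, shown here with the default worker_timeout_seconds of 60:

```python
from datetime import datetime, timedelta, timezone

WORKER_TIMEOUT_SECONDS = 60  # default from the Configuration table

def is_stale(last_heartbeat: datetime, now: datetime) -> bool:
    """A worker is stale once its last heartbeat is older than
    worker_timeout_seconds."""
    return now - last_heartbeat > timedelta(seconds=WORKER_TIMEOUT_SECONDS)

now = datetime.now(timezone.utc)
print(is_stale(now - timedelta(seconds=90), now))  # True
print(is_stale(now - timedelta(seconds=30), now))  # False
```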

Error Handling (RFC 9457)

All errors use RFC 9457 Problem Details format:

Exception Status Description
JobNotFound 404 Job does not exist
SchemaConflict 409 Job schema differs from existing registration
InvalidCategory 400 Category not in allowed list
WorkerNotFound 404 Worker does not exist
TaskNotFound 404 Task does not exist
InvalidTaskTransition 409 Invalid status transition
InvalidRoomId 400 Room ID contains @ or :
Forbidden 403 Admin privileges required
InternalJobNotConfigured 503 Internal job has no executor
ProviderNotFound 404 Provider does not exist
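
A representative application/problem+json body for a SchemaConflict might look like the sketch below. The "type" URI and any extension members the package actually emits are assumptions; RFC 9457 permits "about:blank" when no more specific type is defined:

```python
import json

# Hypothetical RFC 9457 payload for a 409 SchemaConflict response.
problem = {
    "type": "about:blank",
    "title": "Schema Conflict",
    "status": 409,
    "detail": "Job schema differs from existing registration",
}
body = json.dumps(problem)
print(json.loads(body)["status"])  # 409
```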

ORM Models

Models use SQLAlchemy 2.0 ORM inheriting from zndraw_auth.Base:

  • Job - (room_id, category, name) unique, soft-deleted via deleted flag
  • Worker - Tracks last_heartbeat, linked to user via user_id
  • Task - Status state machine, linked to job and claiming worker
  • WorkerJobLink - M:N bridge between Worker and Job
  • ProviderRecord - (room_id, category, name) unique, linked to worker, stores content_type for response media type

Download files

Source distribution: zndraw_joblib-0.1.7.tar.gz (293.8 kB)
Built distribution: zndraw_joblib-0.1.7-py3-none-any.whl (40.0 kB)

Both files were uploaded via Trusted Publishing (twine/6.1.0, CPython/3.13.7); attestations were published by publish.yaml on PythonFZ/zndraw-joblib.

Hashes for zndraw_joblib-0.1.7.tar.gz
Algorithm Hash digest
SHA256 ada4087c32136bc8be7308267fcaee47952c8830db5312f34df29464d02d73e7
MD5 73068fd1634968123d3b38a64b994b09
BLAKE2b-256 f61bf0a5b0218ee4484ed6f89124afd17fbee002740b7ef79d27e29ee8c060db

Hashes for zndraw_joblib-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 9d0c8f5a1213ecd03ee4b26c3735d3f1ed2a479d5b5cf057b450dfe5c2d5d534
MD5 7b78bdb76ee15c4eec6b5136681a0b18
BLAKE2b-256 1659af6ad6674648339736e3514b708c48f2aff484aa56ccd41121fe67284fc8
