Skip to main content

Embedded-first job queue for Python, powered by Rust

Project description

Forge

Embedded job queue for Python. No Redis. No daemon. No broker.

Forge is a persistent, single-process job queue that embeds directly in your Python application. Jobs are stored durably via an append-only log (AOF) with SHA-256 checksums — same proven storage architecture as Kron (scheduler) and Flint (rate limiter).

import forge

queue = forge.Queue(data_dir=".forge")

queue.push("send_email", payload={"to": "user@example.com"}, priority=1)
queue.push("resize_image", payload={"path": "/tmp/x.png"}, delay="10s")

@queue.worker("send_email")
def handle_email(payload, context):
    send(payload["to"])

queue.run()

Features (v0.1)

  • Persistent queue — survives restarts via AOF + snapshot + compaction
  • Priority scheduling — higher number = higher priority
  • Delayed jobs — run after N seconds
  • Retry with exponential backoff — configurable max attempts
  • Dead letter queue — jobs that exceed max attempts are quarantined
  • Job history — full lifecycle per job (queued → claimed → succeeded/failed → retrying → dead)
  • Crash recovery — claimed-but-incomplete jobs are returned to the queue on restart
  • Exclusive data directory locking — single-writer semantics
  • Integrity verified — SHA-256 checksums on every AOF record and snapshot
  • Zero external dependencies — Rust core, no Redis, no system daemon

Installation

pip install forge

Or build from source:

pip install maturin
maturin develop

Usage

Basic

import forge

queue = forge.Queue(data_dir=".forge")

# Push a job with priority (higher = more important)
queue.push("send_email", payload={"to": "user@example.com"}, priority=1)

# Push a delayed job (runs after delay)
queue.push("resize_image", payload={"path": "/tmp/x.png"}, delay="10s")

Workers

@queue.worker("send_email")
def handle(payload, context):
    print(f"Sending email to {payload['to']}")
    # context.job_id, context.attempt available

Run

Blocking:

queue.run()  # processes jobs forever

Non-blocking (background thread):

queue.start()
# ... do other work ...
queue.stop()

CLI

forge --data-dir .forge queue list
forge --data-dir .forge job status <job_id>
forge --data-dir .forge job history <job_id>
forge --data-dir .forge dead list
forge --data-dir .forge dead retry <job_id>
forge --data-dir .forge doctor
forge --data-dir .forge compact

Storage

.forge/
├── forge.aof        # Append-only log (AOF)
├── forge.snapshot   # Compressed state snapshot
└── forge.lock       # Exclusive data directory lock

AOF events

Event Description
JOB_PUSHED A new job was enqueued
JOB_CLAIMED A worker claimed the job
JOB_SUCCEEDED Job completed successfully
JOB_FAILED Job failed (with error)
JOB_RETRYING Job scheduled for retry
JOB_DEAD Job moved to dead letter queue
JOB_REQUEUED_AFTER_CRASH Job re-queued during crash recovery

Forge vs Celery

Feature Forge Celery
Infrastructure None — embeds in your process Requires Redis/RabbitMQ + worker daemon
Installation pip install forge pip install celery + run Redis + run celeryd
Persistence Built-in AOF with SHA-256 checksums Delegates to broker (Redis with AOF, RabbitMQ)
Dead letter queue Built-in, per-job Requires broker-specific DLQ config
Job history Built-in per job_id lifecycle Requires external monitoring (Flower)
Crash recovery Automatic on restart Depends on broker config
Priority Built-in (numeric priority) Supported but broker-dependent
Delayed jobs Built-in (delay="10s") Via countdown/ETA, needs Redis
Retry Built-in exponential backoff Supported, requires config
Configuration Zero — single data_dir param Multiple settings, broker URL, result backend
Dependencies Zero external runtime deps Redis or RabbitMQ, celeryd process
Data integrity SHA-256 checksums on every record Depends on broker guarantees
Deployment Single python app.py app.py + celeryd + Redis + monitoring

Forge is ideal for applications that want a durable job queue without operating a distributed infrastructure. If you need multi-node workers, task routing, or massive throughput, use Celery. If you want reliability without complexity, use Forge.

Development

cargo build
cargo test

The project uses the same architectural patterns as Kron and Flint:

  • AOF — Append-only log with NDJSON format
  • Snapshot/compaction — Atomic .tmp → fsync → rename → fsync dir
  • Checksums — SHA-256 with constant-time comparison
  • Locking — BSD flock via fs2
  • PyO3 — GIL released during blocking Rust calls

License

BSD-3-Clause

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

forge_queue-0.1.1-cp312-cp312-manylinux_2_34_x86_64.whl (5.2 MB view details)

Uploaded CPython 3.12manylinux: glibc 2.34+ x86-64

File details

Details for the file forge_queue-0.1.1-cp312-cp312-manylinux_2_34_x86_64.whl.

File metadata

File hashes

Hashes for forge_queue-0.1.1-cp312-cp312-manylinux_2_34_x86_64.whl
Algorithm Hash digest
SHA256 8b29c2e61c601394308892bdcbc2169c28c1d90763e410cb1fb734513d2f242f
MD5 82403be1f73f506bc2756b7c8ef9b241
BLAKE2b-256 84790d0ada44f3aae575965fccf06cdda45a963e535f085ee291909821812c16

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page