Postgres-native background job queue — Python SDK with async/sync workers, transactional enqueue, and progress tracking. Install awa-pg[ui] to add the bundled web dashboard.

Awa

Postgres-native job queue for Rust and Python.

Awa (Māori: river) fills the gap between Postgres event queues that are too narrow for real job-queue behavior and language-specific job frameworks (River, Oban, Sidekiq) that couple you to one ecosystem. If you run Rust or Python (or both) on Postgres and want priorities, cron, DLQ, and transactional enqueue without Redis or RabbitMQ, Awa is built for you.

[Screenshot: AWA Web UI, Jobs view (dark mode)]

Features

Core queue

  • Transactional enqueue — insert jobs inside your business transaction. Commit = visible. Rollback = gone.
  • Unique jobs — declare uniqueness by kind/queue/args; cancel by unique key without storing job IDs.
  • Priorities, retries, snoozes — exponential backoff with jitter; priority aging for fairness.
  • Dead Letter Queue — first-class DLQ with per-queue opt-in, retention, and operator retry/purge.
  • Periodic/cron jobs — leader-elected scheduler with timezone support and atomic enqueue.
  • Sequential callbacks — wait_for_callback() / resume_external() for multi-step orchestration within a single handler.
  • Webhook callbacks — park jobs for external completion with optional CEL-expression filtering.
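
The retry bullet above mentions exponential backoff with jitter. As a rough illustration of the general technique, a delay calculation might look like the sketch below. This is not Awa's actual formula: the helper name backoff_delay, the base delay, the cap, and the "full jitter" strategy are all assumptions made for the example.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 300.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a retry delay in seconds for a 1-based attempt number.

    Hypothetical helper, not part of Awa. "Full jitter" draws a uniform
    value between 0 and the capped exponential ceiling, so that many jobs
    failing at the same moment do not all retry at the same moment.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    source = rng if rng is not None else random
    # Ceiling doubles per attempt: base, 2*base, 4*base, ... up to cap.
    ceiling = min(cap, base * 2 ** (attempt - 1))
    return source.uniform(0.0, ceiling)
```

Passing a seeded random.Random makes the delays reproducible in tests; in production code the default module-level generator is usually enough.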

Runtime

  • Rust and Python workers — same queues, same storage engine, mixed deployments.
  • Crash recovery — heartbeat + hard deadline rescue. Stale jobs recovered automatically.
  • Runtime-owned maintenance — dispatch, rescue, segment rotation, and pruning run in the worker fleet; no pg_cron ticker required.
  • Segmented queue storage — append-only ready and terminal entries with rotating lease segments; queue history and execution churn stay off the dispatch path.
  • LISTEN/NOTIFY wakeup — millisecond-scale pickup latency.
  • HTTP Worker — feature-gated worker that dispatches jobs to serverless functions (Lambda, Cloud Run) via HTTP with HMAC-BLAKE3 callback auth.
  • Weighted concurrency + rate limiting — global worker pool with per-queue guarantees; per-queue token bucket.
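
For readers unfamiliar with the per-queue token bucket mentioned above, here is a minimal, generic sketch of the idea. It is not Awa's implementation: the TokenBucket class, its parameters, and the injectable clock are assumptions for illustration only.

```python
import time


class TokenBucket:
    """Illustrative token bucket: refills `rate` tokens per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock          # injectable so tests can control time
        self.tokens = capacity      # start full, allowing an initial burst
        self.updated = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Take `cost` tokens if available; return False instead of blocking."""
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

A dispatcher built on this idea would call try_acquire() before handing a job to a worker and leave the job in the queue when it returns False, which bounds the sustained rate at `rate` jobs per second while still permitting bursts of up to `capacity`.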

Operations

  • Web UI — dashboard, job inspector, queue management, cron controls, DLQ retry/purge.
  • Structured progress — handlers report percent, message, and checkpoint metadata; persisted across retries.
  • OpenTelemetry metrics — 20+ built-in counters, histograms, and gauges for Prometheus/Grafana. Python workers enable export with awa.init_telemetry(endpoint, service); Rust workers install their own provider.
  • Operator descriptors — code-declared queue and job-kind names/descriptions with stale/drift visibility in the UI.
  • Postgres-only — one dependency you already have; no Redis, no RabbitMQ, no separate scheduler.

[Screenshot: AWA Web UI, Queue detail (dark mode)]

Correctness

Core concurrency invariants — no duplicate processing after rescue, stale completions rejected, no claim/rotate/prune deadlock, DLQ round-trip safety, prune-segment emptiness, heartbeat-driven short-job rescue — are checked by TLA+ models covering the segmented storage engine, the lock-ordering protocol, and the single/multi-instance worker runtime. The storage model has a trace-replay harness that verifies concrete runtime-test event sequences against the spec.

Delivery Contract

  • Transactional enqueue is a core Postgres-native feature: enqueue inside the same transaction as application data, and the job commits or rolls back with that data.
  • At-least-once delivery is the contract. Awa rejects stale completions and rescues stuck work, but it does not promise “exactly once”.
  • Idempotency is recommended for handlers, because retries and recovery are part of the honest failure model.
  • No lost work under failure takes priority over clever fast paths. If a design weakens crash/restart safety, it loses even if the benchmark looks better.
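
Because delivery is at-least-once, a handler may run more than once for the same job. One common way to make a handler idempotent is to record an idempotency key under a unique constraint before performing the side effect. The sketch below shows the pattern using the standard-library sqlite3 module purely as a stand-in for Postgres; the table, the key format, and the send_once helper are all hypothetical and not part of Awa.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """In-memory database standing in for the application's Postgres."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE sent_emails (idempotency_key TEXT PRIMARY KEY, recipient TEXT)"
    )
    return db


def send_once(db: sqlite3.Connection, key: str, recipient: str, deliver) -> bool:
    """Run `deliver` at most once per committed key; return True if it ran."""
    with db:  # commits on success, rolls back if an exception escapes
        cur = db.execute(
            "INSERT OR IGNORE INTO sent_emails (idempotency_key, recipient) "
            "VALUES (?, ?)",
            (key, recipient),
        )
        if cur.rowcount == 0:
            return False  # an earlier attempt already committed this key
        # If deliver() raises, the marker row is rolled back and a retry
        # will attempt the delivery again.
        deliver(recipient)
        return True
```

In Postgres the equivalent statement would be INSERT ... ON CONFLICT DO NOTHING. The external side effect itself is still not transactional: if delivery succeeds but the commit fails, a retry will deliver again, which is consistent with the at-least-once contract described above.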

Benchmarks

Local queue-storage soak, 5k-job runtime run: 9.5k jobs/s, 22 ms p95 pickup, 417 exact final dead tuples. Enqueue: ~30k/s single-producer, ~100k/s multi-producer.

A phase-driven portable benchmark harness comparing Awa against pgque, procrastinate, pg-boss, river, oban, and pgmq on a shared Postgres instance lives in its own repository: hardbyte/postgresql-job-queue-benchmarking. It records producer, subscriber, and end-to-end delivery latency alongside throughput, queue depth, and dead tuples over time.

Methodology and caveats live in benchmarking notes. Validation artifacts: ADR-019 (queue storage) and ADR-023 (receipt-plane ring partitioning).

Where Awa Fits

Awa is for teams that already trust Postgres and want a real job queue, not just a stream or a framework tied to one host language.

  • Choose Awa when you want priorities, unique jobs, retries, cron, callbacks, DLQ, and operator tooling on one Postgres-backed runtime.
  • Choose PgQue-style systems when you want an event queue with independent consumer cursors and event-log semantics first.
  • Choose River or Oban Pro when you want a job framework tightly shaped around one surrounding language ecosystem.

See docs/positioning.md for the category map and messaging guidance.

Getting Started

# 1. Install
pip install 'awa-pg[ui]'       # Python SDK + dashboard binary
# pip install awa-pg           # SDK only (no dashboard, smaller wheel)
# or: cargo add awa            # Rust

# 2. Start Postgres and run migrations
awa --database-url $DATABASE_URL migrate

# 3. Write a worker and start processing (see examples below)

# 4. Monitor
awa --database-url $DATABASE_URL serve   # → http://127.0.0.1:3000
awa --database-url $DATABASE_URL storage status
awa --database-url $DATABASE_URL job dump 123
awa --database-url $DATABASE_URL job dump-run 123

The Awa mental model: your app inserts durable queue entries inside Postgres, often in the same transaction as business data; workers claim runnable entries through short-lived execution leases and rescue stale work after crashes; long-running attempts touch attempt_state only when they need mutable data like progress or callback state; operators inspect live, terminal, and DLQ state through the CLI or the built-in UI.
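
The mental model above can be made concrete with a toy, in-memory sketch of claim, heartbeat, rescue, and stale-completion rejection. It is only an illustration under stated assumptions: ToyQueue, Entry, stale_after, and the use of an attempt counter as a fencing token are hypothetical names chosen for the example, and Awa's real storage lives in Postgres rather than in a Python dict.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class Entry:
    state: str = "ready"      # ready -> running -> completed
    attempt: int = 0          # incremented on every claim
    heartbeat_at: float = 0.0


@dataclass
class ToyQueue:
    stale_after: float = 30.0
    entries: Dict[int, Entry] = field(default_factory=dict)

    def enqueue(self, job_id: int) -> None:
        self.entries[job_id] = Entry()

    def claim(self, now: float) -> Optional[Tuple[int, int]]:
        """Lease the first ready entry; return (job_id, attempt) or None."""
        for job_id, entry in self.entries.items():
            if entry.state == "ready":
                entry.state = "running"
                entry.attempt += 1
                entry.heartbeat_at = now
                return job_id, entry.attempt
        return None

    def heartbeat(self, job_id: int, attempt: int, now: float) -> bool:
        """Extend the lease, but only for the attempt that currently holds it."""
        entry = self.entries[job_id]
        if entry.state != "running" or entry.attempt != attempt:
            return False
        entry.heartbeat_at = now
        return True

    def rescue(self, now: float) -> int:
        """Return running entries with stale heartbeats to the ready state."""
        rescued = 0
        for entry in self.entries.values():
            if entry.state == "running" and now - entry.heartbeat_at > self.stale_after:
                entry.state = "ready"
                rescued += 1
        return rescued

    def complete(self, job_id: int, attempt: int) -> bool:
        """Accept a completion only from the attempt that holds the lease."""
        entry = self.entries[job_id]
        if entry.state != "running" or entry.attempt != attempt:
            return False  # stale completion from a rescued attempt
        entry.state = "completed"
        return True
```

In this sketch a worker that stalls past stale_after loses its lease; when it later reports completion with its old attempt number, the call returns False, while the worker that re-claimed the job can complete it normally.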

Already running 0.5? Read the 0.5 → 0.6 upgrade guide before you bump — 0.6 introduces a staged storage transition (canonical → prepared → mixed_transition → active) with a refused-by-default gate that expects the operator to roll out queue-storage-capable workers first.

Python Example

import awa
import asyncio
from dataclasses import dataclass

@dataclass
class SendEmail:
    to: str
    subject: str

async def main():
    client = awa.AsyncClient("postgres://localhost/mydb")
    await client.migrate()

    @client.task(SendEmail, queue="email")
    async def handle_email(job):
        print(f"Sending to {job.args.to}: {job.args.subject}")

    await client.insert(
        SendEmail(to="alice@example.com", subject="Welcome"),
        queue="email",
    )

    client.start([("email", 2)])
    await asyncio.sleep(1)
    await client.shutdown()

asyncio.run(main())

Progress tracking — checkpoint and resume on retry:

@client.task(BatchImport, queue="etl")
async def handle_import(job):
    last_id = (job.progress or {}).get("metadata", {}).get("last_id", 0)
    for item in fetch_items(after=last_id):
        process(item)
        job.set_progress(50, "halfway")
        job.update_metadata({"last_id": item.id})
    await job.flush_progress()

Transactional enqueue — atomic with your business logic:

async with await client.transaction() as tx:
    await tx.execute("INSERT INTO orders (id) VALUES ($1)", order_id)
    await tx.insert(SendEmail(to="alice@example.com", subject="Order confirmed"))

Sync API for Django/Flask — use awa.Client for sync frameworks; all methods are plain (no suffix):

client = awa.Client("postgres://localhost/mydb")
client.migrate()
job = client.insert(SendEmail(to="bob@example.com", subject="Hello"))

Sequential callbacks — suspend a handler, wait for an external system, then resume:

@client.task(ProcessPayment, queue="payments")
async def handle_payment(job):
    token = await job.register_callback(timeout_seconds=3600)
    send_to_payment_gateway(token.id, job.args.amount)
    result = await job.wait_for_callback(token)
    # result contains the payload from resume_external()
    await record_payment(job.args.order_id, result)

The external system calls await client.resume_external(callback_id, {"status": "paid"}) to wake the handler.

Periodic jobs — leader-elected cron scheduling with timezone support:

client.periodic(
    "daily_report", "0 9 * * *",
    GenerateReport, GenerateReport(format="pdf"),
    timezone="Pacific/Auckland",
)

6-field expressions with seconds precision are also supported: "*/15 * * * * *" fires every 15 seconds.

See examples/python/ for complete runnable scripts tested in CI.

Rust Example

use awa::{Client, QueueConfig, JobArgs, JobResult, JobError, JobContext, Worker};
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize, JobArgs)]
struct SendEmail {
    to: String,
    subject: String,
}

struct SendEmailWorker;

#[async_trait::async_trait]
impl Worker for SendEmailWorker {
    fn kind(&self) -> &'static str { "send_email" }

    async fn perform(&self, ctx: &JobContext) -> Result<JobResult, JobError> {
        let args: SendEmail = serde_json::from_value(ctx.job.args.clone())
            .map_err(|e| JobError::terminal(e.to_string()))?;
        send_email(&args.to, &args.subject).await
            .map_err(JobError::retryable)?;
        Ok(JobResult::Completed)
    }
}

// Insert a job (with uniqueness)
awa::insert_with(&pool, &SendEmail { to: "alice@example.com".into(), subject: "Welcome".into() },
    awa::InsertOpts { unique: Some(awa::UniqueOpts { by_args: true, ..Default::default() }), ..Default::default() },
).await?;

// Cancel by unique key (e.g., when the triggering condition is resolved)
awa::admin::cancel_by_unique_key(&pool, "send_email", None, Some(&serde_json::json!({"to": "alice@example.com", "subject": "Welcome"})), None).await?;

// Start workers with a typed lifecycle hook
let client = Client::builder(pool)
    .queue("default", QueueConfig::default())
    .register_worker(SendEmailWorker)
    .on_event::<SendEmail, _, _>(|event| async move {
        if let awa::JobEvent::Exhausted { args, error, .. } = event {
            tracing::error!(to = %args.to, error = %error, "email job exhausted retries");
        }
    })
    .build()?;
client.start().await?;

Cancellation is cooperative for running handlers:

  • Rust handlers can poll ctx.is_cancelled().
  • Python handlers can poll job.is_cancelled().
  • Shutdown and runtime rescue paths flip that flag.
  • Admin cancel (awa::admin::cancel, client.cancel) updates job state in storage and signals the matching in-flight handler, when that exact running attempt is still alive on a worker process.
  • If a handler ignores the signal or returns too late, stale completion/retry results remain no-ops because the job is already cancelled in storage.
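
As a sketch of what polling looks like in a Python handler, the example below checks job.is_cancelled() between units of work and stops early. Only is_cancelled() comes from the text above; FakeJob, handle_export, and the choice to simply return the partial result are assumptions for illustration, and what a real handler should return or raise on cancellation is not specified here.

```python
import asyncio


class FakeJob:
    """Stand-in for the real job object; reports cancelled after N checks."""

    def __init__(self, cancel_after_checks: int):
        self._checks_left = cancel_after_checks

    def is_cancelled(self) -> bool:
        self._checks_left -= 1
        return self._checks_left < 0


async def handle_export(job, items):
    """Process items one at a time, checking for cancellation before each."""
    done = []
    for item in items:
        if job.is_cancelled():
            break  # stop promptly; completed items stay done
        await asyncio.sleep(0)  # placeholder for real async work
        done.append(item)
    return done


processed = asyncio.run(
    handle_export(FakeJob(cancel_after_checks=2), ["a", "b", "c", "d"])
)
```

Checking between small units of work keeps the window between a cancel request and the handler actually stopping short, which matters because the signal is cooperative and nothing interrupts the handler on its behalf.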

Installation

Python

pip install awa-pg          # SDK: insert, worker, admin, progress
pip install 'awa-pg[ui]'    # SDK + bundled `awa` binary for the dashboard
# or, just the CLI:
pip install awa-cli         # CLI on its own: migrations, queue admin, web UI

pip install awa-pg stays small for workers and producers. The [ui] extra pulls in awa-cli, which ships the awa binary plus the embedded React dashboard; afterwards python -m awa serve (or awa serve directly) launches it.

Rust

[dependencies]
awa = "0.6"

CLI

Available via pip (no Rust toolchain needed) or cargo:

pip install awa-cli
# or: cargo install awa-cli

awa --database-url $DATABASE_URL migrate
awa --database-url $DATABASE_URL serve
awa --database-url $DATABASE_URL queue stats
awa --database-url $DATABASE_URL job list --state failed
awa --database-url $DATABASE_URL job dump 123
awa --database-url $DATABASE_URL job dump-run 123

Architecture

 ┌────────────────┐  ┌────────────────┐
 │ Rust producer  │  │  Python (pip)  │
 └───────┬────────┘  └────────┬───────┘
         └────────┬───────────┘
                  ▼
       ┌──────────────────────────────┐
       │          PostgreSQL          │
       │ ready / deferred entries     │
       │ active leases / attempt_state│
       │ terminal / dlq entries       │
       └──────────────┬───────────────┘
                 │
       ┌─────────┼─────────┐
       ▼         ▼         ▼
   ┌────────┐┌────────┐┌────────┐
   │ Worker ││ Worker ││ Worker │
   │ (Rust) ││ (PyO3) ││ (PyO3) │
   └────────┘└────────┘└────────┘

All coordination goes through Postgres. The Rust runtime owns dispatch, leases, heartbeats, rescue, rotation, pruning, and shutdown for both languages. Mixed Rust and Python workers coexist on the same queues. See the architecture overview for full details.

Workspace

Crate        Purpose
awa          Main crate: re-exports awa-model + awa-worker
awa-model    Types, queries, migrations, admin ops
awa-macros   #[derive(JobArgs)] proc macro
awa-worker   Runtime: dispatch, heartbeat, maintenance
awa-ui       Web UI (axum API + embedded React frontend)
awa-cli      CLI binary (migrations, admin, serve)
awa-python   PyO3 extension module (pip install awa-pg)
awa-testing  Test helpers (TestClient)

Documentation

Doc                        Description
Rust getting started       From cargo add to a job reaching completed
Python getting started     From pip install to a job reaching completed
Deployment guide           Docker, Kubernetes, pool sizing, graceful shutdown
Migration guide            Fresh installs, upgrades, extracted SQL, rollback strategy
0.5 → 0.6 upgrade          Step-by-step operator checklist for the staged storage transition
Configuration reference    QueueConfig, ClientBuilder, Python start(), env vars
Security & Postgres roles  Minimum-privilege roles, callback auth, operational guidance
Troubleshooting            Stuck running jobs, leader delays, heartbeat timeouts
Architecture overview      System design, data flow, state machine, crash recovery
Web UI design              API endpoints, pages, component library
Benchmarking notes         Methodology, headline numbers, how to run
Validation test plan       Full test matrix with 100+ test cases
TLA+ correctness models    Formal verification of core invariants
Grafana dashboards         Pre-built Prometheus dashboards for monitoring

Architecture Decision Records (ADRs)

See docs/adr/README.md for the index with status and supersession.

License

MIT OR Apache-2.0
