
Postgres-native background job queue — Python SDK with async/sync workers, transactional enqueue, progress tracking, and web UI


Awa

Postgres-native background job queue for Rust and Python.

Awa (Māori: river) provides durable, transactional job enqueueing with typed handlers in both Rust and Python. All queue state lives in Postgres — no Redis, no RabbitMQ. The Rust runtime handles polling, heartbeating, crash recovery, and dispatch. Python workers run on that same runtime via PyO3, getting Rust-grade reliability with Python-native ergonomics.

AWA Web UI — Jobs (dark mode)

Features

  • Postgres-only — one dependency you already have.
  • Transactional enqueue — insert jobs inside your business transaction. Commit = visible. Rollback = gone.
  • Cancel by unique key — cancel scheduled jobs by their insert-time components (kind + args) without storing job IDs.
  • Rust and Python workers — same queues, identical semantics, mixed deployments.
  • Crash recovery — heartbeat + hard deadline rescue. Stale jobs recovered automatically.
  • Web UI — dashboard, job inspector, queue management, cron controls.
  • Structured progress — handlers report percent, message, and checkpoint metadata; persisted across retries.
  • Periodic/cron jobs — leader-elected scheduler with timezone support and atomic enqueue.
  • Storage transition prep — explicit status/prepare/abort surfaces for future storage-engine upgrades without changing the current canonical runtime.
  • Webhook callbacks — park jobs for external completion with optional CEL expression filtering.
  • Sequential callbacks — wait_for_callback() suspends a handler mid-execution; resume_external() wakes it with a payload. Enables multi-step orchestration within a single handler.
  • HTTP Worker — feature-gated Worker that dispatches jobs to serverless functions (Lambda, Cloud Run) via HTTP with HMAC-blake3 callback auth.
  • LISTEN/NOTIFY wakeup — sub-10ms pickup latency.
  • Production alerting metrics — queue depth, lag, and wait-duration histogram via OpenTelemetry.
  • OpenTelemetry — 20+ built-in metrics (counters, histograms, gauges) for Prometheus/Grafana.
  • Hot/cold storage — runnable work in a hot table, deferred work in a cold table.
  • Rate limiting — per-queue token bucket.
  • Weighted concurrency — global worker pool with per-queue guarantees.
  • Operator descriptors — code-declared queue and job-kind names/descriptions with stale/drift visibility in the UI.
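The per-queue rate limiting above uses a token bucket. As a rough mental model (this is a stdlib-only sketch with illustrative names, not Awa's Rust implementation), tokens refill continuously up to a burst capacity and each dispatched job consumes one:

```python
class TokenBucket:
    """Illustrative per-queue token bucket (a sketch, not Awa's implementation)."""

    def __init__(self, rate: float, capacity: float, now: float = 0.0):
        self.rate = rate          # tokens added per second (sustained throughput)
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.updated = now

    def try_acquire(self, now: float) -> bool:
        # Refill in proportion to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(rate=2.0, capacity=5.0)
burst = sum(bucket.try_acquire(0.0) for _ in range(10))   # initial burst drains the bucket: 5
refill = sum(bucket.try_acquire(1.0) for _ in range(10))  # one second later, 2 tokens refilled: 2
```

Bursts are bounded by the capacity, and sustained dispatch by the refill rate — which is why a per-queue bucket smooths load without blocking other queues.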

AWA Web UI — Queue detail (dark mode)

Local benchmarks show ~5.6k jobs/sec sustained throughput (Rust workers, with OTel metrics enabled), ~3.1k jobs/sec (Python workers), and sub-10ms p50 pickup latency. Enqueue throughput reaches ~30k/s single-producer, ~100k/s multi-producer. See benchmarking notes for methodology and caveats.

Core concurrency invariants (no duplicate processing after rescue, stale completions rejected, shutdown drain ordering) are checked with TLA+ models covering single and multi-instance deployments.

Getting Started

The quickest way to reason about Awa is:

  • your app inserts a durable job row inside Postgres, often in the same transaction as business data
  • workers claim runnable rows, heartbeat while they are executing, and safely rescue stale work after crashes
  • the job row itself is the source of truth for state, progress, callbacks, and retry history
  • operators inspect and control that state through the CLI or the built-in UI

If you keep that model in mind, the APIs make more sense: enqueue work, run workers, then inspect the resulting row rather than guessing what happened in memory.

# 1. Install
pip install awa-pg awa-cli     # Python
# or: cargo add awa             # Rust

# 2. Start Postgres and run migrations
awa --database-url $DATABASE_URL migrate

# 3. Write a worker and start processing (see examples below)

# 4. Monitor
awa --database-url $DATABASE_URL serve   # → http://127.0.0.1:3000
awa --database-url $DATABASE_URL storage status
awa --database-url $DATABASE_URL job dump 123
awa --database-url $DATABASE_URL job dump-run 123

Language-specific guides are linked under Documentation below.

Python Example

import awa
import asyncio
from dataclasses import dataclass

@dataclass
class SendEmail:
    to: str
    subject: str

async def main():
    client = awa.AsyncClient("postgres://localhost/mydb")
    await client.migrate()

    @client.task(SendEmail, queue="email")
    async def handle_email(job):
        print(f"Sending to {job.args.to}: {job.args.subject}")

    await client.insert(
        SendEmail(to="alice@example.com", subject="Welcome"),
        queue="email",
    )

    client.start([("email", 2)])
    await asyncio.sleep(1)
    await client.shutdown()

asyncio.run(main())

Progress tracking — checkpoint and resume on retry:

@client.task(BatchImport, queue="etl")
async def handle_import(job):
    # On retry, resume from the checkpoint saved in progress metadata.
    last_id = (job.progress or {}).get("metadata", {}).get("last_id", 0)
    for item in fetch_items(after=last_id):
        process(item)
        job.set_progress(50, "halfway")            # percent + human-readable message
        job.update_metadata({"last_id": item.id})  # checkpoint for resume
    await job.flush_progress()                     # persist buffered progress updates

Transactional enqueue — atomic with your business logic:

async with await client.transaction() as tx:
    await tx.execute("INSERT INTO orders (id) VALUES ($1)", order_id)
    await tx.insert(SendEmail(to="alice@example.com", subject="Order confirmed"))

Sync API for Django/Flask — use awa.Client for sync frameworks; all methods are plain (no suffix):

client = awa.Client("postgres://localhost/mydb")
client.migrate()
job = client.insert(SendEmail(to="bob@example.com", subject="Hello"))

Sequential callbacks — suspend a handler, wait for an external system, then resume:

@client.task(ProcessPayment, queue="payments")
async def handle_payment(job):
    token = await job.register_callback(timeout_seconds=3600)
    send_to_payment_gateway(token.id, job.args.amount)
    result = await job.wait_for_callback(token)
    # result contains the payload from resume_external()
    await record_payment(job.args.order_id, result)

The external system calls await client.resume_external(callback_id, {"status": "paid"}) to wake the handler.
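The park/resume control flow can be simulated with plain asyncio futures. This is a toy illustration — `register` and `resume` below are stand-ins, not Awa APIs; in Awa the callback token and payload are persisted in Postgres, so a parked job survives worker restarts:

```python
import asyncio

# In-memory stand-in for Awa's persisted callback registry.
callbacks: dict[str, asyncio.Future] = {}

def register(callback_id: str) -> asyncio.Future:
    # Analogous to job.register_callback(): hand out a token to await on.
    fut = asyncio.get_running_loop().create_future()
    callbacks[callback_id] = fut
    return fut

def resume(callback_id: str, payload: dict) -> None:
    # Analogous to client.resume_external(): wake the parked handler.
    callbacks.pop(callback_id).set_result(payload)

async def handler():
    fut = register("cb-1")
    # Simulate the payment gateway calling back shortly afterwards.
    asyncio.get_running_loop().call_later(0.01, resume, "cb-1", {"status": "paid"})
    return await fut  # analogous to job.wait_for_callback(token)

result = asyncio.run(handler())
# result == {"status": "paid"}
```

The difference in the real system is durability: because the suspension point lives in the job row, the waiting handler is not tied to one process's memory.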

Periodic jobs — leader-elected cron scheduling with timezone support:

client.periodic(
    "daily_report", "0 9 * * *",
    GenerateReport, GenerateReport(format="pdf"),
    timezone="Pacific/Auckland",
)

6-field expressions with seconds precision are also supported: "*/15 * * * * *" fires every 15 seconds.
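To see why the timezone parameter matters, here is a stdlib-only sketch of daily wall-clock scheduling (not Awa's cron engine — the function name is illustrative): "0 9 * * *" with timezone="Pacific/Auckland" should fire at 9am local time, whose UTC offset shifts across DST transitions.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def next_daily_fire(after: datetime, hour: int, tz: str) -> datetime:
    """Next occurrence of `hour`:00 local wall-clock time in `tz`."""
    local = after.astimezone(ZoneInfo(tz))
    candidate = local.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= local:
        candidate += timedelta(days=1)  # wall-clock arithmetic: always 9am local
    return candidate

fire = next_daily_fire(datetime.now(ZoneInfo("UTC")), 9, "Pacific/Auckland")
# fire.hour == 9 regardless of whether NZST or NZDT is in effect
```

Scheduling in UTC instead would drift by an hour twice a year; anchoring to the zone keeps the report at 9am as operators expect.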

See examples/python/ for complete runnable scripts tested in CI.

Rust Example

use awa::{Client, QueueConfig, JobArgs, JobResult, JobError, JobContext, Worker};
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize, JobArgs)]
struct SendEmail {
    to: String,
    subject: String,
}

struct SendEmailWorker;

#[async_trait::async_trait]
impl Worker for SendEmailWorker {
    fn kind(&self) -> &'static str { "send_email" }

    async fn perform(&self, ctx: &JobContext) -> Result<JobResult, JobError> {
        let args: SendEmail = serde_json::from_value(ctx.job.args.clone())
            .map_err(|e| JobError::terminal(e.to_string()))?;
        send_email(&args.to, &args.subject).await
            .map_err(JobError::retryable)?;
        Ok(JobResult::Completed)
    }
}

// Insert a job (with uniqueness)
awa::insert_with(&pool, &SendEmail { to: "alice@example.com".into(), subject: "Welcome".into() },
    awa::InsertOpts { unique: Some(awa::UniqueOpts { by_args: true, ..Default::default() }), ..Default::default() },
).await?;

// Cancel by unique key (e.g., when the triggering condition is resolved)
awa::admin::cancel_by_unique_key(&pool, "send_email", None, Some(&serde_json::json!({"to": "alice@example.com", "subject": "Welcome"})), None).await?;

// Start workers with a typed lifecycle hook
let client = Client::builder(pool)
    .queue("default", QueueConfig::default())
    .register_worker(SendEmailWorker)
    .on_event::<SendEmail, _, _>(|event| async move {
        if let awa::JobEvent::Exhausted { args, error, .. } = event {
            tracing::error!(to = %args.to, error = %error, "email job exhausted retries");
        }
    })
    .build()?;
client.start().await?;

Installation

Python

pip install awa-pg       # SDK: insert, worker, admin, progress
pip install awa-cli      # CLI: migrations, queue admin, web UI

Rust

[dependencies]
awa = "0.5"

CLI

Available via pip (no Rust toolchain needed) or cargo:

pip install awa-cli
# or: cargo install awa-cli

awa --database-url $DATABASE_URL migrate
awa --database-url $DATABASE_URL serve
awa --database-url $DATABASE_URL queue stats
awa --database-url $DATABASE_URL job list --state failed
awa --database-url $DATABASE_URL job dump 123
awa --database-url $DATABASE_URL job dump-run 123

Architecture

 ┌────────────────┐  ┌────────────────┐
 │ Rust producer  │  │  Python (pip)  │
 └───────┬────────┘  └────────┬───────┘
         └────────┬───────────┘
                  ▼
       ┌────────────────────┐
       │     PostgreSQL     │
       │     jobs_hot       │
       │     scheduled_jobs │
       └─────────┬──────────┘
                 │
       ┌─────────┼─────────┐
       ▼         ▼         ▼
   ┌────────┐┌────────┐┌────────┐
   │ Worker ││ Worker ││ Worker │
   │ (Rust) ││ (PyO3) ││ (PyO3) │
   └────────┘└────────┘└────────┘

All coordination through Postgres. The Rust runtime owns polling, heartbeats, shutdown, and crash recovery for both languages. Mixed Rust and Python workers coexist on the same queues. See architecture overview for full details.
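The crash-recovery rule the runtime applies can be sketched as a predicate over heartbeat age and the hard deadline (names and the timeout value here are illustrative, not Awa's configuration):

```python
from datetime import datetime, timedelta, timezone

def is_rescuable(last_heartbeat: datetime, hard_deadline: datetime,
                 now: datetime,
                 heartbeat_timeout: timedelta = timedelta(seconds=30)) -> bool:
    # A running job becomes rescuable when its heartbeat goes stale
    # (worker likely crashed), or unconditionally once the hard
    # deadline passes even if heartbeats are still arriving.
    return (now - last_heartbeat) > heartbeat_timeout or now >= hard_deadline

now = datetime.now(timezone.utc)
stale = is_rescuable(now - timedelta(minutes=5), now + timedelta(hours=1), now)  # True
healthy = is_rescuable(now, now + timedelta(hours=1), now)                       # False
```

Because the heartbeat timestamp lives on the job row itself, any instance can perform the rescue — no external coordinator is needed.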

Workspace

Crate        Purpose
awa          Main crate — re-exports awa-model + awa-worker
awa-model    Types, queries, migrations, admin ops
awa-macros   #[derive(JobArgs)] proc macro
awa-worker   Runtime: dispatch, heartbeat, maintenance
awa-ui       Web UI (axum API + embedded React frontend)
awa-cli      CLI binary (migrations, admin, serve)
awa-python   PyO3 extension module (pip install awa-pg)
awa-testing  Test helpers (TestClient)

Documentation

Doc                        Description
Rust getting started       From cargo add to a job reaching completed
Python getting started     From pip install to a job reaching completed
Deployment guide           Docker, Kubernetes, pool sizing, graceful shutdown
Migration guide            Fresh installs, upgrades, extracted SQL, rollback strategy
Configuration reference    QueueConfig, ClientBuilder, Python start(), env vars
Security & Postgres roles  Minimum-privilege roles, callback auth, operational guidance
Troubleshooting            Stuck running jobs, leader delays, heartbeat timeouts
Architecture overview      System design, data flow, state machine, crash recovery
Web UI design              API endpoints, pages, component library
Benchmarking notes         Methodology, headline numbers, how to run
Validation test plan       Full test matrix with 100+ test cases
TLA+ correctness models    Formal verification of core invariants
Grafana dashboards         Pre-built Prometheus dashboards for monitoring
Architecture Decision Records (ADRs)

License

MIT OR Apache-2.0

Download files

Download the file for your platform.

Source Distributions

No source distribution files are available for this release.

Built Distributions


awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.3 MB)

Uploaded: CPython 3.10+ · manylinux (glibc 2.17+) · x86-64

awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (6.4 MB)

Uploaded: CPython 3.10+ · manylinux (glibc 2.17+) · ARM64

awa_pg-0.5.5a0-cp310-abi3-macosx_11_0_arm64.whl (5.7 MB)

Uploaded: CPython 3.10+ · macOS 11.0+ · ARM64

awa_pg-0.5.5a0-cp310-abi3-macosx_10_12_x86_64.whl (5.9 MB)

Uploaded: CPython 3.10+ · macOS 10.12+ · x86-64

File details

Details for the file awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm    Hash digest
SHA256       b5fc697ed2c5d6cf309bf1e77533b40a71705c72a25c2a77ff3fb8215130d6fe
MD5          b90c64190288f7b8fc9abee0f6b169e0
BLAKE2b-256  e3bee4c834e9558947835e57ef5e7ca27950bf0ecb858e8b961e2379da671dc1


Provenance

The following attestation bundles were made for awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm    Hash digest
SHA256       2e705f920740fe5c2d704663a3352e6ec1c5e3f4d889881fb90583c63dd1d6b1
MD5          c0b82957f694b6dc6484d88195388e8c
BLAKE2b-256  6b53f0a872128395b4117edc7727f3b185653419f6fb81cb50a17adca9485bfa


Provenance

The following attestation bundles were made for awa_pg-0.5.5a0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awa_pg-0.5.5a0-cp310-abi3-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for awa_pg-0.5.5a0-cp310-abi3-macosx_11_0_arm64.whl
Algorithm    Hash digest
SHA256       70cb47aa2b11152b7cf633958f4185ab5845cdc1f913cb7887cee513eae2745b
MD5          e8d408d0556c3d556607efb4924a30e4
BLAKE2b-256  a95067cf2aa5a28dcdcb2c084b080a7f1d391f97bcb7978db9bc170339ded5f2


Provenance

The following attestation bundles were made for awa_pg-0.5.5a0-cp310-abi3-macosx_11_0_arm64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awa_pg-0.5.5a0-cp310-abi3-macosx_10_12_x86_64.whl.

File metadata

File hashes

Hashes for awa_pg-0.5.5a0-cp310-abi3-macosx_10_12_x86_64.whl
Algorithm    Hash digest
SHA256       f3d33cd1c82dbee34af58968be8934b0d4a209fb05abab93b1bcfb465404ed11
MD5          5b205e66f6591aa6fbd265bdf8d65b96
BLAKE2b-256  db2f0c7c3ffe49bb351ee03726aa2a3bf7813c91307380ddf1f18b87b54d195f


Provenance

The following attestation bundles were made for awa_pg-0.5.5a0-cp310-abi3-macosx_10_12_x86_64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
