Skip to main content

Postgres-native background job queue — Python SDK with async/sync workers, transactional enqueue, progress tracking, and web UI

Project description

Awa

Postgres-native background job queue for Rust and Python.

Awa (Māori: river) provides durable, transactional job enqueueing with typed handlers in both Rust and Python. All queue state lives in Postgres — no Redis, no RabbitMQ. The Rust runtime handles polling, heartbeating, crash recovery, and dispatch. Python workers run on that same runtime via PyO3, getting Rust-grade reliability with Python-native ergonomics.

AWA Web UI — Dashboard (dark mode)

Features

  • Postgres-only — one dependency you already have.
  • Transactional enqueue — insert jobs inside your business transaction. Commit = visible. Rollback = gone. Works with Awa's own transactions, your existing psycopg3/asyncpg/SQLAlchemy/Django connection (Python), or tokio-postgres (Rust).
  • Cancel by unique key — cancel scheduled jobs by their insert-time components (kind + args) without storing job IDs.
  • Rust and Python workers — same queues, identical semantics, mixed deployments.
  • Crash recovery — heartbeat + hard deadline rescue. Stale jobs recovered automatically.
  • Web UI — dashboard, job inspector, queue management, cron controls.
  • Structured progress — handlers report percent, message, and checkpoint metadata; persisted across retries.
  • Periodic/cron jobs — leader-elected scheduler with timezone support and atomic enqueue.
  • Webhook callbacks — park jobs for external completion with optional CEL expression filtering.
  • LISTEN/NOTIFY wakeup — sub-10ms pickup latency.
  • OpenTelemetry — 20 built-in metrics (counters, histograms, gauges) for Prometheus/Grafana.
  • Hot/cold storage — runnable work in a hot table, deferred work in a cold table.
  • Rate limiting — per-queue token bucket. Weighted concurrency — global worker pool with per-queue guarantees.

Recent engineering benchmarks range from ~5k Python hot-path jobs/sec on a laptop to ~40k immediately-available Rust inserts/sec on dedicated Postgres hardware, with 4-producer enqueue contention runs above ~100k chunked INSERT/sec. See benchmarking notes for methodology, caveats, and the latest COPY-vs-INSERT results.

Core concurrency invariants (no duplicate processing after rescue, stale completions rejected, shutdown drain ordering) are checked with TLA+ models covering single and multi-instance deployments.

Getting Started

# 1. Install
pip install awa-pg awa-cli     # Python
# or: cargo add awa             # Rust

# 2. Start Postgres and run migrations
awa --database-url $DATABASE_URL migrate

# 3. Write a worker and start processing (see examples below)

# 4. Monitor
awa --database-url $DATABASE_URL serve   # → http://127.0.0.1:3000

Language-specific guides:

Python Example

import awa
import asyncio
from dataclasses import dataclass

@dataclass
class SendEmail:
    to: str
    subject: str

async def main():
    client = awa.AsyncClient("postgres://localhost/mydb")
    await client.migrate()

    @client.worker(SendEmail, queue="email")
    async def handle_email(job):
        print(f"Sending to {job.args.to}: {job.args.subject}")

    await client.insert(
        SendEmail(to="alice@example.com", subject="Welcome"),
        queue="email",
    )

    client.start([("email", 2)])
    await asyncio.sleep(1)
    await client.shutdown()

asyncio.run(main())

Progress tracking — checkpoint and resume on retry:

@client.worker(BatchImport, queue="etl")
async def handle_import(job):
    last_id = (job.progress or {}).get("metadata", {}).get("last_id", 0)
    for item in fetch_items(after=last_id):
        process(item)
        job.set_progress(50, "halfway")
        job.update_metadata({"last_id": item.id})
    await job.flush_progress()

Transactional enqueue — atomic with your business logic:

async with await client.transaction() as tx:
    await tx.execute("INSERT INTO orders (id) VALUES ($1)", order_id)
    await tx.insert(SendEmail(to="alice@example.com", subject="Order confirmed"))

Sync API for Django/Flask — use awa.Client for sync frameworks; all methods are plain (no suffix):

client = awa.Client("postgres://localhost/mydb")
client.migrate()
job = client.insert(SendEmail(to="bob@example.com", subject="Hello"))

ORM transaction bridging — insert jobs within your existing psycopg3, asyncpg, SQLAlchemy, or Django transaction. No separate Awa connection needed:

from awa.bridge import insert_job_sync

# Django
with transaction.atomic():
    Order.objects.create(...)
    insert_job_sync(connection, SendEmail(to="alice@example.com", subject="Order confirmed"))

# SQLAlchemy
with Session(engine) as session, session.begin():
    session.execute(...)
    insert_job_sync(session, SendEmail(to="alice@example.com", subject="Order confirmed"))

See examples/python/ for complete runnable scripts tested in CI.

Rust Example

use awa::{Client, QueueConfig, JobArgs, JobResult, JobError, JobContext, Worker};
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize, JobArgs)]
struct SendEmail {
    to: String,
    subject: String,
}

struct SendEmailWorker;

#[async_trait::async_trait]
impl Worker for SendEmailWorker {
    fn kind(&self) -> &'static str { "send_email" }

    async fn perform(&self, ctx: &JobContext) -> Result<JobResult, JobError> {
        let args: SendEmail = serde_json::from_value(ctx.job.args.clone())
            .map_err(|e| JobError::terminal(e.to_string()))?;
        send_email(&args.to, &args.subject).await
            .map_err(JobError::retryable)?;
        Ok(JobResult::Completed)
    }
}

// Insert a job (with uniqueness)
awa::insert_with(&pool, &SendEmail { to: "alice@example.com".into(), subject: "Welcome".into() },
    awa::InsertOpts { unique: Some(awa::UniqueOpts { by_args: true, ..Default::default() }), ..Default::default() },
).await?;

// Cancel by unique key (e.g., when the triggering condition is resolved)
awa::admin::cancel_by_unique_key(&pool, "send_email", None, Some(&serde_json::json!({"to": "alice@example.com", "subject": "Welcome"})), None).await?;

// Start workers with a typed lifecycle hook
let client = Client::builder(pool)
    .queue("default", QueueConfig::default())
    .register_worker(SendEmailWorker)
    .on_event::<SendEmail, _, _>(|event| async move {
        if let awa::JobEvent::Exhausted { args, error, .. } = event {
            tracing::error!(to = %args.to, error = %error, "email job exhausted retries");
        }
    })
    .build()?;
client.start().await?;

tokio-postgres bridge — insert jobs within existing tokio-postgres transactions (see bridge adapters):

use awa::bridge::tokio_pg; // requires features = ["tokio-postgres"]

// pg_client is a &mut tokio_postgres::Client (not the Awa Client above)
let txn = pg_client.transaction().await?;
txn.execute("INSERT INTO orders ...", &[&order_id]).await?;
tokio_pg::insert_job(&txn, &SendEmail { to: "alice@example.com".into(), subject: "Confirmed".into() }).await?;
txn.commit().await?;

Installation

Python

pip install awa-pg       # SDK: insert, worker, admin, progress
pip install awa-cli      # CLI: migrations, queue admin, web UI

Rust

[dependencies]
awa = "0.4"

CLI

Available via pip (no Rust toolchain needed) or cargo:

pip install awa-cli
# or: cargo install awa-cli

awa --database-url $DATABASE_URL migrate
awa --database-url $DATABASE_URL serve
awa --database-url $DATABASE_URL queue stats
awa --database-url $DATABASE_URL job list --state failed

Architecture

 ┌────────────────┐  ┌────────────────┐
 │ Rust producer  │  │  Python (pip)  │
 └───────┬────────┘  └────────┬───────┘
         └────────┬───────────┘
                  ▼
       ┌────────────────────┐
       │     PostgreSQL     │
       │     jobs_hot       │
       │     scheduled_jobs │
       └─────────┬──────────┘
                 │
       ┌─────────┼─────────┐
       ▼         ▼         ▼
   ┌────────┐┌────────┐┌────────┐
   │ Worker ││ Worker ││ Worker │
   │ (Rust) ││ (PyO3) ││ (PyO3) │
   └────────┘└────────┘└────────┘

All coordination through Postgres. The Rust runtime owns polling, heartbeats, shutdown, and crash recovery for both languages. Mixed Rust and Python workers coexist on the same queues. See architecture overview for full details.

Workspace

Crate Purpose
awa Main crate — re-exports awa-model + awa-worker
awa-model Types, queries, migrations, admin ops
awa-macros #[derive(JobArgs)] proc macro
awa-worker Runtime: dispatch, heartbeat, maintenance
awa-ui Web UI (axum API + embedded React frontend)
awa-cli CLI binary (migrations, admin, serve)
awa-python PyO3 extension module (pip install awa-pg)
awa-testing Test helpers (TestClient)

Documentation

Doc Description
Rust getting started From cargo add to a job reaching completed
Python getting started From pip install to a job reaching completed
Deployment guide Docker, Kubernetes, pool sizing, graceful shutdown
Migration guide Fresh installs, upgrades, extracted SQL, rollback strategy
Bridge adapters tokio-postgres, psycopg3, asyncpg, SQLAlchemy, Django bridging
Configuration reference QueueConfig, ClientBuilder, Python start(), env vars
Troubleshooting Stuck running jobs, leader delays, heartbeat timeouts
Architecture overview System design, data flow, state machine, crash recovery
Web UI design API endpoints, pages, component library
Benchmarking notes Methodology, headline numbers, how to run
Validation test plan Full test matrix with 100+ test cases
TLA+ correctness models Formal verification of core invariants
Grafana dashboards Pre-built Prometheus dashboards for monitoring
Architecture Decision Records (ADRs)

License

MIT OR Apache-2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

awa_pg-0.4.1-cp312-cp312-macosx_11_0_arm64.whl (5.5 MB view details)

Uploaded CPython 3.12macOS 11.0+ ARM64

awa_pg-0.4.1-cp312-cp312-macosx_10_12_x86_64.whl (5.7 MB view details)

Uploaded CPython 3.12macOS 10.12+ x86-64

awa_pg-0.4.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.1 MB view details)

Uploaded CPython 3.8manylinux: glibc 2.17+ x86-64

awa_pg-0.4.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (6.1 MB view details)

Uploaded CPython 3.8manylinux: glibc 2.17+ ARM64

File details

Details for the file awa_pg-0.4.1-cp312-cp312-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for awa_pg-0.4.1-cp312-cp312-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 6642ed1c1ed8f09d64f726d04f3a035a23eb9619d8ad03afd3a4bd45759fa2a9
MD5 112f00c0f78ca99015abba4527a0eb66
BLAKE2b-256 e71403edf2febeaa5f1e51568125b0804a952fe7da25543d52298d8f04b8e28e

See more details on using hashes here.

Provenance

The following attestation bundles were made for awa_pg-0.4.1-cp312-cp312-macosx_11_0_arm64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awa_pg-0.4.1-cp312-cp312-macosx_10_12_x86_64.whl.

File metadata

File hashes

Hashes for awa_pg-0.4.1-cp312-cp312-macosx_10_12_x86_64.whl
Algorithm Hash digest
SHA256 9d4e1b4f446f424e7fe171c562220975e13d7bfc25c45e55c5d0c8e4e0af8805
MD5 c896a3531706519d0683bebc68396c28
BLAKE2b-256 b68eb5802e83f62eda368c18cb1a4e7f8ba63d68097d96042c778fbd442c17d8

See more details on using hashes here.

Provenance

The following attestation bundles were made for awa_pg-0.4.1-cp312-cp312-macosx_10_12_x86_64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awa_pg-0.4.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for awa_pg-0.4.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 c0191968918624b712c716bede72391f732107bc2ae4535cfe1a169d860143a4
MD5 3cbbda51a5313ac6f5f68b362eb50157
BLAKE2b-256 09a4ab02205c70a26d7350ae059a32f7f87f9c2f5b5271aa4f275ed00ec0a840

See more details on using hashes here.

Provenance

The following attestation bundles were made for awa_pg-0.4.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file awa_pg-0.4.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.

File metadata

File hashes

Hashes for awa_pg-0.4.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Algorithm Hash digest
SHA256 5e0bcbfdf6ebf79468e4ce948b8a7e872c117527f097bf3539235922b683b346
MD5 6dd19a35410477d83f4f23fbe38e724d
BLAKE2b-256 04c25b5d7eeef608a919f7471e6a3bfe07b762c4c14c9df987b1520d09fee193

See more details on using hashes here.

Provenance

The following attestation bundles were made for awa_pg-0.4.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl:

Publisher: release.yml on hardbyte/awa

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page