Python SDK for Absurd - PostgreSQL-based durable task execution

Absurd SDK for Python

Python SDK for Absurd: a PostgreSQL-based durable task execution system.

Absurd is the simplest durable execution workflow system you can think of. It is based entirely on Postgres and nothing else. It is almost as easy to use as a queue, but it also handles scheduling and retries, and it needs no services other than Postgres.

What is Durable Execution?

Durable execution (or durable workflows) is a way to run long-lived, reliable functions that can survive crashes, restarts, and network failures without losing state or duplicating work. Instead of running your logic in memory, a durable execution system decomposes a task into smaller pieces (step functions) and records every step and decision.
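
To make the idea concrete, here is a toy sketch of step checkpointing. It does not use the SDK: a plain dict stands in for the Postgres-backed store, and run_step, charge, and task are hypothetical names chosen for illustration. It shows only the core property, namely that a completed step is replayed from its recorded result when the task is retried.

```python
# Toy illustration only: a dict stands in for the durable checkpoint store.
checkpoints = {}
calls = []


def run_step(name, fn):
    """Run fn once per step name; later calls replay the stored result."""
    if name in checkpoints:
        return checkpoints[name]
    result = fn()
    checkpoints[name] = result
    return result


def charge():
    calls.append("charge")  # record the side effect so we can count it
    return {"payment_id": "pay-1"}


def task(fail_after_charge):
    payment = run_step("charge", charge)
    if fail_after_charge:
        raise RuntimeError("simulated crash")
    return payment


# First attempt crashes after the step has been recorded.
try:
    task(fail_after_charge=True)
except RuntimeError:
    pass

# The retry reuses the recorded result instead of charging again.
result = task(fail_after_charge=False)
```

After both attempts, charge has run exactly once even though task ran twice.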

Installation

uv add absurd-sdk

Synchronous API

If you omit the connection argument, the client falls back, in order, to the ABSURD_DATABASE_URL environment variable, then PGDATABASE, then postgresql://localhost/absurd.

from absurd_sdk import Absurd

app = Absurd("postgresql://localhost/absurd")

@app.register_task(name="order-fulfillment")
def process_order(params, ctx):
    step = ctx.run_step

    @step("process-payment")
    def payment():
        return {
            "payment_id": f"pay-{params['order_id']}",
            "amount": params["amount"],
        }

    @step("reserve-inventory")
    def inventory():
        return {"reserved_items": params["items"]}

    shipment = ctx.await_event(f"shipment.packed:{params['order_id']}")

    @step("send-notification")
    def notification():
        return {
            "sent_to": params["email"],
            "tracking_number": shipment["tracking_number"],
        }

    return {
        "order_id": params["order_id"],
        "payment": payment,
        "inventory": inventory,
        "tracking_number": shipment["tracking_number"],
        "notification": notification,
    }

app.start_worker()
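
The connection fallback order described at the top of this section can be sketched as follows. resolve_database_url is a hypothetical helper, not part of the SDK, and the sketch passes the value of PGDATABASE through unchanged because this page does not say how the client interprets it.

```python
import os

DEFAULT_URL = "postgresql://localhost/absurd"


def resolve_database_url(explicit=None, env=None):
    """Hypothetical helper mirroring the documented lookup order."""
    env = os.environ if env is None else env
    return (
        explicit
        or env.get("ABSURD_DATABASE_URL")
        or env.get("PGDATABASE")  # assumption: value used as-is
        or DEFAULT_URL
    )


from_arg = resolve_database_url("postgresql://db.internal/jobs", env={})
from_env = resolve_database_url(
    env={"ABSURD_DATABASE_URL": "postgresql://env/absurd", "PGDATABASE": "other"}
)
fallback = resolve_database_url(env={})
```

Passing env explicitly keeps the helper easy to test without touching the real process environment.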

Asynchronous API

from absurd_sdk import AsyncAbsurd

app = AsyncAbsurd("postgresql://localhost/absurd")

@app.register_task(name="order-fulfillment")
async def process_order(params, ctx):
    async def process_payment():
        return {
            "payment_id": f"pay-{params['order_id']}",
            "amount": params["amount"],
        }

    payment = await ctx.step("process-payment", process_payment)

    async def reserve_inventory():
        return {"reserved_items": params["items"]}

    inventory = await ctx.step("reserve-inventory", reserve_inventory)

    shipment = await ctx.await_event(f"shipment.packed:{params['order_id']}")

    async def send_notification():
        return {
            "sent_to": params["email"],
            "tracking_number": shipment["tracking_number"],
        }

    notification = await ctx.step("send-notification", send_notification)

    return {
        "order_id": params["order_id"],
        "payment": payment,
        "inventory": inventory,
        "tracking_number": shipment["tracking_number"],
        "notification": notification,
    }

# Run this from inside a coroutine; a plain script cannot use await at top level.
await app.start_worker()

For async tasks there is no decorator shortcut yet, but the pattern is the same: define a zero-argument async def helper and pass it to await ctx.step("step-name", helper).
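
When the helper needs arguments, one option is to bind them ahead of time so that the callable passed to the step is still zero-argument. The sketch below uses functools.partial for that. ToyAsyncContext is a hypothetical in-memory stand-in for the real task context, so the example runs without a database; whether the SDK accepts partial objects is an assumption, and a nested async def closure as shown above is the documented route.

```python
import asyncio
from functools import partial


class ToyAsyncContext:
    """Hypothetical stand-in for the task context; caches results in memory."""

    def __init__(self):
        self._results = {}

    async def step(self, name, fn):
        # Run the zero-argument helper only if this step has no stored result.
        if name not in self._results:
            self._results[name] = await fn()
        return self._results[name]


async def fetch_user(user_id):
    return {"id": user_id, "name": f"user-{user_id}"}


async def my_task(params, ctx):
    # partial() binds the argument, leaving a zero-argument callable.
    helper = partial(fetch_user, params["user_id"])
    return await ctx.step("fetch-user", helper)


result = asyncio.run(my_task({"user_id": 7}, ToyAsyncContext()))
```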

Using @step in synchronous tasks

Because a Python lambda is limited to a single expression, a convenient pattern for sync code is to alias ctx.run_step as step and then use it as a decorator, @step(...):

@app.register_task(name="my-task")
def my_task(params, ctx):
    step = ctx.run_step

    # Define and run a step in one go
    @step()
    def fetch_data():
        return {"result": 42}

    # fetch_data is now the return value ({"result": 42}), not a function
    print(fetch_data)  # {"result": 42}

    @step("transform-data")
    def transformed():
        return {"value": fetch_data["result"] * 2}

    return transformed

The decorator is only implemented for synchronous tasks. In asynchronous tasks use async def helpers with await ctx.step(...).
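
The reason the decorated name ends up bound to a value rather than a function can be shown with a toy decorator. This is not the SDK's implementation: run_step here is a hypothetical stand-in with no persistence, and the rule that an unnamed step takes the function's name is an assumption.

```python
executed = []


def run_step(name=None):
    """Toy run-immediately decorator (not the SDK's implementation)."""

    def decorator(fn):
        # Assumption: an unnamed step falls back to the function name.
        step_name = name or fn.__name__
        executed.append(step_name)
        # Returning fn() binds the decorated name to the step's result.
        return fn()

    return decorator


@run_step()
def fetch_data():
    return {"result": 42}


@run_step("transform-data")
def transformed():
    return {"value": fetch_data["result"] * 2}
```

Because the decorator returns fn() instead of fn, each step runs at the point where it is defined, top to bottom, like ordinary statements.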

Decomposed Steps

When you need to split step handling into two phases (for instance around an external loop), use begin_step() / complete_step():

@app.register_task(name="agent-turn")
def agent_turn(params, ctx):
    handle = ctx.begin_step("persist-turn")
    if handle.done:
        persisted = handle.state
    else:
        payload = {"turn": params["turn"]}
        persisted = ctx.complete_step(handle, payload)

    return {"persisted": persisted}

The async API provides the same methods as await ctx.begin_step(...) and await ctx.complete_step(...).
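
As a rough illustration of the external-loop case, the sketch below runs a loop twice against the same store. ToyContext and StepHandle are hypothetical in-memory stand-ins (only the done and state attributes come from the example above; the name field and the turn-N step names are assumptions). Turns that were already persisted are read back from the handle, and only new turns do work.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class StepHandle:
    name: str  # hypothetical field used by the toy store
    done: bool
    state: Any = None


class ToyContext:
    """Hypothetical in-memory stand-in for the task context."""

    def __init__(self, store):
        self.store = store

    def begin_step(self, name):
        if name in self.store:
            return StepHandle(name, done=True, state=self.store[name])
        return StepHandle(name, done=False)

    def complete_step(self, handle, value):
        self.store[handle.name] = value
        return value


def agent_loop(ctx, turns, log):
    results = []
    for i, turn in enumerate(turns):
        handle = ctx.begin_step(f"turn-{i}")
        if handle.done:
            results.append(handle.state)  # replay the persisted value
            continue
        log.append(turn)  # work happens only for turns not yet persisted
        results.append(ctx.complete_step(handle, {"turn": turn}))
    return results


store, log = {}, []
first = agent_loop(ToyContext(store), ["a", "b"], log)
second = agent_loop(ToyContext(store), ["a", "b", "c"], log)
```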

Spawning from a Separate Process

If a task is not registered in the current process, spawn() requires an explicit queue=... for safety. In that case, task-level defaults from register_task(...) are unavailable; spawn options (or client defaults) are used.
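
The rule above can be modelled with a small toy client. Everything here is hypothetical: ToyClient is not the SDK's client, max_attempts is used only as an example of a task-level default, and the precedence shown (spawn option, then task default, then client default) is an assumption.

```python
class ToyClient:
    """Hypothetical stand-in; not the SDK's client."""

    def __init__(self, default_max_attempts=5):
        self.default_max_attempts = default_max_attempts
        self.registry = {}

    def register_task(self, name, queue, max_attempts=None):
        self.registry[name] = {"queue": queue, "max_attempts": max_attempts}

    def spawn(self, name, params, queue=None, max_attempts=None):
        registered = self.registry.get(name)
        if registered is None:
            # Unknown task: the caller must say which queue to use.
            if queue is None:
                raise ValueError(f"task {name!r} is not registered; pass queue=...")
            task_default = None
        else:
            queue = queue or registered["queue"]
            task_default = registered["max_attempts"]
        attempts = max_attempts or task_default or self.default_max_attempts
        return {"task": name, "queue": queue, "params": params, "max_attempts": attempts}


# A process that registered the task can rely on its defaults.
worker = ToyClient()
worker.register_task("send-email", queue="mail", max_attempts=3)
local = worker.spawn("send-email", {"to": "a@example.com"})

# A separate producer process must name the queue and gets client defaults.
producer = ToyClient()
remote = producer.spawn("send-email", {"to": "a@example.com"}, queue="mail")
```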

Task Result Snapshots

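You can inspect or await a task's result state with fetch_task_result(...) and await_task_result(...). Both return a TaskResultSnapshot dataclass; as the check in the example shows, fetch_task_result(...) can also return None: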

snapshot = app.fetch_task_result(task_id)
if snapshot is not None:
    print(snapshot.state, snapshot.result, snapshot.failure)

final = app.await_task_result(task_id, timeout=30)
if final.state == "completed":
    print(final.result)

From inside a task handler, TaskContext / AsyncTaskContext also provide await_task_result(...) so parent tasks can durably wait for child tasks.
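
Conceptually, waiting for a result amounts to fetching snapshots until one reaches a final state or a deadline passes. The sketch below shows that loop with stand-ins: Snapshot, wait_for_result, the "running" state, and every terminal state other than "completed" are assumptions, and how the SDK actually waits or reports a timeout is not described here.

```python
import time
from dataclasses import dataclass
from typing import Any

# Assumption: only "completed" appears in the text above.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


@dataclass
class Snapshot:
    """Hypothetical stand-in with the three fields printed above."""

    state: str
    result: Any = None
    failure: Any = None


def wait_for_result(fetch, timeout, interval=0.01):
    """Poll fetch() until a terminal snapshot appears or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        snapshot = fetch()
        if snapshot is not None and snapshot.state in TERMINAL_STATES:
            return snapshot
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(interval)


# Simulated sequence: no row yet, then running, then completed.
states = iter([None, Snapshot("running"), Snapshot("completed", result={"ok": True})])
final = wait_for_result(lambda: next(states), timeout=1.0)
```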
