Skip to main content

Python SDK for Absurd - PostgreSQL-based durable task execution

Project description

Absurd SDK for Python

Python SDK for Absurd: a PostgreSQL-based durable task execution system.

Absurd is the simplest durable execution workflow system you can think of. It's entirely based on Postgres and nothing else. It's almost as easy to use as a queue, but it handles scheduling and retries, and it does all of that without needing any other services to run in addition to Postgres.

What is Durable Execution?

Durable execution (or durable workflows) is a way to run long-lived, reliable functions that can survive crashes, restarts, and network failures without losing state or duplicating work. Instead of running your logic in memory, a durable execution system decomposes a task into smaller pieces (step functions) and records every step and decision.

Installation

uv add absurd-sdk

Synchronous API

If you omit the connection argument, the client uses ABSURD_DATABASE_URL, then PGDATABASE, then postgresql://localhost/absurd.

from absurd_sdk import Absurd

app = Absurd("postgresql://localhost/absurd")

@app.register_task(name="order-fulfillment")
def process_order(params, ctx):
    step = ctx.run_step

    @step("process-payment")
    def payment():
        return {
            "payment_id": f"pay-{params['order_id']}",
            "amount": params["amount"],
        }

    @step("reserve-inventory")
    def inventory():
        return {"reserved_items": params["items"]}

    shipment = ctx.await_event(f"shipment.packed:{params['order_id']}")

    @step("send-notification")
    def notification():
        return {
            "sent_to": params["email"],
            "tracking_number": shipment["tracking_number"],
        }

    return {
        "order_id": params["order_id"],
        "payment": payment,
        "inventory": inventory,
        "tracking_number": shipment["tracking_number"],
        "notification": notification,
    }

app.start_worker()

Asynchronous API

from absurd_sdk import AsyncAbsurd

app = AsyncAbsurd("postgresql://localhost/absurd")

@app.register_task(name="order-fulfillment")
async def process_order(params, ctx):
    async def process_payment():
        return {
            "payment_id": f"pay-{params['order_id']}",
            "amount": params["amount"],
        }

    payment = await ctx.step("process-payment", process_payment)

    async def reserve_inventory():
        return {"reserved_items": params["items"]}

    inventory = await ctx.step("reserve-inventory", reserve_inventory)

    shipment = await ctx.await_event(f"shipment.packed:{params['order_id']}")

    async def send_notification():
        return {
            "sent_to": params["email"],
            "tracking_number": shipment["tracking_number"],
        }

    notification = await ctx.step("send-notification", send_notification)

    return {
        "order_id": params["order_id"],
        "payment": payment,
        "inventory": inventory,
        "tracking_number": shipment["tracking_number"],
        "notification": notification,
    }

await app.start_worker()

For async tasks there is no decorator shortcut yet, but the pattern is the same: define a zero-argument async def helper and pass it to await ctx.step("step-name", helper).

Using @step in synchronous tasks

Because Python lambda is limited to a single expression, a nice pattern for sync code is to alias ctx.run_step as step and then use @step(...):

@app.register_task(name="my-task")
def my_task(params, ctx):
    step = ctx.run_step

    # Define and run a step in one go
    @step()
    def fetch_data():
        return {"result": 42}

    # fetch_data is now the return value ({"result": 42}), not a function
    print(fetch_data)  # {"result": 42}

    @step("transform-data")
    def transformed():
        return {"value": fetch_data["result"] * 2}

    return transformed

The decorator is only implemented for synchronous tasks. In asynchronous tasks use async def helpers with await ctx.step(...).

Decomposed Steps

When you need to split step handling into two phases (for instance around an external loop), use begin_step() / complete_step():

@app.register_task(name="agent-turn")
def agent_turn(params, ctx):
    handle = ctx.begin_step("persist-turn")
    if handle.done:
        persisted = handle.state
    else:
        payload = {"turn": params["turn"]}
        persisted = ctx.complete_step(handle, payload)

    return {"persisted": persisted}

The async API provides the same methods as await ctx.begin_step(...) and await ctx.complete_step(...).

Task Result Snapshots

You can inspect or await a task's result state. Both methods return a TaskResultSnapshot dataclass:

snapshot = app.fetch_task_result(task_id)
if snapshot is not None:
    print(snapshot.state, snapshot.result, snapshot.failure)

final = app.await_task_result(task_id, timeout=30)
if final.state == "completed":
    print(final.result)

From inside a task handler, TaskContext / AsyncTaskContext also provide await_task_result(...) so parent tasks can durably wait for child tasks.

License and Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

absurd_sdk-0.3.0.tar.gz (12.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

absurd_sdk-0.3.0-py3-none-any.whl (13.5 kB view details)

Uploaded Python 3

File details

Details for the file absurd_sdk-0.3.0.tar.gz.

File metadata

  • Download URL: absurd_sdk-0.3.0.tar.gz
  • Upload date:
  • Size: 12.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for absurd_sdk-0.3.0.tar.gz
Algorithm Hash digest
SHA256 4fa1c794e483f5b8893e86ac0a68ff005b0d059a14c4c93743f9ed45625ee206
MD5 3b12e2a4f5004255ce2892aad7e3d60b
BLAKE2b-256 42d74de5915ffa3f644cbf8353c4bdddc52ed57c57bc0cb4e6ff9430b7a2e2f3

See more details on using hashes here.

Provenance

The following attestation bundles were made for absurd_sdk-0.3.0.tar.gz:

Publisher: build.yml on earendil-works/absurd

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file absurd_sdk-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: absurd_sdk-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for absurd_sdk-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0aacedfd89ea27d96a77666ca42030e6c8cedc321c05265f607569f7a5bd1f9d
MD5 acae08ed305af1a5349115efe8fe78e3
BLAKE2b-256 728af1230ece749e844ad455f18771146544936b1fc405eeecf9d4cdb8f5c698

See more details on using hashes here.

Provenance

The following attestation bundles were made for absurd_sdk-0.3.0-py3-none-any.whl:

Publisher: build.yml on earendil-works/absurd

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page