Interloper DBOS Integration

This package provides DBOS integration for Interloper, enabling durable workflow execution with automatic recovery from failures.

Features

  • Durable Execution: Assets are executed as DBOS steps within durable workflows
  • Automatic Recovery: Failed workflows can be resumed from the last successful step
  • Sequential Execution: Assets execute in dependency order with DBOS checkpointing
  • Workflow Management: Support for custom workflow IDs and automatic ID generation

Installation

pip install interloper-dbos

Prerequisites

  • PostgreSQL database for DBOS state storage
  • Python 3.10+

Quick Start

import interloper as il
import pandas as pd
from interloper_dbos import DBOSConfig, DBOSRunner

# Define your pipeline
@il.asset(io=il.FileIO("data/"))
def my_asset(context: il.Context) -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

# Create DAG
dag = il.DAG(my_asset)

# Configure DBOS
dbos_config = DBOSConfig(
    host="localhost",
    database="interloper_dbos",
    user="postgres",
    password="your_password"
)

# Create runner
runner = DBOSRunner()

# Execute with durability
result = runner.run(dag)

Configuration

DBOSConfig

The DBOSConfig class handles database connection and DBOS settings:

from interloper_dbos import DBOSConfig

config = DBOSConfig(
    host="localhost",           # Database host
    port=5432,                 # Database port (default: 5432)
    database="interloper_dbos", # Database name
    user="postgres",           # Database user
    password="your_password",   # Database password
    max_connections=10,        # Max DB connections (default: 10)
    connection_timeout=30,     # Connection timeout in seconds (default: 30)
    step_timeout=3600,        # Step timeout in seconds (default: 3600)
    max_retries=3,            # Max step retries (default: 3)
)
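In practice, connection settings are often read from the environment rather than hard-coded. The following is a minimal sketch of that idea, not part of the package: the environment variable names and the helper `dbos_config_kwargs` are hypothetical, and only the keyword names (`host`, `port`, `database`, `user`, `password`) come from the example above.

```python
import os

# Hypothetical environment variable names (an assumption, not defined by
# interloper-dbos). Each maps to a DBOSConfig keyword and a type converter.
_ENV_TO_FIELD = {
    "INTERLOPER_DBOS_HOST": ("host", str),
    "INTERLOPER_DBOS_PORT": ("port", int),
    "INTERLOPER_DBOS_DATABASE": ("database", str),
    "INTERLOPER_DBOS_USER": ("user", str),
    "INTERLOPER_DBOS_PASSWORD": ("password", str),
}


def dbos_config_kwargs(env=None):
    """Collect DBOSConfig keyword arguments from environment variables.

    Variables that are not set are omitted, so the corresponding
    DBOSConfig defaults still apply.
    """
    env = os.environ if env is None else env
    kwargs = {}
    for var, (field, cast) in _ENV_TO_FIELD.items():
        if var in env:
            kwargs[field] = cast(env[var])
    return kwargs


# Usage (requires interloper-dbos to be installed):
# from interloper_dbos import DBOSConfig
# config = DBOSConfig(**dbos_config_kwargs())
```

Keeping the password out of source code this way also makes it easier to use different databases for development and production.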

DBOSRunner

The DBOSRunner extends Interloper's base runner with DBOS durability:

from interloper_dbos import DBOSRunner

runner = DBOSRunner(
    concurrency=10  # The concurrency of the DBOS queue
)

Workflow Recovery

DBOS enables automatic recovery from failures. You can resume workflows using the same workflow ID:

# First execution
result = runner.run(dag, workflow_id="my_pipeline_20250101")

# If it fails, retry with the same workflow ID to resume
result = runner.run(dag, workflow_id="my_pipeline_20250101")
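The retry pattern above can be wrapped in a small helper. This is a sketch under one assumption: that a failed run surfaces as a Python exception raised by `runner.run`. The helper `run_with_resume` is hypothetical and not part of interloper-dbos; it only guarantees that every attempt reuses the same workflow ID.

```python
from typing import Any, Callable


def run_with_resume(
    run: Callable[[str], Any],
    workflow_id: str,
    max_attempts: int = 3,
) -> Any:
    """Call run(workflow_id) until it succeeds, reusing the same ID each time."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return run(workflow_id)
        except Exception as exc:  # assumption: failures raise exceptions
            last_error = exc
            print(f"attempt {attempt}/{max_attempts} for {workflow_id!r} failed: {exc}")
    raise RuntimeError(
        f"workflow {workflow_id!r} failed after {max_attempts} attempts"
    ) from last_error


# Usage with the runner and DAG from the Quick Start:
# result = run_with_resume(
#     lambda wid: runner.run(dag, workflow_id=wid),
#     "my_pipeline_20250101",
# )
```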

Automatic Workflow ID Generation

If you don't specify a workflow ID, one will be generated automatically:

  • For partitioned assets: {prefix}_{partition}_{timestamp}
  • For partition windows: {prefix}_{start}_{end}_{timestamp}
  • For non-partitioned assets: {prefix}_{timestamp}_{uuid}
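To illustrate the three formats, here is a minimal sketch of how such IDs could be assembled. It is not the package's implementation: the function `make_workflow_id`, the timestamp format, and the shortened UUID are all assumptions made for the example.

```python
import datetime as dt
import uuid


def make_workflow_id(prefix, *, partition=None, window=None, now=None):
    """Build a workflow ID following the three documented patterns.

    The timestamp format (YYYYMMDDHHMMSS, UTC) and the 8-character UUID
    suffix are assumptions; the real package may format them differently.
    """
    now = now or dt.datetime.now(dt.timezone.utc)
    timestamp = now.strftime("%Y%m%d%H%M%S")
    if partition is not None:
        # Partitioned asset: {prefix}_{partition}_{timestamp}
        return f"{prefix}_{partition}_{timestamp}"
    if window is not None:
        # Partition window: {prefix}_{start}_{end}_{timestamp}
        start, end = window
        return f"{prefix}_{start}_{end}_{timestamp}"
    # Non-partitioned: {prefix}_{timestamp}_{uuid}
    return f"{prefix}_{timestamp}_{uuid.uuid4().hex[:8]}"
```

Because generated IDs include a timestamp, a second run() call without an explicit ID gets a different ID. To resume a failed workflow, pass the original workflow ID explicitly.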

Database Setup

You need a PostgreSQL database for DBOS to store workflow state:

-- Create database
CREATE DATABASE interloper_dbos;

-- Create user (optional)
CREATE USER interloper_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE interloper_dbos TO interloper_user;

Examples

See the examples/ directory for complete examples:

  • simple_pipeline.py: Basic pipeline with recovery demonstration
  • More examples coming soon...

How It Works

  1. Workflow Creation: Each run() call creates a DBOS workflow
  2. Asset Steps: Each asset becomes a DBOS step within the workflow
  3. Sequential Execution: Assets execute in topological order
  4. Checkpointing: DBOS automatically checkpoints each completed step
  5. Recovery: Failed workflows resume from the last successful step
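The steps above can be sketched in plain Python. This is a simplified simulation, not the package's code and not the DBOS API: an in-memory dict stands in for the PostgreSQL-backed step records, and the function `run_with_checkpoints` is hypothetical.

```python
from graphlib import TopologicalSorter


def run_with_checkpoints(assets, deps, checkpoints):
    """Run assets in topological order, skipping already-checkpointed ones.

    assets:      maps asset name -> callable taking a dict of upstream results
    deps:        maps asset name -> set of upstream asset names
    checkpoints: dict standing in for durable step records; it is mutated
                 in place so a later call can resume where this one stopped
    """
    graph = {name: deps.get(name, set()) for name in assets}
    for name in TopologicalSorter(graph).static_order():
        if name in checkpoints:
            continue  # step completed in an earlier attempt
        upstream = {dep: checkpoints[dep] for dep in deps.get(name, ())}
        # If the asset raises, nothing is recorded for it, so the next
        # attempt starts again from this step.
        checkpoints[name] = assets[name](upstream)
    return checkpoints
```

Calling the function a second time with the same `checkpoints` dict mirrors rerunning a workflow with the same workflow ID: completed assets are skipped and execution continues from the first asset without a recorded result.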

Comparison with Other Runners

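| Feature       | SerialRunner | MultiThreadRunner | DBOSRunner   |
|---------------|--------------|-------------------|--------------|
| Parallelism   | No           | Yes               | No (planned) |
| Durability    | No           | No                | Yes          |
| Recovery      | No           | No                | Yes          |
| State Storage | None         | None              | PostgreSQL   |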

Limitations

  • Currently supports sequential execution only (parallel execution planned)
  • Requires PostgreSQL database setup
  • Workflow state accumulates in the database, so consider periodically cleaning up records of old workflows

Contributing

Contributions are welcome! Please see the main Interloper repository for contribution guidelines.

License

This package is part of the Interloper project. See the main repository for license information.
