Interloper DBOS Integration

This package provides DBOS integration for Interloper, enabling durable workflow execution with automatic recovery from failures.

Features

  • Durable Execution: Assets are executed as DBOS steps within durable workflows
  • Automatic Recovery: Failed workflows can be resumed from the last successful step
  • Sequential Execution: Assets execute in dependency order with DBOS checkpointing
  • Workflow Management: Support for custom workflow IDs and automatic ID generation

Installation

pip install interloper-dbos

Prerequisites

  • PostgreSQL database for DBOS state storage
  • Python 3.10+

Quick Start

import interloper as il
import pandas as pd
from interloper_dbos import DBOSConfig, DBOSRunner

# Define your pipeline
@il.asset(io=il.FileIO("data/"))
def my_asset(context: il.Context) -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

# Create DAG
dag = il.DAG(my_asset)

# Configure DBOS
dbos_config = DBOSConfig(
    host="localhost",
    database="interloper_dbos",
    user="postgres",
    password="your_password"
)

# Create runner
runner = DBOSRunner()

# Execute with durability
result = runner.run(dag)

Configuration

DBOSConfig

The DBOSConfig class handles database connection and DBOS settings:

from interloper_dbos import DBOSConfig

config = DBOSConfig(
    host="localhost",           # Database host
    port=5432,                 # Database port (default: 5432)
    database="interloper_dbos", # Database name
    user="postgres",           # Database user
    password="your_password",   # Database password
    max_connections=10,        # Max DB connections (default: 10)
    connection_timeout=30,     # Connection timeout in seconds (default: 30)
    step_timeout=3600,        # Step timeout in seconds (default: 3600)
    max_retries=3,            # Max step retries (default: 3)
)

DBOSRunner

The DBOSRunner extends Interloper's base runner with DBOS durability:

from interloper_dbos import DBOSRunner

runner = DBOSRunner(
    concurrency=10  # The concurrency of the DBOS queue
)

Workflow Recovery

DBOS enables automatic recovery from failures. If a run fails, retry it with the same workflow ID and execution resumes from the last completed step:

# First execution
result = runner.run(dag, workflow_id="my_pipeline_20250101")

# If it fails, retry with the same workflow ID to resume
result = runner.run(dag, workflow_id="my_pipeline_20250101")

Automatic Workflow ID Generation

If you don't specify a workflow ID, one will be generated automatically:

  • For partitioned assets: {prefix}_{partition}_{timestamp}
  • For partition windows: {prefix}_{start}_{end}_{timestamp}
  • For non-partitioned: {prefix}_{timestamp}_{uuid}
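As a rough illustration of the three patterns above, here is a minimal sketch of an ID generator. The helper name `generate_workflow_id` and the exact timestamp and UUID formatting are assumptions for illustration, not the package's actual implementation:

```python
import uuid
from datetime import datetime, timezone

def generate_workflow_id(prefix, partition=None, window=None):
    # Hypothetical helper mirroring the documented ID patterns;
    # the real generator may format timestamps/UUIDs differently.
    ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    if partition is not None:
        # Partitioned asset: {prefix}_{partition}_{timestamp}
        return f"{prefix}_{partition}_{ts}"
    if window is not None:
        # Partition window: {prefix}_{start}_{end}_{timestamp}
        start, end = window
        return f"{prefix}_{start}_{end}_{ts}"
    # Non-partitioned: {prefix}_{timestamp}_{uuid}
    return f"{prefix}_{ts}_{uuid.uuid4().hex[:8]}"

daily_id = generate_workflow_id("daily_load", partition="2025-01-01")
window_id = generate_workflow_id("backfill", window=("2025-01-01", "2025-01-31"))
adhoc_id = generate_workflow_id("adhoc")
```

Because the non-partitioned pattern includes a UUID, each unpartitioned run gets a fresh workflow ID; pass an explicit workflow_id if you want retries to resume the same workflow.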

Database Setup

You need a PostgreSQL database for DBOS to store workflow state:

-- Create database
CREATE DATABASE interloper_dbos;

-- Create user (optional)
CREATE USER interloper_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE interloper_dbos TO interloper_user;

Examples

See the examples/ directory for complete examples:

  • simple_pipeline.py: Basic pipeline with recovery demonstration
  • More examples coming soon...

How It Works

  1. Workflow Creation: Each run() call creates a DBOS workflow
  2. Asset Steps: Each asset becomes a DBOS step within the workflow
  3. Sequential Execution: Assets execute in topological order
  4. Checkpointing: DBOS automatically checkpoints each completed step
  5. Recovery: Failed workflows resume from the last successful step
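The checkpoint-and-resume behavior in steps 4 and 5 can be sketched with plain Python, using an in-memory dict in place of DBOS's PostgreSQL-backed state (names and structure here are illustrative, not the package's internals):

```python
def run_workflow(assets, checkpoints, fail_at=None):
    # Execute assets in (already topologically sorted) order.
    # Completed steps are recorded in `checkpoints`; on a retry,
    # checkpointed steps are skipped and only the rest re-execute.
    results = {}
    for name, fn in assets:
        if name in checkpoints:              # step already done: resume past it
            results[name] = checkpoints[name]
            continue
        if name == fail_at:                  # simulate a crash mid-workflow
            raise RuntimeError(f"step {name} failed")
        results[name] = fn()
        checkpoints[name] = results[name]    # checkpoint the completed step
    return results

assets = [
    ("extract", lambda: [1, 2, 3]),
    ("transform", lambda: [2, 4, 6]),
    ("load", lambda: "done"),
]

checkpoints = {}
try:
    run_workflow(assets, checkpoints, fail_at="load")  # first run dies at "load"
except RuntimeError:
    pass
# "extract" and "transform" were checkpointed, so the retry starts at "load"
result = run_workflow(assets, checkpoints)
```

In the real runner, the workflow ID is what ties a retry back to its stored checkpoints, which is why resuming requires reusing the same workflow ID.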

Comparison with Other Runners

Feature         SerialRunner   MultiThreadRunner   DBOSRunner
Parallelism     No             Yes                 No (planned)
Durability      No             No                  Yes
Recovery        No             No                  Yes
State Storage   None           None                PostgreSQL

Limitations

  • Currently supports sequential execution only (parallel execution planned)
  • Requires PostgreSQL database setup
  • Workflow state is stored in the database (consider cleanup for old workflows)

Contributing

Contributions are welcome! Please see the main Interloper repository for contribution guidelines.

License

This package is part of the Interloper project. See the main repository for license information.
