Interloper DBOS integration
This package provides DBOS integration for Interloper, enabling durable workflow execution with automatic recovery from failures.
Features
- Durable Execution: Assets are executed as DBOS steps within durable workflows
- Automatic Recovery: Failed workflows can be resumed from the last successful step
- Sequential Execution: Assets execute in dependency order with DBOS checkpointing
- Workflow Management: Support for custom workflow IDs and automatic ID generation
Installation
```bash
pip install interloper-dbos
```
Prerequisites
- PostgreSQL database for DBOS state storage
- Python 3.10+
Quick Start
```python
import interloper as il
import pandas as pd

from interloper_dbos import DBOSConfig, DBOSRunner

# Define your pipeline: a single asset persisted with FileIO under data/
@il.asset(io=il.FileIO("data/"))
def my_asset(context: il.Context) -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

# Create the DAG
dag = il.DAG(my_asset)

# Configure DBOS (connection settings for the PostgreSQL state store)
dbos_config = DBOSConfig(
    host="localhost",
    database="interloper_dbos",
    user="postgres",
    password="your_password",
)

# Create the runner
runner = DBOSRunner()

# Execute with durability
result = runner.run(dag)
```
Configuration
DBOSConfig
The DBOSConfig class handles database connection and DBOS settings:
```python
from interloper_dbos import DBOSConfig

config = DBOSConfig(
    host="localhost",             # Database host
    port=5432,                    # Database port (default: 5432)
    database="interloper_dbos",   # Database name
    user="postgres",              # Database user
    password="your_password",     # Database password
    max_connections=10,           # Max DB connections (default: 10)
    connection_timeout=30,        # Connection timeout in seconds (default: 30)
    step_timeout=3600,            # Step timeout in seconds (default: 3600)
    max_retries=3,                # Max step retries (default: 3)
)
```
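The connection fields above describe an ordinary PostgreSQL connection. As a sketch, assuming they are ultimately turned into a standard libpq connection URI (how DBOSConfig actually does this is not documented here), the mapping could look like the following. `ConnectionSettings` and `to_postgres_url` are hypothetical names, not part of interloper_dbos.

```python
from dataclasses import dataclass
from urllib.parse import quote


@dataclass
class ConnectionSettings:
    """Hypothetical stand-in mirroring the connection fields of DBOSConfig."""
    host: str = "localhost"
    port: int = 5432
    database: str = "interloper_dbos"
    user: str = "postgres"
    password: str = ""
    connection_timeout: int = 30  # seconds


def to_postgres_url(s: ConnectionSettings) -> str:
    """Build a standard PostgreSQL connection URI from the settings.

    User and password are percent-encoded so characters such as '@' or ':'
    cannot break the URI; connect_timeout is a standard libpq parameter.
    """
    user = quote(s.user, safe="")
    password = quote(s.password, safe="")
    return (
        f"postgresql://{user}:{password}@{s.host}:{s.port}/{s.database}"
        f"?connect_timeout={s.connection_timeout}"
    )


if __name__ == "__main__":
    print(to_postgres_url(ConnectionSettings(password="p@ss:word")))
    # postgresql://postgres:p%40ss%3Aword@localhost:5432/interloper_dbos?connect_timeout=30
```

Settings such as max_connections, step_timeout and max_retries are pool and execution options rather than parts of the address, so they are left out of the URI in this sketch.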
DBOSRunner
The DBOSRunner extends Interloper's base runner with DBOS durability:
```python
from interloper_dbos import DBOSRunner

runner = DBOSRunner(
    concurrency=10,  # Concurrency limit of the DBOS queue
)
```
Workflow Recovery
DBOS enables automatic recovery from failures. You can resume workflows using the same workflow ID:
```python
# First execution
result = runner.run(dag, workflow_id="my_pipeline_20250101")

# If it fails, run again with the same workflow ID to resume
# from the last successfully completed step
result = runner.run(dag, workflow_id="my_pipeline_20250101")
```
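The resume behaviour can be modelled with a toy, in-memory version of the idea: each completed step is recorded under the workflow ID, and a second run with the same ID skips the recorded steps. This is an illustrative sketch only, not the DBOS or interloper_dbos API; `run_workflow` and the `CHECKPOINTS` dict are hypothetical.

```python
from collections.abc import Callable

# Toy stand-in for durable state: workflow_id -> {step_name: result}.
# DBOS persists checkpoints in PostgreSQL; this sketch keeps them in memory.
CHECKPOINTS: dict[str, dict[str, object]] = {}


def run_workflow(
    workflow_id: str,
    steps: list[tuple[str, Callable[[], object]]],
) -> dict[str, object]:
    """Run steps in order, skipping any already checkpointed under workflow_id."""
    done = CHECKPOINTS.setdefault(workflow_id, {})
    for name, fn in steps:
        if name in done:
            continue  # completed in an earlier attempt: reuse the stored result
        done[name] = fn()  # recorded only if the step returns without raising
    return done


if __name__ == "__main__":
    calls: list[str] = []
    failures_left = [1]  # make "transform" fail exactly once

    def extract() -> list[int]:
        calls.append("extract")
        return [1, 2, 3]

    def transform() -> str:
        calls.append("transform")
        if failures_left[0] > 0:
            failures_left[0] -= 1
            raise RuntimeError("transient failure")
        return "ok"

    steps = [("extract", extract), ("transform", transform)]
    try:
        run_workflow("my_pipeline_20250101", steps)
    except RuntimeError:
        pass  # first attempt fails after "extract" was checkpointed
    run_workflow("my_pipeline_20250101", steps)
    print(calls)  # ['extract', 'transform', 'transform']
```

In the demo, extract runs once even though the workflow ran twice; only the step that failed is repeated. Unlike this dict, DBOS keeps its checkpoints in PostgreSQL, so they also survive a process restart.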
Automatic Workflow ID Generation
If you don't specify a workflow ID, one is generated automatically using one of these patterns:
- For partitioned assets: {prefix}_{partition}_{timestamp}
- For partition windows: {prefix}_{start}_{end}_{timestamp}
- For non-partitioned assets: {prefix}_{timestamp}_{uuid}
Database Setup
You need a PostgreSQL database for DBOS to store workflow state:
```sql
-- Create database
CREATE DATABASE interloper_dbos;

-- Create user (optional)
CREATE USER interloper_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE interloper_dbos TO interloper_user;
```
Examples
See the examples/ directory for complete examples:
- simple_pipeline.py: Basic pipeline with recovery demonstration
- More examples coming soon...
How It Works
- Workflow Creation: Each run() call creates a DBOS workflow
- Asset Steps: Each asset becomes a DBOS step within the workflow
- Sequential Execution: Assets execute in topological order
- Checkpointing: DBOS automatically checkpoints each completed step
- Recovery: Failed workflows resume from the last successful step
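The ordering step can be illustrated with the standard library's graphlib.TopologicalSorter (Python 3.9+). This is a simplified sketch of the scheduling idea only, with no DBOS involvement; `run_in_topological_order` and the dict-based asset representation are hypothetical and are not Interloper's DAG API. In DBOSRunner, each call inside the loop would correspond to one checkpointed DBOS step.

```python
from collections.abc import Callable
from graphlib import TopologicalSorter


def run_in_topological_order(
    assets: dict[str, Callable[..., object]],
    deps: dict[str, set[str]],
) -> dict[str, object]:
    """Execute assets one at a time, each after all of its upstream assets.

    `deps` maps an asset name to the names of the assets it depends on;
    each asset is called with its upstream results as keyword arguments.
    """
    graph = {name: deps.get(name, set()) for name in assets}
    results: dict[str, object] = {}
    for name in TopologicalSorter(graph).static_order():
        upstream = {dep: results[dep] for dep in graph[name]}
        results[name] = assets[name](**upstream)
    return results


if __name__ == "__main__":
    assets = {
        "raw": lambda: [3, 1, 2],
        "sorted_values": lambda raw: sorted(raw),
        "total": lambda raw: sum(raw),
        "report": lambda sorted_values, total: f"{sorted_values} sum={total}",
    }
    deps = {
        "sorted_values": {"raw"},
        "total": {"raw"},
        "report": {"sorted_values", "total"},
    }
    print(run_in_topological_order(assets, deps)["report"])  # [1, 2, 3] sum=6
```

static_order() raises graphlib.CycleError if the dependencies contain a cycle, so an invalid DAG fails before any asset runs.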
Comparison with Other Runners
| Feature | SerialRunner | MultiThreadRunner | DBOSRunner |
|---|---|---|---|
| Parallelism | No | Yes | No (planned) |
| Durability | No | No | Yes |
| Recovery | No | No | Yes |
| State Storage | None | None | PostgreSQL |
Limitations
- Currently supports sequential execution only (parallel execution planned)
- Requires PostgreSQL database setup
- Workflow state accumulates in the database, so consider periodically cleaning up old workflows
Contributing
Contributions are welcome! Please see the main Interloper repository for contribution guidelines.
License
This package is part of the Interloper project. See the main repository for license information.