Skip to main content

Build modular data pipelines running inside the postgres database

Project description

Ralsei

Ralsei is a lightweight and portable Python framework designed for analysts who need to quickly build modular data pipelines. It enables users to create comprehensive data preparation workflows that integrate both data collection and processing in a single, declarative pipeline. This framework is particularly beneficial for those who prefer not to depend on cloud-based solutions or local infrastructure setups.

Design goals

  • Modular Design: Allows for the creation of reusable tasks, making it easy to maintain and adapt pipelines as data requirements evolve.
  • SQL Database Integration: Operates directly on SQL databases, storing everything from raw data to processed results, thereby simplifying data tracking and analysis.
  • Resumable Tasks: Supports long-running tasks with the ability to resume operations at the row level, minimizing reprocessing in case of interruptions.
  • Workflow Control: Provides full control over the workflow, enabling users to rerun specific tasks on-demand and manage dependencies effectively.

Installation

pip install ralsei

Tip: consider using Poetry for project-based dependency management

Example

See the documentation for an in-depth explaination

init_sources.sql

CREATE TABLE {{table}}(
  id INTEGER PRIMARY KEY,
  year INT,
  name TEXT
);
{%split%}
INSERT INTO {{table}}(year, name) VALUES
(2015, 'Physics'),
(2018, 'Computer Science'),
(2021, 'Philosophy');

logic.py

import requests
import json

def download(year: int, name: str):
  response = requests.get(
     "https://foo.com/api",
     params={"year": year, "name": name},
  )
  response.raise_for_status()
  return {"json": response.text}

def parse_page(data: str):
  for item in json.loads(data)["items"]:
     yield {"university": item["name"], "rank": item["rank"]}

app.py

from typing import Optional
from pathlib import Path
import click
import sqlalchemy
from ralsei import (
  Ralsei,
  Pipeline,
  Table,
  ValueColumn,
  Placeholder,
  compose_one,
  pop_id_fields,
)
from .logic import download, parse_page

# Define your tasks
class MyPipeline(Pipeline):
  def __init__(self, schema: Optional[str]):
     self.schema = schema

  def create_tasks(self):
     return {
        "init": CreateTableSql(
           table=Table("sources", self.schema),
           sql=Path("./init_sources.sql").read_text(),
        ),
        "download": MapToNewColumns(
           table=self.outputof("init"), # (1)!
           select=(
              "SELECT id, year, name FROM {{table}} WHERE NOT {{is_done}}" # (2)!
           ),
           columns=[ValueColumn("json", "TEXT")], # (3)!
           is_done_column="_downloaded", # (4)!
           fn=compose_one(download, pop_id_fields("id")) # (5)!
        ),
        "parse": MapToNewTable(
           source_table=self.outputof("download"),
           select="SELECT id, json FROM {{source}}",
           table=Table("records", self.schema),
           columns=[
              "record_id INTEGER PRIMARY KEY", # (6)!
              ValueColumn(
                 "source_id",
                 "INT REFERENCES {{source}}",
                 Placeholder("id"),
              ),
              ValueColumn("university", "TEXT"),
              ValueColumn("rank", "INT"),
           ],
           fn=compose(parse_page, pop_id_fields("id")),
        )
     }

# Create a CLI application
@click.option("-s", "--schema", help="Database schema")
class App(Ralsei):
  def __init__(self, db: sqlalchemy.URL, schema: Optional[str]):
     super().__init__(db, MyPipeline(schema))

if __name__ == "__main__":
  App.run_cli()

The resulting app can be used like:

python ./app.py -d sqlite:///result.sqlite --schema dev run

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ralsei-3.2.0.tar.gz (36.4 kB view details)

Uploaded Source

Built Distribution

ralsei-3.2.0-py3-none-any.whl (53.1 kB view details)

Uploaded Python 3

File details

Details for the file ralsei-3.2.0.tar.gz.

File metadata

  • Download URL: ralsei-3.2.0.tar.gz
  • Upload date:
  • Size: 36.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure

File hashes

Hashes for ralsei-3.2.0.tar.gz
Algorithm Hash digest
SHA256 1bc693bfe23091142f05aef86268e111bd0bd74b185c76098aff7293780aade8
MD5 187e9452824f07470aab1e1915949ee7
BLAKE2b-256 dee49e18b405263749d33dddf077ae1132f04cdfa06313e28c31342c79602f71

See more details on using hashes here.

File details

Details for the file ralsei-3.2.0-py3-none-any.whl.

File metadata

  • Download URL: ralsei-3.2.0-py3-none-any.whl
  • Upload date:
  • Size: 53.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.12.7 Linux/6.5.0-1025-azure

File hashes

Hashes for ralsei-3.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5115c8fd025c05b6847f60cc2a4bd14c607a723da1a6f444dd0620ed72e996b7
MD5 80ac28b63b512761ea5cffd46d14a6ef
BLAKE2b-256 bf16b350996394abfce7668e5e6f428661ba11d0bbe5f1553555bdcb2b8e3d12

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page