Build modular data pipelines that run inside a PostgreSQL database

Ralsei

Ralsei is a lightweight and portable Python framework designed for analysts who need to quickly build modular data pipelines. It enables users to create comprehensive data preparation workflows that integrate both data collection and processing in a single, declarative pipeline. This framework is particularly beneficial for those who prefer not to depend on cloud-based solutions or local infrastructure setups.

Design goals

  • Modular Design: Allows for the creation of reusable tasks, making it easy to maintain and adapt pipelines as data requirements evolve.
  • SQL Database Integration: Operates directly on SQL databases, storing everything from raw data to processed results, thereby simplifying data tracking and analysis.
  • Resumable Tasks: Supports long-running tasks with the ability to resume operations at the row level, minimizing reprocessing in case of interruptions.
  • Workflow Control: Provides full control over the workflow, enabling users to rerun specific tasks on-demand and manage dependencies effectively.
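
The row-level resume idea can be illustrated without Ralsei at all. The following is a minimal sketch using only the standard-library `sqlite3` module; the flag column `_done`, the `result` column, and the helper `process_pending` are hypothetical names chosen for illustration and are not part of Ralsei's API.

```python
import sqlite3

def process_pending(conn, fn):
    """Apply fn to every row not yet marked done, committing after each row."""
    rows = conn.execute(
        "SELECT id, name FROM sources WHERE NOT _done ORDER BY id"
    ).fetchall()
    processed = 0
    for row_id, name in rows:
        result = fn(name)
        conn.execute(
            "UPDATE sources SET result = ?, _done = 1 WHERE id = ?",
            (result, row_id),
        )
        conn.commit()  # rows finished so far survive a later failure
        processed += 1
    return processed

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sources("
    "id INTEGER PRIMARY KEY, name TEXT, result TEXT, _done INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO sources(name) VALUES (?)",
    [("Physics",), ("Computer Science",), ("Philosophy",)],
)
conn.commit()

def fail_on_philosophy(name):
    # Simulate an interruption partway through the run
    if name == "Philosophy":
        raise RuntimeError("simulated interruption")
    return name.upper()

try:
    process_pending(conn, fail_on_philosophy)
except RuntimeError:
    pass

# The second run only touches the row that was left unfinished
remaining = process_pending(conn, str.upper)
```

Because each row is committed with its flag as soon as it is processed, a rerun picks up where the previous one stopped instead of starting over.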

Installation

pip install ralsei

Tip: consider using Poetry for project-based dependency management

Example

See the documentation for an in-depth explanation.

init_sources.sql

CREATE TABLE {{table}}(
  id INTEGER PRIMARY KEY,
  year INT,
  name TEXT
);
{%split%}
INSERT INTO {{table}}(year, name) VALUES
(2015, 'Physics'),
(2018, 'Computer Science'),
(2021, 'Philosophy');
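
The `{{table}}` placeholder and the `{%split%}` marker suggest that the template is rendered with a concrete table name and then divided into separate statements. Ralsei's actual renderer is not shown in this document, so the sketch below only mimics that behaviour with plain string operations; `render_statements` is a hypothetical helper, and SQLite is used merely because it ships with Python.

```python
import sqlite3

TEMPLATE = """\
CREATE TABLE {{table}}(
  id INTEGER PRIMARY KEY,
  year INT,
  name TEXT
);
{%split%}
INSERT INTO {{table}}(year, name) VALUES
(2015, 'Physics'),
(2018, 'Computer Science'),
(2021, 'Philosophy');
"""

def render_statements(template, table):
    """Substitute the table name, then split into individual statements."""
    rendered = template.replace("{{table}}", table)
    return [part.strip() for part in rendered.split("{%split%}") if part.strip()]

statements = render_statements(TEMPLATE, "sources")

conn = sqlite3.connect(":memory:")
for statement in statements:
    conn.execute(statement)  # sqlite3 accepts one statement per execute() call

rows = conn.execute("SELECT id, year, name FROM sources ORDER BY id").fetchall()
```

Splitting matters here because many database drivers refuse to run several statements in a single call.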

logic.py

import requests
import json

def download(year: int, name: str):
  response = requests.get(
     "https://foo.com/api",
     params={"year": year, "name": name},
  )
  response.raise_for_status()
  return {"json": response.text}

def parse_page(data: str):
  for item in json.loads(data)["items"]:
     yield {"university": item["name"], "rank": item["rank"]}
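
To see what `parse_page` yields, it can be called on a hand-written payload. The payload shape (an `items` list of objects with `name` and `rank`) is inferred from the function body; the sample values are made up for illustration and do not come from a real API.

```python
import json

def parse_page(data: str):
    # Same logic as in logic.py: one output record per item in the payload
    for item in json.loads(data)["items"]:
        yield {"university": item["name"], "rank": item["rank"]}

# Hypothetical payload, shaped the way parse_page expects
sample = json.dumps(
    {
        "items": [
            {"name": "Example University", "rank": 1},
            {"name": "Sample Institute", "rank": 2},
        ]
    }
)

records = list(parse_page(sample))
```

Since `parse_page` is a generator, one downloaded page can produce any number of output rows.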

app.py

from typing import Optional
from pathlib import Path
import click
import sqlalchemy
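from ralsei import (
  Ralsei,
  Pipeline,
  Table,
  ValueColumn,
  Placeholder,
  compose,
  compose_one,
  pop_id_fields,
  # Task classes used below; assumed to be importable from the top-level
  # package (adjust the import path if your version exposes them elsewhere)
  CreateTableSql,
  MapToNewColumns,
  MapToNewTable,
)
# Absolute import, since app.py is run directly as a script
from logic import download, parse_page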

# Define your tasks
class MyPipeline(Pipeline):
  def __init__(self, schema: Optional[str]):
     self.schema = schema

  def create_tasks(self):
     return {
        "init": CreateTableSql(
           table=Table("sources", self.schema),
           sql=Path("./init_sources.sql").read_text(),
        ),
        "download": MapToNewColumns(
           table=self.outputof("init"),  # table produced by the "init" task
           select=(
              "SELECT id, year, name FROM {{table}} WHERE NOT {{is_done}}"  # skip rows already processed
           ),
           columns=[ValueColumn("json", "TEXT")],  # column that receives the downloaded text
           is_done_column="_downloaded",  # flag column marking processed rows
           fn=compose_one(download, pop_id_fields("id")),
        ),
        "parse": MapToNewTable(
           source_table=self.outputof("download"),
           select="SELECT id, json FROM {{source}}",
           table=Table("records", self.schema),
           columns=[
              "record_id INTEGER PRIMARY KEY",  # column given as a raw SQL definition
              ValueColumn(
                 "source_id",
                 "INT REFERENCES {{source}}",
                 Placeholder("id"),
              ),
              ValueColumn("university", "TEXT"),
              ValueColumn("rank", "INT"),
           ],
           fn=compose(parse_page, pop_id_fields("id")),
        )
     }

# Create a CLI application
@click.option("-s", "--schema", help="Database schema")
class App(Ralsei):
  def __init__(self, db: sqlalchemy.URL, schema: Optional[str]):
     super().__init__(db, MyPipeline(schema))

if __name__ == "__main__":
  App.run_cli()

The resulting app can be used like:

python ./app.py -d sqlite:///result.sqlite --schema dev run
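
In `app.py`, the `download` task selects `id, year, name`, yet `download` itself accepts only `year` and `name`. One plausible reading is that `pop_id_fields("id")` sets the `id` aside before the call and attaches it to the result, so the output can be matched back to its row. The sketch below illustrates that idea in plain Python; `keep_id_fields` and `fake_download` are hypothetical names, and this is an assumption about the behaviour, not Ralsei's implementation.

```python
import json
from typing import Any, Callable, Dict

def keep_id_fields(fn: Callable[..., Dict[str, Any]], *id_fields: str):
    """Wrap fn so that the id fields bypass it and reappear in its result."""
    def wrapper(**row: Any) -> Dict[str, Any]:
        ids = {field: row.pop(field) for field in id_fields}
        result = fn(**row)  # fn only sees the remaining columns
        return {**result, **ids}
    return wrapper

def fake_download(year: int, name: str) -> Dict[str, Any]:
    # Stand-in for the network call in logic.py
    return {"json": json.dumps({"year": year, "name": name})}

wrapped = keep_id_fields(fake_download, "id")
output = wrapped(id=1, year=2015, name="Physics")
```

Keeping row identity out of the business function leaves `download` free of any database concerns.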
