Build modular data pipelines that run inside a PostgreSQL database
Project description
Ralsei
Ralsei is a Python framework for building modular data pipelines that act on a SQL database. Inspired by Kedro and dbt, it aims to combine data collection (through scraping/APIs) and data processing in a single declarative pipeline.
Installation
pip install ralsei
Example
See the documentation for an in-depth explanation.
init_sources.sql
CREATE TABLE {{table}}(
    id INTEGER PRIMARY KEY,
    year INT,
    name TEXT
);
{%split%}
-- Ids are listed explicitly: only SQLite auto-assigns an INTEGER PRIMARY KEY
-- (it aliases the rowid); PostgreSQL would reject rows without an id with a
-- NOT NULL violation.
INSERT INTO {{table}}(id, year, name) VALUES
    (1, 2015, 'Physics'),
    (2, 2018, 'Computer Science'),
    (3, 2021, 'Philosophy');
logic.py
import requests
import json
def download(year: int, name: str):
    """Fetch the API page for one ``(year, name)`` source row.

    Returns:
        ``{"json": <response body text>}``; the key matches the ``json``
        column that the pipeline's ``download`` task adds.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if the server does not respond within the timeout.
    """
    response = requests.get(
        "https://foo.com/api",
        params={"year": year, "name": name},
        # requests waits indefinitely by default; bound the wait so a single
        # stalled request cannot hang the whole pipeline run.
        timeout=30,
    )
    response.raise_for_status()
    return {"json": response.text}
def parse_page(data: str):
    """Parse a JSON page and yield one row per entry of its ``items`` list.

    Each yielded row is ``{"university": <item name>, "rank": <item rank>}``.
    The page is decoded lazily, on the first ``next()``; a missing ``items``
    key (or missing ``name``/``rank`` in an item) raises ``KeyError``.
    """
    payload = json.loads(data)
    yield from (
        {"university": entry["name"], "rank": entry["rank"]}
        for entry in payload["items"]
    )
app.py
from pathlib import Path
from typing import Optional

import click
import sqlalchemy
from ralsei import (
    Ralsei,
    Pipeline,
    Table,
    ValueColumn,
    Placeholder,
    compose,
    compose_one,
    pop_id_fields,
    CreateTableSql,
    MapToNewColumns,
    MapToNewTable,
)

try:
    from .logic import download, parse_page
except ImportError:
    # `python ./app.py` runs this file as a top-level script, where relative
    # imports fail; fall back to logic.py in the same directory.
    from logic import download, parse_page
# Define your tasks
class MyPipeline(Pipeline):
    """Example pipeline with three tasks.

    - ``init`` creates and seeds the ``sources`` table.
    - ``download`` adds a ``json`` column to ``sources``, filled per row.
    - ``parse`` turns each downloaded page into rows of a new ``records``
      table.

    Args:
        schema: Schema in which both tables are created, or ``None`` for the
            connection's default.
    """

    def __init__(self, schema: Optional[str]):
        self.schema = schema

    def create_tasks(self):
        """Return the pipeline's tasks, keyed by task name."""
        return {
            "init": CreateTableSql(
                table=Table("sources", self.schema),
                # Resolve the SQL file next to this module instead of the
                # current working directory, so the app can start from anywhere.
                sql=Path(__file__)
                .with_name("init_sources.sql")
                .read_text(encoding="utf-8"),
            ),
            "download": MapToNewColumns(
                table=self.outputof("init"),  # (1)!
                select=(
                    "SELECT id, year, name FROM {{table}} WHERE NOT {{is_done}}"  # (2)!
                ),
                columns=[ValueColumn("json", "TEXT")],  # (3)!
                is_done_column="_downloaded",  # (4)!
                fn=compose_one(download, pop_id_fields("id")),  # (5)!
            ),
            "parse": MapToNewTable(
                source_table=self.outputof("download"),
                # Selected columns become fn's keyword arguments (as with
                # `year, name` -> download(year, name) above), and parse_page
                # takes its page text as `data`, hence the alias.
                select="SELECT id, json AS data FROM {{source}}",
                table=Table("records", self.schema),
                columns=[
                    # NOTE: auto-assigned on SQLite (INTEGER PRIMARY KEY is a
                    # rowid alias); PostgreSQL needs e.g. SERIAL instead.
                    "record_id INTEGER PRIMARY KEY",  # (6)!
                    ValueColumn(
                        "source_id",
                        "INT REFERENCES {{source}}",
                        Placeholder("id"),
                    ),
                    ValueColumn("university", "TEXT"),
                    ValueColumn("rank", "INT"),
                ],
                # parse_page yields many rows per page, so use `compose`
                # rather than `compose_one`.
                fn=compose(parse_page, pop_id_fields("id")),
            ),
        }
# Create a CLI application
@click.option("-s", "--schema", help="Database schema")
class App(Ralsei):
    """Command-line application that runs :class:`MyPipeline`.

    The ``--schema`` value is handed to the pipeline, which creates both of
    its tables in that schema.
    """

    def __init__(self, db: sqlalchemy.URL, schema: Optional[str]):
        pipeline = MyPipeline(schema)
        super().__init__(db, pipeline)


if __name__ == "__main__":
    App.run_cli()
The resulting app can be run from the command line like this:
# SQLite has no schemas (a qualified name like dev.sources refers to an
# attached database), so omit --schema there:
python ./app.py -d sqlite:///result.sqlite run
# On PostgreSQL, tables can be placed in a schema (assumes schema "dev" exists):
python ./app.py -d postgresql://user:password@localhost/mydb --schema dev run
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
ralsei-3.0.0.dev5.tar.gz
(30.2 kB
view hashes)
Built Distribution
Close
Hashes for ralsei-3.0.0.dev5-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1d8de074630feb39466825f6759257945ea9c5ad70970901da3f4f0928f152e6 |
| MD5 | 8427470a9e5224180a6d242134e79434 |
| BLAKE2b-256 | e3703c464f6fcabd5971bc03d1c80c237ef70a06785b2a657db8d311f9245584 |