
ROTAB: a template that moves with your thinking.


ROTAB

A template that moves with your thinking. Fully compatible with LLM-based generation and validation.

ROTAB is a lightweight tool for describing data processing in YAML templates, which it automatically converts into executable Python code. No implementation code is required: you just describe what you want to do. ROTAB is a minimal system built to realize that philosophy.


Use Cases

  • When you want to record data processing as a reproducible, reusable structure
  • When you need to share and review processing logic with non-engineers
  • When you want LLMs to generate, modify, or validate processing templates
  • When you need to rapidly prototype and test different processing pipelines
  • When you want to visualize the entire workflow clearly

What ROTAB Offers

  • Intuitive, readable syntax for describing processing logic—even for non-engineers
  • No scripting or boilerplate code—just write a YAML template
  • Auto-generates standalone, executable Python code
  • Automatically visualizes the pipeline as a DAG
  • Easily extendable using your own function files

1. How to Use

Template Example (YAML)

name: main_template

depends:
  - user_filter_template
  - transaction_summary_template

processes:
  - name: transaction_enrichment
    description: |
      This process enriches user transactions by filtering users based on age and
      transactions based on amount, then merging the two datasets.
    io:
      inputs:
        - name: user
          io_type: csv
          path: ../../source/outputs/filtered_users.csv
          schema: user

        - name: trans
          io_type: csv
          path: ../../source/outputs/filtered_transactions.csv
          schema: trans

      outputs:
        - name: final_output
          io_type: csv
          path: ../../source/outputs/final_output.csv
          schema: final_output

    steps:
      - name: filter_users_main
        with: user
        mutate:
          - filter: age > ${params.min_age}
          - derive: |
              log_age = log(age)
              age_bucket = age // 10 * 10
          - select: [user_id, log_age, age_bucket]
        as: filtered_users
        when: ${params.test}

      - name: filter_transactions_main
        with: trans
        mutate:
          - filter: amount > 1000
        as: filtered_trans

      - name: merge_transactions
        with: [filtered_users, filtered_trans]
        transform: merge(left=filtered_users, right=filtered_trans, on='user_id')
        as: enriched

      - name: enrich_transactions
        with: enriched
        mutate:
          - derive: |
              high_value = amount > 10000
          - select: ${params.enrich_transactions.columns}
        as: final_output
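To make the semantics of a `mutate` list concrete, here is a minimal stdlib-only sketch that applies the `filter`, `derive`, and `select` operations of `filter_users_main` to rows represented as Python dicts. It assumes the operations run top to bottom on the current table and that `log` defaults to base 10 (as listed under Built-in Functions). The `apply_mutate` helper and its lambda-based operation format are hypothetical illustrations, not ROTAB's internals, which parse expression strings instead.

```python
import math
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
# Hypothetical operation format: (kind, argument) pairs mirroring `mutate` entries
Op = Tuple[str, Any]


def apply_mutate(rows: List[Row], ops: List[Op]) -> List[Row]:
    """Apply mutate-style operations in order and return the new rows."""
    for kind, arg in ops:
        if kind == "filter":
            # arg is a predicate; keep only rows for which it is true
            rows = [r for r in rows if arg(r)]
        elif kind == "derive":
            # arg maps new column names to functions of the incoming row
            rows = [{**r, **{col: fn(r) for col, fn in arg.items()}} for r in rows]
        elif kind == "select":
            # arg lists the columns to keep, in output order
            rows = [{col: r[col] for col in arg} for r in rows]
        else:
            raise ValueError(f"unknown mutate operation: {kind}")
    return rows


params = {"min_age": 18}  # stands in for ${params.min_age}
users = [
    {"user_id": "u1", "age": 17},
    {"user_id": "u2", "age": 25},
    {"user_id": "u3", "age": 100},
]
filtered_users = apply_mutate(users, [
    ("filter", lambda r: r["age"] > params["min_age"]),
    ("derive", {
        "log_age": lambda r: math.log10(r["age"]),
        "age_bucket": lambda r: r["age"] // 10 * 10,
    }),
    ("select", ["user_id", "log_age", "age_bucket"]),
])
print(filtered_users)
```

Here `u1` is dropped by the filter, and `u3` ends up with `log_age` equal to 2.0 and `age_bucket` equal to 100.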

Parameter Injection

You can inject values from a parameter YAML file using the ${...} syntax inside your templates.

# params.yaml
params:
  min_age: 18
  test: true
  enrich_transactions:
    columns: [user_id, log_age, amount, high_value]

This keeps templates dynamic and reusable by separating processing logic from configuration.
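One way to picture `${...}` substitution is the sketch below, which resolves dotted placeholders such as `${params.min_age}` against the nested dict loaded from a parameter file (with its top-level `params:` key). It assumes that a value consisting solely of a placeholder takes on the parameter's own type, so `${params.enrich_transactions.columns}` can become a list, while placeholders embedded in longer strings are interpolated as text. The `resolve` and `lookup` helpers are hypothetical, not ROTAB's API.

```python
import re
from typing import Any, Dict

# Matches placeholders such as ${params.min_age}; group 1 is the dotted path
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def lookup(context: Dict[str, Any], dotted: str) -> Any:
    """Follow a dotted path such as 'params.min_age' through nested dicts."""
    value: Any = context
    for key in dotted.strip().split("."):
        value = value[key]  # raises KeyError for an undefined parameter
    return value


def resolve(value: Any, context: Dict[str, Any]) -> Any:
    """Recursively substitute placeholders in strings, lists, and dicts."""
    if isinstance(value, str):
        whole = PLACEHOLDER.fullmatch(value)
        if whole:
            # A value that is only a placeholder keeps the parameter's own type
            return lookup(context, whole.group(1))
        # Otherwise interpolate each placeholder into the surrounding text
        return PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), value)
    if isinstance(value, list):
        return [resolve(v, context) for v in value]
    if isinstance(value, dict):
        return {k: resolve(v, context) for k, v in value.items()}
    return value


context = {
    "params": {
        "min_age": 18,
        "enrich_transactions": {"columns": ["user_id", "log_age", "amount", "high_value"]},
    }
}
step = {
    "filter": "age > ${params.min_age}",
    "select": "${params.enrich_transactions.columns}",
}
resolved = resolve(step, context)
print(resolved)
# {'filter': 'age > 18', 'select': ['user_id', 'log_age', 'amount', 'high_value']}
```

Raising on an undefined parameter, rather than leaving the placeholder in place, surfaces typos in templates early.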

Running the Pipeline

rotab --template-dir ./examples/config/templates \
      --source-dir ./examples/source_polars \
      --param-dir ./examples/config/params \
      --schema-dir ./examples/config/schemas \
      --backend polars \
      --execute \
      --dag
  • Python code is generated at the path specified in the template
  • The generated code is standalone and directly executable
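As a rough picture of the YAML-to-Python step, the sketch below renders one `mutate` step into a function shaped like those in the generated code of section 2: a `step_<step>_<process>` function that chains `filter`, `with_columns`, and `select` calls on a frame, passing expressions through `parse`. It only emits source text. The `render_step` helper and its dict input format are hypothetical; the real output also covers I/O, schema casts, and `transform` steps.

```python
from typing import Any, Dict


def render_step(process: str, step: Dict[str, Any]) -> str:
    """Render one mutate step as Python source text (not executed here)."""
    src, out = step["with"], step["as"]
    lines = [
        f"def step_{step['name']}_{process}({src}):",
        f"    {out} = {src}",
    ]
    for op in step["mutate"]:
        # Each mutate entry is a single-key mapping such as {"filter": "..."}
        kind, arg = next(iter(op.items()))
        if kind == "filter":
            lines.append(f"    {out} = {out}.filter(parse({arg!r}))")
        elif kind == "derive":
            lines.append(f"    {out} = {out}.with_columns(parse({arg!r}))")
        elif kind == "select":
            lines.append(f"    {out} = {out}.select({arg!r})")
        else:
            raise ValueError(f"unsupported mutate operation: {kind}")
    lines.append(f"    return {out}")
    return "\n".join(lines) + "\n"


step = {
    "name": "filter_transactions_main",
    "with": "trans",
    "as": "filtered_trans",
    "mutate": [{"filter": "amount > 1000"}],
}
source = render_step("transaction_enrichment", step)
print(source)
```

Because the output is plain source text, it can be written to a file and run without the generator, which is what makes the generated code standalone.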

2. Generated Python Code

import os
import polars as pl
import fsspec
from rotab.core.parse.parse import parse
from rotab.core.operation.derive_funcs_polars import *
from rotab.core.operation.transform_funcs_polars import *

def step_filter_users_main_transaction_enrichment(filtered_users):
    filtered_users_main = filtered_users
    filtered_users_main = filtered_users_main.filter(parse('age > 18'))
    filtered_users_main = filtered_users_main.with_columns(parse("""
        log_age = log(age)
        age_bucket = age // 10 * 10
        """))
    filtered_users_main = filtered_users_main.select(['user_id', 'log_age', 'age_bucket'])
    return filtered_users_main

def step_filter_transactions_main_transaction_enrichment(filtered_transactions):
    filtered_trans = filtered_transactions
    filtered_trans = filtered_trans.filter(parse('amount > 1000'))
    return filtered_trans

def step_merge_transactions_transaction_enrichment(filtered_users_main, filtered_trans):
    enriched = merge(left=filtered_users_main, right=filtered_trans, on='user_id')
    return enriched

def step_enrich_transactions_transaction_enrichment(enriched):
    final_output = enriched
    final_output = final_output.with_columns(parse("""
        high_value = amount > 10000
        """))
    final_output = final_output.select(['user_id', 'log_age', 'amount', 'high_value'])
    return final_output

def transaction_enrichment():
    """This process enriches user transactions by filtering users based on age and
    transactions based on amount, then merging the two datasets."""
    filtered_users = pl.scan_csv("data/outputs/filtered_users.csv", dtypes={"user_id": pl.Utf8, "age": pl.Int64, "age_group": pl.Int64})
    filtered_transactions = pl.scan_csv("data/outputs/filtered_transactions.csv", dtypes={"user_id": pl.Utf8, "amount": pl.Int64, "is_large": pl.Boolean})
    filtered_users_main = step_filter_users_main_transaction_enrichment(filtered_users)
    filtered_trans = step_filter_transactions_main_transaction_enrichment(filtered_transactions)
    enriched = step_merge_transactions_transaction_enrichment(filtered_users_main, filtered_trans)
    final_output = step_enrich_transactions_transaction_enrichment(enriched)
    final_output = final_output.with_columns(pl.col("user_id").cast(pl.Utf8))
    final_output = final_output.with_columns(pl.col("log_age").cast(pl.Float64))
    final_output = final_output.with_columns(pl.col("amount").cast(pl.Int64))
    final_output = final_output.with_columns(pl.col("high_value").cast(pl.Boolean))
    with fsspec.open("data/outputs/final_output.csv", "w") as f:
        final_output.collect(streaming=True).write_csv(f)
    return final_output

if __name__ == "__main__":
    transaction_enrichment()

3. Automatic DAG Generation

graph TB
%% Nodes
%% Template: user_filter_template
subgraph T_user_filter_template ["user_filter_template"]
  %% Process: user_filter
  subgraph P_user_filter ["user_filter"]
    I_user_filter_template__user(["[I]user"])
    S_user_filter_template__filter_users(["[S]filter_users"])
    O_user_filter_template__filtered_users(["[O]filtered_users"])
    I_user_filter_template__user --> S_user_filter_template__filter_users
    S_user_filter_template__filter_users --> O_user_filter_template__filtered_users
  end
end
%% Template: transaction_summary_template
subgraph T_transaction_summary_template ["transaction_summary_template"]
  %% Process: trans_summary
  subgraph P_trans_summary ["trans_summary"]
    I_transaction_summary_template__trans(["[I]trans"])
    S_transaction_summary_template__summarize_transactions(["[S]summarize_transactions"])
    O_transaction_summary_template__filtered_transactions(["[O]filtered_transactions"])
    I_transaction_summary_template__trans --> S_transaction_summary_template__summarize_transactions
    S_transaction_summary_template__summarize_transactions --> O_transaction_summary_template__filtered_transactions
  end
end
%% Template: main_template
subgraph T_main_template ["main_template"]
  %% Process: transaction_enrichment
  subgraph P_transaction_enrichment ["transaction_enrichment"]
    I_main_template__user(["[I]user"])
    I_main_template__trans(["[I]trans"])
    S_main_template__filter_users_main(["[S]filter_users_main"])
    S_main_template__filter_transactions_main(["[S]filter_transactions_main"])
    S_main_template__merge_transactions(["[S]merge_transactions"])
    S_main_template__enrich_transactions(["[S]enrich_transactions"])
    O_main_template__final_output(["[O]final_output"])
    I_main_template__user --> S_main_template__filter_users_main
    I_main_template__trans --> S_main_template__filter_transactions_main
    S_main_template__filter_users_main --> S_main_template__merge_transactions
    S_main_template__filter_transactions_main --> S_main_template__merge_transactions
    S_main_template__merge_transactions --> S_main_template__enrich_transactions
    S_main_template__enrich_transactions --> O_main_template__final_output
  end
end
%% Template Dependencies
T_user_filter_template --> T_main_template
T_transaction_summary_template --> T_main_template
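The step edges inside `transaction_enrichment` can be recovered from the template alone: a step depends on whichever step produced (via `as`) a name it reads (via `with`), and template order follows `depends`. The sketch below rebuilds those edges and a valid execution order with the standard-library `graphlib.TopologicalSorter` (Python 3.9+). It is an illustration under those assumptions, not ROTAB's DAG builder; `with` values are normalized to lists, and names no step produces (such as the input `user`) are treated as process inputs.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# The transaction_enrichment steps, reduced to their `with` / `as` fields
steps = [
    {"name": "filter_users_main", "with": ["user"], "as": "filtered_users"},
    {"name": "filter_transactions_main", "with": ["trans"], "as": "filtered_trans"},
    {"name": "merge_transactions", "with": ["filtered_users", "filtered_trans"], "as": "enriched"},
    {"name": "enrich_transactions", "with": ["enriched"], "as": "final_output"},
]


def step_dependencies(steps: List[dict]) -> Dict[str, List[str]]:
    """Map each step name to the steps whose `as` output it reads."""
    producer = {s["as"]: s["name"] for s in steps}
    return {
        s["name"]: [producer[src] for src in s["with"] if src in producer]
        for s in steps
    }


deps = step_dependencies(steps)
# Emit Mermaid-style edges, one per producer/consumer pair
edges = [f"{src} --> {dst}" for dst, srcs in deps.items() for src in srcs]
step_order = list(TopologicalSorter(deps).static_order())

# Template-level ordering from the `depends` list of main_template
templates = {"main_template": ["user_filter_template", "transaction_summary_template"]}
template_order = list(TopologicalSorter(templates).static_order())

print("\n".join(edges))
print(step_order)
print(template_order)
```

The three printed edges correspond to the step-to-step arrows in the `main_template` subgraph above, and `main_template` always comes after both of its dependencies.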

4. Built-in Functions

Column Derivation (derive)

Function                      Description
log(x, base=10)               Logarithm with a configurable base (default 10)
log1p(x)                      Natural logarithm of (1 + x)
exp(x)                        Exponential function
sqrt(x)                       Square root
clip(x, min, max)             Clamp a value between min and max
round(x, n=0)                 Round to n decimal places
floor(x)                      Round down to the nearest integer
ceil(x)                       Round up to the nearest integer
abs(x)                        Absolute value
len(x)                        Length of a string or list
startswith(x, p)              Check whether a string starts with prefix p
endswith(x, s)                Check whether a string ends with suffix s
lower(x)                      Convert a string to lowercase
upper(x)                      Convert a string to uppercase
replace_values(x, old, new)   Replace occurrences of a substring
strip(x)                      Trim whitespace
format_datetime(x, format)    Format a datetime string
year(x)                       Extract the year from a datetime
month(x)                      Extract the month
day(x)                        Extract the day
weekday(x)                    Extract the weekday (0 = Monday)
hour(x)                       Extract the hour
days_between(x1, x2)          Number of days between two dates
is_null(x)                    Check for null or NaN
not_null(x)                   Check for a non-null value
min(x1, x2)                   Minimum of two values
max(x1, x2)                   Maximum of two values
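For intuition about a few of the entries above, here are plain-Python scalar equivalents. They are assumptions based only on the descriptions in the table (for example, that `weekday` follows Python's Monday-is-0 convention and that `clip` clamps to the closed range from min to max). ROTAB applies its own versions column-wise on the selected backend, and edge-case behavior such as null handling may differ.

```python
import math
from datetime import date


def log(x: float, base: float = 10) -> float:
    # Default base 10; use log10 directly so powers of ten come out exact
    return math.log10(x) if base == 10 else math.log(x, base)


def clip(x: float, lo: float, hi: float) -> float:
    # Clamp x into the closed interval [lo, hi]
    return min(max(x, lo), hi)


def weekday(d: date) -> int:
    # 0 = Monday ... 6 = Sunday, matching date.weekday()
    return d.weekday()


print(log(1000))                  # 3.0
print(clip(150, 0, 100))          # 100
print(weekday(date(2024, 1, 1)))  # 0 (1 January 2024 was a Monday)
```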

Table Transformation (transform)

Function                                                    Description
sort_by(table, column, ascending=True)                      Sort a table by a column
groupby_agg(table, by, aggregations)                        Group and aggregate. Example: {"amount": "sum"}
drop_duplicates(table, subset=None)                         Remove duplicate rows
merge(left, right, on, how='inner')                         Join two tables on a key column (inner join by default)
reshape(table, column_to, columns_from, column_value, agg)  Pivot or melt, depending on the parameters
fillna(table, mapping)                                      Fill missing values. Example: {"age": 0}
sample(table, frac)                                         Random sample by fraction
concat(tables)                                              Concatenate tables vertically
drop_na(table, subset=None)                                 Drop rows with missing values
replace(table, columns, old, new)                           Replace values in the specified columns
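The two transforms central to the example pipeline, `groupby_agg` and `merge`, can be sketched over lists of dicts as follows. The sketch assumes `by` is a single column name, supports a hypothetical set of aggregation names (`sum`, `min`, `max`, `count`, `mean`; the table only confirms `"sum"`), and implements only the default `how='inner'` for `merge`. It illustrates the described semantics and is not ROTAB's backend code.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

# Hypothetical aggregation names; only "sum" appears in the table above
AGGREGATORS: Dict[str, Callable[[List[Any]], Any]] = {
    "sum": sum,
    "min": min,
    "max": max,
    "count": len,
    "mean": lambda xs: sum(xs) / len(xs),
}


def groupby_agg(table: List[Row], by: str, aggregations: Dict[str, str]) -> List[Row]:
    """Group rows by one key column and aggregate the listed columns."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in table:
        groups[row[by]].append(row)
    return [
        {by: key, **{col: AGGREGATORS[fn]([r[col] for r in rows])
                     for col, fn in aggregations.items()}}
        for key, rows in groups.items()
    ]


def merge(left: List[Row], right: List[Row], on: str, how: str = "inner") -> List[Row]:
    """Inner-join two row lists on a shared key column."""
    if how != "inner":
        raise NotImplementedError("this sketch only implements how='inner'")
    index: Dict[Any, List[Row]] = defaultdict(list)
    for row in right:
        index[row[on]].append(row)
    # For each left row, emit one combined row per matching right row
    return [{**lrow, **rrow} for lrow in left for rrow in index.get(lrow[on], [])]


trans = [
    {"user_id": "u1", "amount": 1200},
    {"user_id": "u1", "amount": 3000},
    {"user_id": "u2", "amount": 5000},
]
users = [{"user_id": "u1", "age_bucket": 20}, {"user_id": "u3", "age_bucket": 40}]

totals = groupby_agg(trans, "user_id", {"amount": "sum"})
print(totals)  # [{'user_id': 'u1', 'amount': 4200}, {'user_id': 'u2', 'amount': 5000}]
print(merge(users, totals, on="user_id"))  # [{'user_id': 'u1', 'age_bucket': 20, 'amount': 4200}]
```

Indexing the right-hand table by key keeps the join linear in the total number of rows plus matches, instead of comparing every pair of rows.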

License

MIT License © 2025 PROJECT RO



Download files


Source Distribution

rotab-0.2.2.tar.gz (52.1 kB)


Built Distribution


rotab-0.2.2-py3-none-any.whl (39.8 kB)


File details

Details for the file rotab-0.2.2.tar.gz.

File metadata

  • Download URL: rotab-0.2.2.tar.gz
  • Upload date:
  • Size: 52.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.8.10 Linux/5.10.16.3-microsoft-standard-WSL2

File hashes

Hashes for rotab-0.2.2.tar.gz
Algorithm    Hash digest
SHA256       76d30123ce2bfb8f39bf277aa95bf48bb0320fa8b8c9dcb171808d452714bb69
MD5          3bd6851d3066b73d6b89bc928a325ad5
BLAKE2b-256  917c540d1e8000eb3bba94464514387e79fbbbdd97b944fb28f5a1a2d166d687


File details

Details for the file rotab-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: rotab-0.2.2-py3-none-any.whl
  • Upload date:
  • Size: 39.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.8.10 Linux/5.10.16.3-microsoft-standard-WSL2

File hashes

Hashes for rotab-0.2.2-py3-none-any.whl
Algorithm    Hash digest
SHA256       f1e17b07d52386c7cf30d298a03b85778624e0ff603f27239603cf6cfdfcc769
MD5          9a5748ec3877cfbc4a6806bbe122d917
BLAKE2b-256  e304171ab5a947d9c1dd4f11a95e2cb90a15bc728e1ace8ec5354c70515b6307

