Config-driven ETL framework for Spark, Pandas, and Polars.
bolt_pipeliner
A config-driven ETL framework for Apache Spark + Iceberg, Pandas, and Polars, with sibling base classes for Spark + Delta and Spark + Parquet. Pipelines are declared in a single YAML file and executed through one CLI:
bolt init my_project --preset medallion
cd my_project
bolt run --silver
bolt test
bolt generate documentation
The framework is inspired by dbt's ergonomics but stays Python-first: jobs are plain modules exposing a process_data(self, input_tables) function, the runtime wires them onto a shared ETLBase, and downstream artifacts (Airflow DAGs, HTML docs, standalone layer scripts, notebooks, Snowflake DDLs) are regenerated from the same config.
Browsing the repo? The package code lives under src/bolt_pipeliner/. Sample projects (a runnable demo plus two large reference projects) live under examples/.
Table of contents
- Installation
- Quick start — bolt init
- CLI reference
- Config schema (etl_config.yaml)
- Base classes (engine selection)
- Writing an ETL job
- Incremental processing
- Data-quality tests (bolt test)
- Code generation (bolt generate)
- Spark session profiles
- Macros (reusable transforms)
- Documentation flow without Spark
- Project layout
- Troubleshooting
Installation
Requires Python ≥ 3.10.
pip install -e .
# add dev tooling for pytest / ruff / mypy
pip install -e ".[dev]"
# add Databricks Connect / PySpark when you need Spark locally
pip install -e ".[spark]"
The install registers the bolt console script.
Quick start — bolt init
Interactive scaffolder:
bolt init my_project
The wizard asks:
| Question | Choices |
|---|---|
| Architecture | flat • medallion (bronze/silver/gold) • diamond (bronze/silver/gold/diamond) • custom |
| Engine | pyspark • pandas • polars |
| Spark profile (pyspark only) | local • databricks • emr • glue • gcp • azure • k8s |
| Execution env | terminal • notebook • airflow • databricks-jobs |
| ML training layer | yes / no |
Skip the prompts with a preset:
bolt init my_project --preset minimal # pandas, flatfile + bronze
bolt init my_project --preset medallion # pyspark/local, bronze/silver/gold
bolt init my_project --preset diamond # full medallion + diamond + ML, airflow
bolt init my_project --preset pandas # pandas medallion, notebook
bolt init my_project --preset polars # polars medallion, notebook
The scaffolder writes:
my_project/
├── configs/
│ ├── etl_config.yaml
│ └── spark/<profile>.toml # only when engine=pyspark
├── etl/
│ ├── _flatfile/flatfile_example.py
│ ├── 0_bronze/bronze_example.py
│ ├── 1_silver/silver_example.py
│ └── 2_gold/gold_example.py
├── macros/__init__.py
├── models/train_example.py # only when ML is enabled
├── tests/test_smoke.py
└── README.md
It refuses to write into a non-empty directory.
CLI reference
bolt init PROJECT_NAME [--path PATH] [--preset NAME]
bolt run [--config PATH] [--flatfile|--bronze|--silver|--gold|--diamond]
bolt test [--config PATH] [--layer L] [--module M]
bolt generate {airflow|documentation|layers|notebook|snowflakeddl|all} [--config PATH]
| Command | What it does |
|---|---|
| bolt init | Interactive project scaffolder (above). |
| bolt run | Walks the layers declared in configs/etl_config.yaml and executes every job in dependency order. Pick a subset with the layer flags. |
| bolt test | Runs the tests: block on each job. Exits non-zero on failure. |
| bolt generate | Regenerates Airflow DAGs, HTML docs, standalone layer scripts, the notebook, and Snowflake DDLs from the config. Use all to run all of them. |
bolt --help and bolt <subcommand> --help print the full option set.
Config schema (etl_config.yaml)
configs:
  output_bucket: "s3://my_project/tables/"
  flatfile_bucket: "s3://my_project/flatfiles/"
  schema: my_project                # destination schema (Iceberg namespace, Snowflake schema, …)
  catalog: dev_catalog              # destination catalog for non-bronze reads/writes
  incremental_column: year_month    # optional; default "year_month" for Iceberg
                                    # base, "yearMonth" for Pandas/Polars bases

layers:
  flatfile: etl/_flatfile
  bronze: etl/0_bronze
  silver: etl/1_silver
  gold: etl/2_gold

flatfile:
  - module: flatfile_storm_events
    description: "NOAA storm events."
    class_name: ETLBase             # picks the Spark+Iceberg base
    input_tables:
      storm_events: "storm_events.csv"
    output_table_name: storm_events

silver:
  - module: silver_fct_state_gas_cpi_monthly
    description: "Monthly CPI + gas prices by state."
    class_name: ETLBase
    incremental: true
    input_tables:
      cpi: flatfile_cpi_regional
      gas: flatfile_gas_prices
    output_table_name: fct_state_gas_cpi_monthly
    partition_by: [year_month]
    tests:
      - not_null: [year_month, state]
      - unique: [year_month, state]
      - row_count: { min: 1 }
      - freshness: { column: year_month, max_age_days: 90 }
Per-job keys
| Key | Required | Notes |
|---|---|---|
| module | yes | Filename of the job module inside the layer's directory (no .py). |
| input_tables | yes | Dict of alias → upstream table or file. Peco-style _input_tables: is normalized to input_tables: at load time. |
| output_table_name | yes | Becomes {layer}_{output_table_name} in the destination. |
| class_name | no (default ETLBase) | Picks the base class. Built-ins: ETLBase, ETLBaseDelta, ETLBaseParquet, ETLBaseParquetPandas, ETLBaseParquetPolars. Also accepts dotted paths like mypkg.bases.MyCustom. |
| partition_by | no | List of column names. |
| incremental | no (default false) | See Incremental processing. |
| unload | no (default true) | If false, the runtime won't call unload_data, which is useful for manual partition writes. |
| description | no | Free text; surfaces in bolt generate documentation. |
| tests | no | See Data-quality tests. |
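The table above implies a small normalization step at load time. The following is a minimal sketch of what such a step could look like, assuming only the defaults and naming rule listed in the table; normalize_job, JOB_DEFAULTS, REQUIRED_KEYS, and the destination key are hypothetical names for illustration, not the framework's loader.

```python
from typing import Any

# Defaults taken from the per-job keys table; the function itself is a
# hypothetical illustration, not the framework's loader.
JOB_DEFAULTS: dict[str, Any] = {
    "class_name": "ETLBase",
    "incremental": False,
    "unload": True,
}
REQUIRED_KEYS = ("module", "input_tables", "output_table_name")


def normalize_job(layer: str, raw_job: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of raw_job with aliases resolved and defaults applied."""
    job = dict(raw_job)
    # Peco-style alias: `_input_tables` is accepted and renamed.
    if "_input_tables" in job and "input_tables" not in job:
        job["input_tables"] = job.pop("_input_tables")
    missing = [key for key in REQUIRED_KEYS if key not in job]
    if missing:
        raise ValueError(f"job is missing required keys: {missing}")
    for key, default in JOB_DEFAULTS.items():
        job.setdefault(key, default)
    # Destination name as described in the table: {layer}_{output_table_name}.
    job["destination"] = f"{layer}_{job['output_table_name']}"
    return job


job = normalize_job(
    "silver",
    {
        "module": "silver_fct_calls",
        "_input_tables": {"calls": "bronze_calls"},
        "output_table_name": "fct_calls",
    },
)
print(job["destination"], job["class_name"], job["unload"])
```

Working on a copy keeps the parsed YAML untouched, so a generator could still inspect what the author actually wrote.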
Base classes (engine selection)
Five sibling base classes ship in bolt_pipeliner.bases.*. They expose the same lifecycle methods (check_if_tables_exists_find_yearmonths → load_data → process_data → unload_data); jobs don't subclass them — the runner picks one per job via the YAML class_name: key.
| class_name | Engine | Storage | When to use |
|---|---|---|---|
| ETLBase (default) | PySpark | Iceberg (Glue) | Large-scale ETL with ACID Iceberg tables. |
| ETLBaseDelta | PySpark | Delta (Synapse) | Synapse / Databricks Delta lake. |
| ETLBaseParquet | PySpark | Parquet on S3 | Spark without a metastore. |
| ETLBaseParquetPandas | Pandas + PyArrow | Parquet | Notebook / single-node ETL. |
| ETLBaseParquetPolars | Polars + PyArrow | Parquet | Single-node ETL with Polars ergonomics. |
Engines are imported lazily: importing bolt_pipeliner does not pull in PySpark / Polars / Pandas. Engine modules are loaded only when a job actually instantiates one. You can therefore run a pure-Pandas project without installing PySpark.
To register your own base class, point class_name: at a dotted path:
- module: silver_custom
  class_name: mypkg.bases.MyAuditingBase
  input_tables: { src: bronze_src }
  output_table_name: custom
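Resolving class_name lazily can be sketched with importlib. This is an illustration under assumptions, not the runner's code: resolve_class and BUILTIN_BASES are hypothetical names, and because the module paths inside bolt_pipeliner.bases are not documented here, the demo uses standard-library classes as stand-ins.

```python
import importlib

# Hypothetical registry: short class_name -> "module:attribute". The real
# module paths inside bolt_pipeliner.bases are not documented here, so the
# demo below uses standard-library stand-ins.
BUILTIN_BASES = {
    "OrderedDict": "collections:OrderedDict",
}


def resolve_class(class_name: str, builtins: dict[str, str] = BUILTIN_BASES) -> type:
    """Resolve a short built-in name or a dotted path to a class object."""
    if class_name in builtins:
        module_name, _, attr = builtins[class_name].partition(":")
    elif "." in class_name:
        # Dotted path: everything before the last dot is the module.
        module_name, _, attr = class_name.rpartition(".")
    else:
        raise ValueError(f"unknown class_name: {class_name!r}")
    # Importing here, not at module load time, keeps unused engines unloaded.
    module = importlib.import_module(module_name)
    return getattr(module, attr)


print(resolve_class("OrderedDict").__name__)
print(resolve_class("fractions.Fraction").__name__)
```

Because the import happens inside the function, a project that only names Pandas bases would never trigger a PySpark import in a design like this.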
Writing an ETL job
Every job module exports one top-level function:
# etl/1_silver/silver_fct_account_calls_monthly.py
from pyspark.sql import functions as F

def process_data(self, input_tables):
    """`self` is the ETLBase instance, so you can call self.spark, self.year_months,
    self._create_table, self._replace_table_partitions, etc.
    """
    calls = input_tables["t_agent_calls"]
    return (
        calls
        .groupBy("account_id", "year_month")
        .agg(F.count("*").alias("call_count"))
    )
The runner monkey-patches your function onto the ETLBase instance via types.MethodType, so self exposes:
| Attribute / method | Purpose |
|---|---|
| self.spark | The Spark session (Spark bases only). |
| self.input_tables | Dict of alias → DataFrame, already loaded. |
| self.year_months | The incremental window (list of YYYYMM ints), or None for a full run. |
| self.partition_by, self.incremental, self.unload | Echo of the YAML config. |
| self._create_table(df) / self._replace_table_partitions(df) | Manual write helpers. |
| self.iceberg_table / self._write_table / self.parquet_path / self.dataset_path | Destination identifier (varies by base). |
| self.logging_string | A short "<layer> <output_table_name>" label for logs. |
A Pandas job looks the same but returns a pd.DataFrame; a Polars job returns a pl.DataFrame. The base class decides how to persist it.
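The binding mechanism mentioned above, types.MethodType, can be shown in isolation. In this sketch FakeBase is a hypothetical stand-in for an ETLBase and plain lists of dicts stand in for DataFrames; only the binding call itself reflects what the text describes.

```python
import types


class FakeBase:
    """Hypothetical stand-in for an ETLBase; only the attributes used below."""

    def __init__(self, input_tables, year_months=None):
        self.input_tables = input_tables
        self.year_months = year_months

    def run(self):
        # The bound function is called like any other method.
        return self.process_data(self.input_tables)


def process_data(self, input_tables):
    """A job function as it would appear in a job module."""
    rows = input_tables["calls"]
    if self.year_months is not None:
        rows = [r for r in rows if r["year_month"] in self.year_months]
    return rows


base = FakeBase(
    {"calls": [{"year_month": 202401}, {"year_month": 202312}]},
    year_months=[202401],
)
# Bind the module-level function to this one instance.
base.process_data = types.MethodType(process_data, base)
print(base.run())
```

Binding per instance rather than per class means two jobs that share a base class never see each other's process_data.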
Incremental processing
Set incremental: true and list a partition column whose name matches incremental_column (default year_month):
- module: silver_fct_calls
  incremental: true
  partition_by: [year_month]
When the output table already exists, ETLBase.run() computes self.year_months as [current_month - 3 … current_month]. The base then filters processed_df by year_month ∈ self.year_months before overwritePartitions().
Requirements: the returned DataFrame must include the configured incremental column, and that column must be in partition_by. If a job does not fit this monthly model, either set incremental: false or set unload: false and write partitions yourself inside process_data, then return an empty DataFrame.
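The window [current_month - 3 … current_month] is easy to get wrong around year boundaries. A minimal sketch of the arithmetic follows; incremental_window and its lookback_months parameter are hypothetical names, and the real ETLBase.run() may compute the window differently.

```python
from datetime import date


def incremental_window(today: date, lookback_months: int = 3) -> list[int]:
    """Return YYYYMM ints from (current month - lookback) to current month."""
    months = []
    # Count months on a single axis so year boundaries need no special case.
    current_index = today.year * 12 + (today.month - 1)
    for index in range(current_index - lookback_months, current_index + 1):
        year, month_zero_based = divmod(index, 12)
        months.append(year * 100 + month_zero_based + 1)
    return months


print(incremental_window(date(2024, 2, 15)))  # [202311, 202312, 202401, 202402]
```

A run in February 2024 therefore reprocesses four partitions, two of them from the previous year.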
Advanced — manual partition unloading
For memory-heavy jobs, process month-by-month and write each partition yourself:
def process_data(self, input_tables):
    raw = input_tables["raw"]
    # _list_all_months and transform_one_month are job-specific helpers you supply.
    months_to_process = self.year_months or _list_all_months(raw)
    for ym in months_to_process:
        chunk = transform_one_month(raw, ym)
        if not self.table_exists:
            # The first write creates the table; later months replace their own partitions.
            self._create_table(chunk)
            self.table_exists = True
        else:
            self._replace_table_partitions(chunk)
    return self.spark.createDataFrame([], chunk.schema)  # empty → unload_data no-ops
Pair it with unload: false in YAML.
Data-quality tests (bolt test)
Declare checks under each job's tests: block (dbt-style):
silver:
  - module: silver_fct_account_calls_monthly
    output_table_name: fct_account_calls_monthly
    tests:
      - not_null: [year_month, account_id]
      - unique: [year_month, account_id]
      - row_count: { min: 1 }
      - freshness: { column: year_month, max_age_days: 90 }
      - schema: [year_month, account_id, call_count]
Built-in checks (all five work uniformly on Spark / Pandas / Polars):
| Check | Parameters |
|---|---|
| not_null | columns: [str] |
| unique | columns: [str] (composite key) |
| row_count | min: int = 1, max: int \| None |
| schema | expected: [str] (extras allowed) |
| freshness | column: str, max_age_days: int (accepts YYYYMM int or date) |
Run them:
bolt test # every job in every layer
bolt test --layer silver
bolt test --module fct_account_calls_monthly
bolt test exits non-zero if any check fails, so it slots straight into CI.
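To make the semantics of four of the checks concrete, here is a sketch over plain lists of dicts rather than a real DataFrame. The function names are hypothetical, and min_rows / max_rows stand in for the YAML min / max parameters to avoid shadowing Python built-ins; the framework's own implementations are engine-aware and may differ.

```python
def check_not_null(rows, columns):
    """Pass when none of the listed columns holds None in any row."""
    return all(row.get(col) is not None for row in rows for col in columns)


def check_unique(rows, columns):
    """Pass when the listed columns form a composite key with no duplicates."""
    keys = [tuple(row.get(col) for col in columns) for row in rows]
    return len(keys) == len(set(keys))


def check_row_count(rows, min_rows=1, max_rows=None):
    """Pass when the row count is within bounds; max_rows=None means unbounded."""
    count = len(rows)
    return count >= min_rows and (max_rows is None or count <= max_rows)


def check_schema(rows, expected):
    """Pass when every expected column is present; extra columns are allowed."""
    return all(set(expected) <= set(row) for row in rows)


rows = [
    {"year_month": 202401, "account_id": "a", "call_count": 3},
    {"year_month": 202401, "account_id": "b", "call_count": 1},
]
print(check_not_null(rows, ["year_month", "account_id"]))
print(check_unique(rows, ["year_month"]))
```

Note that unique treats its columns as one composite key: the sample rows share a year_month, so a unique check on year_month alone fails while one on year_month plus account_id passes.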
Notebook usage
Each TestResult implements _repr_html_(), so the results render with colored PASS/FAIL banners in Jupyter:
from bolt_pipeliner.testing import run_checks
results = run_checks(df, [{"not_null": ["year_month"]}, {"row_count": {"min": 1}}])
results # rendered inline as HTML
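The _repr_html_() hook is a Jupyter convention: when an object defines it, the notebook renders the returned HTML instead of the plain repr. A minimal sketch of a result type using the hook follows; CheckResult and its fields are assumptions for illustration and are not the package's TestResult.

```python
from dataclasses import dataclass
from html import escape


@dataclass
class CheckResult:
    """Hypothetical result record; field names are assumptions for this sketch."""

    check: str
    passed: bool
    detail: str = ""

    def _repr_html_(self) -> str:
        # Jupyter calls this hook, when present, to render an object as HTML.
        color = "#2e7d32" if self.passed else "#c62828"
        label = "PASS" if self.passed else "FAIL"
        return (
            f'<div style="border-left: 4px solid {color}; padding: 4px 8px;">'
            f'<b style="color: {color};">{label}</b> '
            f"{escape(self.check)} {escape(self.detail)}</div>"
        )


result = CheckResult("not_null", passed=False, detail="2 nulls in <year_month>")
print(result._repr_html_())
```

Escaping the check name and detail keeps column names that contain angle brackets or ampersands from breaking the rendered banner.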
Code generation (bolt generate)
bolt generate <target> [--config PATH] regenerates downstream artifacts from etl_config.yaml. Targets:
| Target | Output | What you get |
|---|---|---|
| airflow | outputs/airflow/{code,dags}/ | One DAG per layer + one standalone Spark script per job. The DAG template uses plain Airflow operators; swap in EmrContainerOperator / DatabricksSubmitRunOperator / KubernetesPodOperator as needed. |
| documentation | outputs/documentation/ | HTML index + per-table pages with Mermaid lineage. Always emits outputs/schema/schema.py for the Spark-free documentation flow. |
| layers | outputs/layers/<layer>.py | One executable script per layer, inlining every job in dependency order. Useful for ad-hoc runs without Airflow. |
| notebook | outputs/notebook/etl_jobs_notebook.ipynb | A Jupyter notebook with one cell per job (plus Spark session + ETLBase setup cells). |
| snowflakeddl | outputs/snowflake_ddls/ | CREATE TABLE DDLs derived from outputs/schema/schema.csv. |
| all | (all of the above) | |
The generators read templates from the package (src/bolt_pipeliner/templates/); they're engine-agnostic and don't ship any cloud-specific code paths.
Smart dependency resolution
The order of jobs inside each layer in YAML is irrelevant — the generators build a DAG by matching each job's input_tables values against other jobs' output_table_name (prefixed with the layer name) and topologically sort.
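The matching-and-sorting idea can be sketched with the standard library's graphlib. This is an illustration of the technique under assumptions, not the generators' code: execution_order is a hypothetical name, the job names are made up, and the sketch resolves dependencies across layers as well as within them.

```python
from graphlib import TopologicalSorter

# Jobs keyed by layer, in deliberately "wrong" order. Names are illustrative.
config = {
    "silver": [
        {"module": "silver_report", "input_tables": {"agg": "silver_agg"},
         "output_table_name": "report"},
        {"module": "silver_agg", "input_tables": {"raw": "bronze_events"},
         "output_table_name": "agg"},
    ],
    "bronze": [
        {"module": "bronze_events", "input_tables": {"src": "events.csv"},
         "output_table_name": "events"},
    ],
}


def execution_order(config):
    """Order modules so every producer runs before its consumers."""
    # Map "{layer}_{output_table_name}" -> module that produces it.
    producers = {
        f"{layer}_{job['output_table_name']}": job["module"]
        for layer, jobs in config.items()
        for job in jobs
    }
    graph = {}
    for jobs in config.values():
        for job in jobs:
            # Inputs with no known producer (e.g. raw files) add no edge.
            graph[job["module"]] = {
                producers[table]
                for table in job["input_tables"].values()
                if table in producers
            }
    return list(TopologicalSorter(graph).static_order())


print(execution_order(config))  # ['bronze_events', 'silver_agg', 'silver_report']
```

TopologicalSorter raises CycleError when two jobs depend on each other, which gives a circular config a clear failure instead of a silent mis-ordering.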
Spark session profiles
bolt_pipeliner.sessions.create_session(profile) dispatches to one module per runtime under bolt_pipeliner.sessions/:
| Profile | Module | Status |
|---|---|---|
| local | sessions/local.py | Implemented: returns the active SparkSession or builds one. |
| databricks, emr, glue, gcp, azure, k8s | sessions/<profile>.py | Stubs today; planned. |
Override via BOLT_SPARK_PROFILE (read by the generated standalone scripts) or by editing configs/spark/<profile>.toml.
# configs/spark/local.toml
[runtime]
target = "local"
[spark]
"spark.sql.shuffle.partitions" = 200
"spark.serializer" = "org.apache.spark.serializer.KryoSerializer"
Macros (reusable transforms)
Project-local reusable transforms live in macros/ and are plain Python — no DSL:
# macros/dates.py
def month_floor(df, column):
    """Round `column` down to the first of the month. Engine-aware via isinstance."""
    ...

# etl/1_silver/silver_invoice.py
from macros.dates import month_floor

def process_data(self, input_tables):
    return month_floor(input_tables["raw"], "issue_date")
The framework deliberately does not ship a macro DSL or registry. Use plain imports.
Documentation flow without Spark
bolt generate documentation always emits a Spark-free schema-extraction script alongside the HTML so notebook-only developers can produce schemas without a local Spark install:
- bolt generate documentation → writes outputs/schema/schema.py and the HTML (falling back to whatever schema.csv exists, or empty if none).
- Copy schema.py into the environment that has Spark (Databricks notebook, EMR shell, …) and run it. It prints a CSV.
- Save that CSV to outputs/schema/schema.csv.
- Re-run bolt generate documentation. The HTML now picks up the real column definitions, and bolt generate snowflakeddl can derive Snowflake DDLs from the same CSV.
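The last step, deriving DDLs from the CSV, can be pictured roughly as follows. Everything specific here is an assumption made for illustration: the README does not document the schema.csv layout, so the table / column / type headers, the type mapping, and the ddls_from_schema_csv function are all hypothetical.

```python
import csv
import io

# ASSUMPTION: the CSV has `table`, `column`, and `type` headers holding Spark
# type names. The real schema.csv layout is not documented in this README.
SAMPLE_CSV = """table,column,type
silver_fct_calls,account_id,string
silver_fct_calls,year_month,int
silver_fct_calls,call_count,bigint
"""

# ASSUMPTION: a deliberately small Spark -> Snowflake type mapping.
TYPE_MAP = {"string": "VARCHAR", "int": "INTEGER", "bigint": "BIGINT",
            "double": "DOUBLE", "date": "DATE", "timestamp": "TIMESTAMP_NTZ"}


def ddls_from_schema_csv(text: str, schema: str) -> dict[str, str]:
    """Group CSV rows by table and emit one CREATE TABLE statement per table."""
    columns: dict[str, list[str]] = {}
    for row in csv.DictReader(io.StringIO(text)):
        # Unknown types fall back to VARCHAR in this sketch.
        sql_type = TYPE_MAP.get(row["type"].lower(), "VARCHAR")
        columns.setdefault(row["table"], []).append(f"    {row['column']} {sql_type}")
    return {
        table: f"CREATE TABLE IF NOT EXISTS {schema}.{table} (\n"
               + ",\n".join(cols) + "\n);"
        for table, cols in columns.items()
    }


print(ddls_from_schema_csv(SAMPLE_CSV, "my_project")["silver_fct_calls"])
```

Keeping the CSV as the only input is what lets this step run without Spark: the cluster is needed once to print the schema, and everything downstream is plain text processing.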
Project layout
my_project/
├── configs/
│ ├── etl_config.yaml # source of truth — layers, jobs, tests
│ ├── style_config.yaml # colors used by `bolt generate documentation`
│ └── spark/<profile>.toml # cluster overrides per runtime profile
├── etl/
│ ├── _flatfile/ # raw ingestion
│ ├── 0_bronze/ # cleaned raw
│ ├── 1_silver/ # business logic
│ └── 2_gold/ # domain-specific facts / marts
├── macros/ # reusable Python transforms
├── models/ # optional ML training jobs
├── tests/ # pytest unit tests (project-specific)
└── outputs/ # generated; gitignored
├── airflow/{dags,code}/
├── documentation/
├── layers/
├── notebook/
├── schema/
└── snowflake_ddls/
Framework code itself lives under src/bolt_pipeliner/:
src/bolt_pipeliner/
├── runner.py # job loop
├── bases/ # five sibling ETLBase variants
├── sessions/ # Spark profile dispatch
├── config/loader.py # YAML loader + key normalization
├── generators/ # airflow / documentation / layers / notebook / snowflake_ddl
├── templates/{airflow,docs}/ # bundled templates
├── testing/ # data-quality checks + runner
└── cli/ # typer app (init / run / generate / test)
And sample projects live under examples/:
examples/
├── demo/ # runnable PySpark + Iceberg medallion (used in this repo's CI)
├── entergy/ # large production-scale snapshot — read-only reference
└── peco/ # multi-engine reference (PySpark + Pandas + Polars on Parquet)
See examples/README.md for a guided tour.
Troubleshooting
ImportError: cannot import name 'AnalyzeArgument' from 'pyspark.sql.udtf'
Your local PySpark install is older than the package's expected version. Reinstall PySpark or pin to a version matching your cluster.
bolt run finds no jobs for a layer
Check that the layer is declared under layers: in etl_config.yaml and has a matching top-level section (e.g. silver:). The loader treats placeholder values like silver: ... as empty layers.
Generated documentation has empty schema columns
Spark wasn't reachable. Run outputs/schema/schema.py in your Spark environment, save the printed CSV to outputs/schema/schema.csv, and rerun bolt generate documentation.
Adding a new layer
- Add mylayer: etl/9_mylayer under layers: in etl_config.yaml.
- Create etl/9_mylayer/ with one or more job modules exposing process_data(self, input_tables).
- List the jobs under the new top-level mylayer: section.
- Run the layer. Layer flags are baked into the CLI today (--flatfile, --bronze, --silver, --gold, --diamond), so there is no bolt run --mylayer. For arbitrary layer names, run bolt run (all layers) or invoke the runner programmatically: from bolt_pipeliner.runner import run; run("configs/etl_config.yaml", layers=["mylayer"]).
Contributing
pip install -e ".[dev]"
ruff check src/ tests/
mypy src/bolt_pipeliner
pytest -q
Pull requests should:
- Keep changes minimal and scoped.
- Add tests for any new behavior (the suite covers config loading, runner resolution, generators, the CLI, and all data-quality checks across Pandas + Polars).
- Avoid breaking the lazy-import invariant: importing bolt_pipeliner must not pull in PySpark / Polars / Pandas at module load time. The tests/test_package_imports_lazily.py suite enforces this.
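One way to check a lazy-import invariant like this is to import the package in a fresh interpreter and inspect sys.modules. The sketch below shows that technique only; modules_loaded_by is a hypothetical helper, and the actual contents of tests/test_package_imports_lazily.py are not shown in this README.

```python
import subprocess
import sys


def modules_loaded_by(package: str, watched: list[str]) -> list[str]:
    """Import `package` in a fresh interpreter; return the watched modules it loaded."""
    # A fresh process is needed: sys.modules in this process may already
    # contain the watched modules from earlier imports.
    probe = (
        "import importlib, sys\n"
        f"importlib.import_module({package!r})\n"
        f"print(','.join(m for m in {watched!r} if m in sys.modules))\n"
    )
    completed = subprocess.run(
        [sys.executable, "-c", probe],
        capture_output=True, text=True, check=True,
    )
    output = completed.stdout.strip()
    return output.split(",") if output else []


# Demo with standard-library names so the sketch runs anywhere; in the
# project the call would pass "bolt_pipeliner" and the engine packages.
print(modules_loaded_by("json", ["json", "decimal"]))
```

In a pytest suite the assertion would then be that the returned list is empty for the engine packages.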