Data preprocessing and feature engineering for time-series forecasting
faro-prep
Data preprocessing and feature engineering library for time-series forecasting. Fluent, chainable API that reads like a recipe — load, clean, encode, scale, engineer, inspect — and produces a serializable preprocessing pipeline for reproducibility.
Installation
pip install faro-prep
Quick Start
from forecastlib.data import Loader
ds = (
    Loader.from_csv("sales.csv")
    .select(target="sales", datetime="date", group="store")
    .clean.fix_datetime()
    .clean.drop_duplicates()
    .fill.smart()
    .categorical().encode.auto()
    .numeric().exclude(["sales"]).scale.standard()
    .target().lags([1, 7, 14])
    .target().rolling.mean([7, 30])
    .target().ewm([7, 14])
    .datetime().features.calendar()
)
df = ds.to_dataframe() # Final pandas DataFrame
pipeline = ds.to_pipeline() # Reproducible pipeline
pipeline.save("pipeline.pkl")
Loading Data
From files
from forecastlib.data import Loader
ds = Loader.from_csv("sales.csv")
ds = Loader.from_csv("sales.csv", sep=";", encoding="latin-1") # custom params
ds = Loader.from_excel("sales.xlsx")
ds = Loader.from_excel("sales.xlsx", sheet_name="Sheet2")
ds = Loader.from_parquet("sales.parquet")
ds = Loader.from_json("sales.json")
From a DataFrame
import pandas as pd
df = pd.read_csv("sales.csv")
ds = Loader.from_dataframe(df)
From SQL
# PostgreSQL
ds = Loader.from_sql(
    db="postgresql",
    host="localhost",
    database="sales_db",
    user="admin",
    password="secret",
    table="transactions",
)
# MySQL
ds = Loader.from_sql(
    db="mysql", host="localhost", database="mydb",
    user="root", password="pass",
    query="SELECT * FROM sales WHERE year = 2024",
)
# SQLite
ds = Loader.from_sql(db="sqlite", database="local.db", table="sales")
# SQL Server
ds = Loader.from_sql(db="mssql", host="srv", database="db", user="u", password="p", table="t")
Supported databases: postgresql, mysql, sqlite, mssql.
Requires the matching driver:
psycopg2, pymysql, or pyodbc.
Column Role Assignment
Tell the library which columns play which roles:
ds = ds.select(
    target="sales",    # Column to forecast (required)
    datetime="date",   # Date / timestamp column (required)
    group="store",     # Group key, e.g. SKU, store, region (optional)
)
Cleaning
ds = ds.clean.fix_datetime() # Parse date strings → datetime64
ds = ds.clean.drop_duplicates() # Remove exact duplicate rows
ds = ds.clean.sort(by="date") # Sort by any column
Filling Missing Values
ds = ds.fill.smart() # Median for numeric, mode for categorical
ds = ds.fill.forward() # Forward fill
ds = ds.fill.backward() # Backward fill
ds = ds.fill.median() # Median of each column
ds = ds.fill.mean() # Mean of each column
ds = ds.fill.constant(0) # Fill all NaN with a constant
ds = ds.fill.interpolate() # Linear interpolation
Apply fill to specific columns:
ds = ds.cols(["sales", "price"]).fill.forward()
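To make the difference between the directional fills concrete, here is a minimal plain-Python sketch of forward and backward filling. It does not use faro-prep; the helper names `forward_fill` and `backward_fill` are hypothetical, and how the library treats leading or trailing gaps is an assumption.

```python
from typing import List, Optional, Sequence


def forward_fill(values: Sequence[Optional[float]]) -> List[Optional[float]]:
    """Replace each None with the most recent non-missing value, if any."""
    filled = []
    last = None
    for v in values:
        if v is not None:
            last = v
        filled.append(last)  # stays None until a first value has been seen
    return filled


def backward_fill(values: Sequence[Optional[float]]) -> List[Optional[float]]:
    """Replace each None with the next non-missing value, if any."""
    return forward_fill(list(values)[::-1])[::-1]


sales = [None, 10.0, None, None, 12.0, None]
ffilled = forward_fill(sales)   # leading gap stays missing
bfilled = backward_fill(sales)  # trailing gap stays missing
```

In a forecasting setting, backward fill copies future observations into earlier rows, so forward fill is usually the safer default for the target column.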
Column Selection
Select subsets of columns before applying a transformation:
# By type
ds.numeric() # All numeric columns
ds.categorical() # All object / category columns
ds.target() # The target column only (requires .select() first)
ds.datetime() # The datetime column only
# By name
ds.cols(["price", "promo"])
# By regex
ds.regex("price|promo")
# Exclude specific columns from a type selection
ds.numeric().exclude(["sales"]) # All numeric except "sales"
Encoding Categorical Columns
ds = ds.categorical().encode.auto() # Automatic: one-hot for low cardinality, label for high
ds = ds.categorical().encode.one_hot() # One-hot encoding (drops original column)
ds = ds.categorical().encode.label() # Label encoding (0, 1, 2, ...)
ds = ds.categorical().encode.ordinal() # Ordinal encoding
# Target specific columns
ds = ds.cols(["region", "channel"]).encode.one_hot()
ds = ds.cols(["category"]).encode.label()
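The sketch below illustrates, in plain Python, what label and one-hot encoding produce and how an "auto" rule might choose between them. It is not the library's implementation: the function names, the `region_<value>` column naming, and the `max_one_hot` cardinality threshold are all assumptions made for illustration.

```python
def label_encode(values):
    """Map each distinct category to an integer, using sorted order."""
    mapping = {cat: i for i, cat in enumerate(sorted(set(values)))}
    return [mapping[v] for v in values], mapping


def one_hot_encode(values, prefix):
    """Return one 0/1 column per distinct category."""
    categories = sorted(set(values))
    return {
        f"{prefix}_{cat}": [1 if v == cat else 0 for v in values]
        for cat in categories
    }


def auto_encode(values, prefix, max_one_hot=10):
    """One-hot for low cardinality, label codes otherwise.

    max_one_hot is a hypothetical threshold, not a documented default.
    """
    if len(set(values)) <= max_one_hot:
        return one_hot_encode(values, prefix)
    codes, _ = label_encode(values)
    return {prefix: codes}


region = ["north", "south", "north", "east"]
codes, mapping = label_encode(region)
one_hot = one_hot_encode(region, "region")
```

One-hot encoding adds one column per category, which is why a cardinality cutoff is a common way to keep the feature matrix small.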
Scaling Numeric Columns
ds = ds.numeric().scale.standard() # Z-score: (x - mean) / std
ds = ds.numeric().scale.minmax() # Scale to [0, 1]
ds = ds.numeric().scale.robust() # Median-centered, IQR-scaled (outlier-resistant)
ds = ds.numeric().scale.log() # Natural log transform
# Scale features, leave target untouched
ds = ds.numeric().exclude(["sales"]).scale.standard()
# Scale specific columns
ds = ds.cols(["price", "promo"]).scale.minmax()
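As a rough illustration of what the first three scalers compute, here is a standard-library sketch. It assumes a population standard deviation for the z-score and the "inclusive" quartile method for the IQR; faro-prep may use different conventions, and the function names are hypothetical.

```python
import statistics


def standard_scale(xs):
    """Z-score: (x - mean) / std, using the population std (an assumption)."""
    mean = statistics.fmean(xs)
    std = statistics.pstdev(xs)
    return [(x - mean) / std for x in xs]


def minmax_scale(xs):
    """Rescale linearly so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(xs), max(xs)
    return [(x - lo) / (hi - lo) for x in xs]


def robust_scale(xs):
    """Center on the median and divide by the interquartile range."""
    q1, _, q3 = statistics.quantiles(xs, n=4, method="inclusive")
    median = statistics.median(xs)
    return [(x - median) / (q3 - q1) for x in xs]


prices = [10.0, 12.0, 11.0, 13.0, 100.0]  # 100.0 is an outlier
z = standard_scale(prices)
mm = minmax_scale(prices)
rb = robust_scale(prices)
```

With the outlier present, min-max scaling squeezes the four ordinary prices into a narrow band near 0, while the robust scaler keeps them spread between -1 and 0.5.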
Time-Series Feature Engineering
These methods require .select() to have been called first.
Lag Features
ds = ds.target().lags([1, 7, 14])
# Creates: sales_lag1, sales_lag7, sales_lag14
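A lag feature is the target value from k rows earlier. The plain-Python sketch below computes it separately per group so that one store's history never leaks into another's. Whether faro-prep applies lags per group when `group` is set is an assumption here, and `lag_by_group` is a hypothetical helper.

```python
def lag_by_group(rows, target, group, k):
    """Add f"{target}_lag{k}" to each row, looking back k rows within the same group.

    rows must already be sorted chronologically.
    """
    history = {}  # group key -> list of past target values
    out = []
    for row in rows:
        past = history.setdefault(row[group], [])
        new_row = dict(row)
        new_row[f"{target}_lag{k}"] = past[-k] if len(past) >= k else None
        past.append(row[target])
        out.append(new_row)
    return out


rows = [
    {"store": "A", "sales": 10},
    {"store": "B", "sales": 100},
    {"store": "A", "sales": 11},
    {"store": "B", "sales": 110},
    {"store": "A", "sales": 12},
]
lagged = lag_by_group(rows, target="sales", group="store", k=1)
lag1 = [r["sales_lag1"] for r in lagged]
```

The first k rows of each group have no history, so their lag value is missing and usually has to be dropped or filled before training.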
Rolling Statistics
ds = ds.target().rolling.mean([7, 30]) # → sales_rollmea7, sales_rollmea30
ds = ds.target().rolling.std([7]) # → sales_rollstd7
ds = ds.target().rolling.min([7, 14]) # → sales_rollmin7, sales_rollmin14
ds = ds.target().rolling.max([7, 14]) # → sales_rollmax7, sales_rollmax14
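The following sketch shows one way a trailing rolling statistic can be computed in plain Python. It assumes the window includes the current row and that a value is only emitted once a full window is available; the library's actual window alignment is not documented here, and `rolling` is a hypothetical helper.

```python
import statistics


def rolling(values, window, func):
    """Apply func to each trailing window; emit None until the window is full."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(func(values[i + 1 - window : i + 1]))
    return out


sales = [1, 2, 3, 4, 5]
roll_mean = rolling(sales, 3, statistics.fmean)
roll_max = rolling(sales, 3, max)
```

If the window does include the current row, the feature contains the value being predicted, so it is worth checking whether a one-step shift is needed before using it as a model input.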
Exponential Weighted Mean
ds = ds.target().ewm([7, 14])
# Creates: sales_ewm7, sales_ewm14
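An exponentially weighted mean gives recent observations more weight than older ones. The sketch below assumes the integers passed to `ewm` are spans, converted to a smoothing factor with alpha = 2 / (span + 1), and uses the simple recursive form; both choices are assumptions borrowed from common practice, not confirmed library behavior.

```python
def ewm_mean(values, span):
    """Recursive exponentially weighted mean with alpha = 2 / (span + 1)."""
    alpha = 2.0 / (span + 1)
    out = []
    prev = None
    for x in values:
        # The first value seeds the average; later values blend in with weight alpha.
        prev = x if prev is None else alpha * x + (1 - alpha) * prev
        out.append(prev)
    return out


smoothed = ewm_mean([10.0, 20.0, 30.0], span=3)  # alpha = 0.5
```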
Differencing
ds = ds.target().diffs([1, 7])
# Creates: sales_diff1, sales_diff7
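Differencing subtracts the value k rows earlier from the current value, which removes level and (for k equal to the seasonal period) seasonal effects. A minimal plain-Python sketch, with `diff` as a hypothetical helper:

```python
def diff(values, k):
    """Return values[i] - values[i - k]; the first k entries have no predecessor."""
    return [
        None if i < k else values[i] - values[i - k]
        for i in range(len(values))
    ]


sales = [10, 12, 15, 11]
diff1 = diff(sales, 1)
diff2 = diff(sales, 2)
```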
Calendar Features
ds = ds.datetime().features.calendar()
Creates the following columns (prefixed with the datetime column name):
| Column | Description |
|---|---|
| date_year | Year (integer) |
| date_month | Month 1–12 |
| date_day | Day of month |
| date_dow | Day of week (0=Monday) |
| date_week | ISO week number |
| date_quarter | Quarter 1–4 |
| date_is_weekend | 1 if Saturday or Sunday |
| date_sin_month | Cyclical sin encoding of month |
| date_cos_month | Cyclical cos encoding of month |
| date_sin_dow | Cyclical sin encoding of day-of-week |
| date_cos_dow | Cyclical cos encoding of day-of-week |
| date_days_to_easter | Days until/since Easter (Colombia-calibrated) |
| date_days_to_christmas | Days until/since Christmas |
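For reference, most of these columns can be derived from a single date with the standard library, as in the sketch below. The exact cyclical formulas (here 2π·month/12 and 2π·dow/7) and the sign convention for the Christmas distance are assumptions, and the Easter column is omitted because its calibration is not described. `calendar_features` is a hypothetical helper.

```python
import math
from datetime import date


def calendar_features(d, prefix="date"):
    """Derive calendar columns for one date, named like the table above."""
    dow = d.weekday()  # 0 = Monday
    return {
        f"{prefix}_year": d.year,
        f"{prefix}_month": d.month,
        f"{prefix}_day": d.day,
        f"{prefix}_dow": dow,
        f"{prefix}_week": d.isocalendar()[1],
        f"{prefix}_quarter": (d.month - 1) // 3 + 1,
        f"{prefix}_is_weekend": int(dow >= 5),
        # Sin/cos pairs place December next to January and Sunday next to Monday.
        f"{prefix}_sin_month": math.sin(2 * math.pi * d.month / 12),
        f"{prefix}_cos_month": math.cos(2 * math.pi * d.month / 12),
        f"{prefix}_sin_dow": math.sin(2 * math.pi * dow / 7),
        f"{prefix}_cos_dow": math.cos(2 * math.pi * dow / 7),
        # Positive before 25 December of the same year, negative after (assumed).
        f"{prefix}_days_to_christmas": (date(d.year, 12, 25) - d).days,
    }


features = calendar_features(date(2024, 6, 15))  # a Saturday
```

The sin/cos pair matters because a raw month number treats December (12) and January (1) as far apart, while the cyclical encoding keeps them adjacent.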
Inspection
summary = ds.inspect.summary() # DataFrame: column, dtype, nulls, nunique, min, max, mean
nulls = ds.inspect.nulls() # DataFrame: column, null_count, null_pct
types = ds.inspect.types() # DataFrame: column, dtype, inferred_role
memory = ds.inspect.memory() # DataFrame: column, KB, MB
Dataset Properties
len(ds) # Number of rows
ds.shape # (rows, cols)
ds.columns # List of column names
ds.dtypes # Series of dtypes
ds.head(n=5) # First n rows as DataFrame
ds.to_dataframe() # Full pandas DataFrame
ds.copy() # Deep copy (independent of original)
Preprocessing Pipeline
Capture all transformations as a reproducible pipeline:
from forecastlib.pipeline import Pipeline
# After any chain of transforms
pipeline = ds.to_pipeline()
pipeline.summary() # Print all steps
# Save to disk
pipeline.save("pipeline.pkl")
# Load and inspect later
loaded = Pipeline.load("pipeline.pkl")
print(loaded.steps)
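The idea behind a saved pipeline is that the recorded steps, together with any fitted parameters, can be restored later and applied unchanged to new data. The sketch below shows that round trip with the standard `pickle` module. The step dictionaries follow the `{"op": ..., "cols": ...}` shape shown in the Transform Registry section; the `params` key and its values are hypothetical, and this is not how `Pipeline.save` is necessarily implemented.

```python
import os
import pickle
import tempfile

# Hypothetical recorded steps; "params" holds statistics fitted on training data.
steps = [
    {"op": "fill.forward", "cols": ["sales"]},
    {"op": "scale.standard", "cols": ["price"], "params": {"mean": 11.5, "std": 1.1}},
]

path = os.path.join(tempfile.mkdtemp(), "pipeline.pkl")
with open(path, "wb") as fh:
    pickle.dump(steps, fh)

with open(path, "rb") as fh:
    loaded_steps = pickle.load(fh)

for step in loaded_steps:
    print(step["op"], step["cols"])
```

Because unpickling can execute arbitrary code, `.pkl` files should only be loaded from sources you trust.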
Train / Test Splitting
from forecastlib.time_series import TimeSeriesSplitter
splitter = TimeSeriesSplitter()
# Simple train/test split
train, test = splitter.train_test_split(ds, test_ratio=0.2)
print(len(train), len(test))
# Walk-forward cross-validation (expanding window)
splitter_cv = TimeSeriesSplitter(n_splits=5)
for train_fold, test_fold in splitter_cv.split(ds):
    print(f"  train={len(train_fold)}, test={len(test_fold)}")
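To show what an expanding-window walk-forward scheme looks like, here is a minimal index-based sketch in plain Python. The fold sizing (equal blocks of `n_rows // (n_splits + 1)` rows) is an assumption and may differ from what `TimeSeriesSplitter` does; `expanding_window_splits` is a hypothetical helper.

```python
def expanding_window_splits(n_rows, n_splits):
    """Yield (train_indices, test_indices); each fold trains on all earlier rows."""
    if n_rows < n_splits + 1:
        raise ValueError("need at least n_splits + 1 rows")
    fold = n_rows // (n_splits + 1)
    for i in range(1, n_splits + 1):
        train_end = fold * i
        # The last fold absorbs any remainder rows.
        test_end = fold * (i + 1) if i < n_splits else n_rows
        yield range(0, train_end), range(train_end, test_end)


sizes = [
    (len(train_idx), len(test_idx))
    for train_idx, test_idx in expanding_window_splits(n_rows=12, n_splits=3)
]
print(sizes)
```

Every test block lies strictly after its training block, so no fold is evaluated on data older than what it was trained on.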
Data Quality Validation
from forecastlib.time_series import TimeSeriesValidator
validator = TimeSeriesValidator()
report = validator.check(ds, datetime_col="date")
print(report.sorted) # True if sorted chronologically
print(report.has_gaps) # True if there are missing time steps
print(report.has_duplicates) # True if duplicate timestamps exist
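The three checks can be approximated with a few lines of plain Python, as sketched below. The expected step of one day and the dictionary return shape are assumptions; the real validator presumably infers the frequency from the data, and `check_timestamps` is a hypothetical helper.

```python
from datetime import date, timedelta


def check_timestamps(stamps, step=timedelta(days=1)):
    """Report ordering, gaps larger than step, and repeated timestamps."""
    is_sorted = all(a <= b for a, b in zip(stamps, stamps[1:]))
    has_duplicates = len(set(stamps)) != len(stamps)
    unique = sorted(set(stamps))
    has_gaps = any(b - a > step for a, b in zip(unique, unique[1:]))
    return {"sorted": is_sorted, "has_gaps": has_gaps, "has_duplicates": has_duplicates}


stamps = [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 2), date(2024, 1, 5)]
result = check_timestamps(stamps)
```

With grouped data, the same timestamp legitimately repeats across groups, so a check like this would need to run per group.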
Transform Registry
Every operation is recorded and can be audited:
steps = ds._registry.summary()
for step in steps:
    print(step)  # e.g., {"op": "scale.standard", "cols": ["price"], ...}
Complete Example
from forecastlib.data import Loader
from forecastlib.pipeline import Pipeline
from forecastlib.time_series import TimeSeriesSplitter, TimeSeriesValidator
# 1. Load
ds = Loader.from_csv("sales.csv")
# 2. Assign roles
ds = ds.select(target="sales", datetime="date", group="store")
# 3. Validate before transforming
validator = TimeSeriesValidator()
report = validator.check(ds, datetime_col="date")
if report.has_gaps:
    print("Warning: time gaps detected")
# 4. Clean
ds = (
    ds
    .clean.fix_datetime()
    .clean.drop_duplicates()
    .clean.sort(by="date")
)
# 5. Fill
ds = ds.fill.smart()
# 6. Encode
ds = ds.categorical().encode.auto()
# 7. Scale features (not target)
ds = ds.numeric().exclude(["sales"]).scale.standard()
# 8. Time-series features
ds = (
    ds
    .target().lags([1, 7, 14])
    .target().rolling.mean([7, 30])
    .target().rolling.std([7])
    .target().ewm([7, 14])
    .target().diffs([1])
    .datetime().features.calendar()
)
# 9. Inspect
print(ds.inspect.summary())
# 10. Split
splitter = TimeSeriesSplitter(n_splits=3)
for train, test in splitter.split(ds):
    df_train = train.to_dataframe()
    df_test = test.to_dataframe()
    # ... train your model ...
# 11. Save pipeline
pipeline = ds.to_pipeline()
pipeline.save("sales_pipeline.pkl")
License
MIT — see LICENSE