Data preprocessing and feature engineering for time-series forecasting
Project description
faro-prep
Data preprocessing and feature engineering library for time-series forecasting. Fluent, chainable API that reads like a recipe — load, clean, encode, scale, engineer, inspect — and produces a serializable preprocessing pipeline for reproducibility.
Installation
pip install faro-prep
Quick Start
from forecastlib.data import Loader
ds = (
Loader.from_csv("sales.csv")
.select(target="sales", datetime="date", group="store")
.clean.fix_datetime()
.clean.drop_duplicates()
.fill.smart()
.categorical().encode.auto()
.numeric().exclude(["sales"]).scale.standard()
.target().lags([1, 7, 14])
.target().rolling.mean([7, 30])
.target().ewm([7, 14])
.datetime().features.calendar()
)
df = ds.to_dataframe() # Final pandas DataFrame
pipeline = ds.to_pipeline() # Reproducible pipeline
pipeline.save("pipeline.pkl")
Loading Data
From files
from forecastlib.data import Loader
ds = Loader.from_csv("sales.csv")
ds = Loader.from_csv("sales.csv", sep=";", encoding="latin-1") # custom params
ds = Loader.from_excel("sales.xlsx")
ds = Loader.from_excel("sales.xlsx", sheet_name="Sheet2")
ds = Loader.from_parquet("sales.parquet")
ds = Loader.from_json("sales.json")
From a DataFrame
import pandas as pd
df = pd.read_csv("sales.csv")
ds = Loader.from_dataframe(df)
From SQL
# PostgreSQL
ds = Loader.from_sql(
db="postgresql",
host="localhost",
database="sales_db",
user="admin",
password="secret",
table="transactions",
)
# MySQL
ds = Loader.from_sql(
db="mysql", host="localhost", database="mydb",
user="root", password="pass",
query="SELECT * FROM sales WHERE year = 2024",
)
# SQLite
ds = Loader.from_sql(db="sqlite", database="local.db", table="sales")
# SQL Server
ds = Loader.from_sql(db="mssql", host="srv", database="db", user="u", password="p", table="t")
Supported databases: postgresql, mysql, sqlite, mssql.
Requires the matching driver:
psycopg2,pymysql, orpyodbc.
Column Role Assignment
Tell the library which columns play which roles:
ds = ds.select(
target="sales", # Column to forecast (required)
datetime="date", # Date / timestamp column (required)
group="store", # Group key — SKU, store, region (optional)
)
Cleaning
ds = ds.clean.fix_datetime() # Parse date strings → datetime64
ds = ds.clean.fix_datetime(format="%d/%m/%Y") # Explicit format
ds = ds.cols(["date_str"]).clean.fix_datetime() # On specific columns
ds = ds.clean.drop_duplicates() # Remove exact duplicate rows
ds = ds.clean.drop_duplicates(subset=["date", "store"]) # Consider only these columns
ds = ds.clean.drop_duplicates(keep="last") # keep: "first" | "last" | False
ds = ds.clean.drop_nulls() # Drop rows with any null
ds = ds.clean.drop_nulls(subset=["sales", "date"]) # Only check these columns
ds = ds.clean.drop_nulls(thresh=5) # Require at least 5 non-null values
ds = ds.clean.drop_constant() # Remove columns with a single unique value
ds = ds.cols(["price"]).clean.clip(lower=0) # Clip to minimum
ds = ds.cols(["age"]).clean.clip(lower=0, upper=120) # Clip to range
ds = ds.categorical().clean.strip() # Strip whitespace from string columns
ds = ds.clean.fix_dtypes() # Auto-cast: numeric strings→float, low-cardinality→category
ds = ds.clean.rename({"old_col": "new_col"}) # Rename columns
ds = ds.clean.sort(by="date") # Sort ascending by column
ds = ds.clean.sort(by=["store", "date"]) # Sort by multiple columns
ds = ds.clean.sort(by="date", ascending=False) # Descending sort
Filling Missing Values
# Smart auto-select per column (recommended starting point)
ds = ds.fill.smart()
# Logic: numeric <5% nulls → median, numeric ≥5% → interpolate, categorical → mode, datetime → ffill
# Statistical fills
ds = ds.fill.mean() # Column mean (numeric only)
ds = ds.fill.median() # Column median (numeric only)
ds = ds.fill.mode() # Most frequent value (any dtype)
ds = ds.fill.constant(0) # Fixed constant for all NaN
# Temporal fills
ds = ds.fill.forward() # Forward fill (ffill)
ds = ds.fill.forward(limit=3) # Forward fill but at most 3 consecutive NaNs
ds = ds.fill.backward() # Backward fill (bfill)
ds = ds.fill.backward(limit=3) # Backward fill with limit
# Interpolation
ds = ds.fill.interpolate() # Linear interpolation (default)
ds = ds.fill.interpolate(method="time") # Time-based interpolation
ds = ds.fill.interpolate(method="polynomial") # Any method from pd.Series.interpolate
# Panel-aware fill (group-aware — correct for multi-SKU data)
ds = ds.fill.time_series()
# Fills within each group separately; remaining leading NaNs → bfill → 0
# KNN imputation
ds = ds.numeric().fill.knn() # Default 5 neighbors
ds = ds.numeric().fill.knn(n_neighbors=3)
Apply any fill to specific columns:
ds = ds.cols(["sales", "price"]).fill.forward()
ds = ds.cols(["sales"]).fill.interpolate(method="time")
ds = ds.numeric().fill.knn()
Column Selection
Select subsets of columns before applying a transformation:
# By type
ds.numeric() # All numeric columns
ds.categorical() # All object / category columns
ds.target() # The target column only (requires .select() first)
ds.datetime() # The datetime column only
# By name
ds.cols(["price", "promo"])
# By regex
ds.regex("price|promo")
# Exclude specific columns from a type selection
ds.numeric().exclude(["sales"]) # All numeric except "sales"
Encoding Categorical Columns
# Auto: one-hot for ≤15 categories, label for ≤200, binary for >200
ds = ds.categorical().encode.auto()
# One-hot: creates <col>_<value> binary columns, drops original
ds = ds.categorical().encode.one_hot()
ds = ds.categorical().encode.one_hot(drop_first=True) # Drop first category (avoids multicollinearity)
# Label encoding: replaces values with integer codes 0..n-1
ds = ds.categorical().encode.label()
# Ordinal: encode with a specific order
ds = ds.cols(["size"]).encode.ordinal()
# Binary (hash-based): for very high cardinality (>200 unique values)
ds = ds.cols(["product_id"]).encode.binary()
# Target specific columns
ds = ds.cols(["region", "channel"]).encode.one_hot()
ds = ds.cols(["category"]).encode.label()
Scaling Numeric Columns
ds = ds.numeric().scale.standard() # Z-score: (x - mean) / std
ds = ds.numeric().scale.minmax() # Scale to [0, 1]
ds = ds.numeric().scale.robust() # Median-centered, IQR-scaled (outlier-resistant)
ds = ds.numeric().scale.log() # Natural log: log(x + 1)
ds = ds.numeric().scale.power() # Yeo-Johnson power transform (handles negatives)
# Scale features, leave target untouched
ds = ds.numeric().exclude(["sales"]).scale.standard()
# Scale specific columns
ds = ds.cols(["price", "promo"]).scale.minmax()
ds = ds.cols(["revenue"]).scale.log()
Time-Series Feature Engineering
These methods require .select() to have been called first.
Lag Features
ds = ds.target().lags([1, 7, 14])
# Creates: sales_lag1, sales_lag7, sales_lag14
Rolling Statistics
ds = ds.target().rolling.mean([7, 30]) # → sales_rollmea7, sales_rollmea30
ds = ds.target().rolling.std([7]) # → sales_rollstd7
ds = ds.target().rolling.min([7, 14]) # → sales_rollmin7, sales_rollmin14
ds = ds.target().rolling.max([7, 14]) # → sales_rollmax7, sales_rollmax14
Exponential Weighted Mean
ds = ds.target().ewm([7, 14])
# Creates: sales_ewm7, sales_ewm14
Differencing
ds = ds.target().diffs([1, 7])
# Creates: sales_diff1, sales_diff7
Calendar Features
ds = ds.datetime().features.calendar()
Creates the following columns (prefixed with the datetime column name):
| Column | Description |
|---|---|
date_year |
Year (integer) |
date_month |
Month 1–12 |
date_day |
Day of month |
date_dow |
Day of week (0=Monday) |
date_week |
ISO week number |
date_quarter |
Quarter 1–4 |
date_is_weekend |
1 if Saturday or Sunday |
date_sin_month |
Cyclical sin encoding of month |
date_cos_month |
Cyclical cos encoding of month |
date_sin_dow |
Cyclical sin encoding of day-of-week |
date_cos_dow |
Cyclical cos encoding of day-of-week |
date_days_to_easter |
Days until/since Easter (Colombia-calibrated) |
date_days_to_christmas |
Days until/since Christmas |
Inspection
summary = ds.inspect.summary() # DataFrame: column, dtype, nulls, nunique, min, max, mean
nulls = ds.inspect.nulls() # DataFrame: column, null_count, null_pct
types = ds.inspect.types() # DataFrame: column, dtype, inferred_role
memory = ds.inspect.memory() # DataFrame: column, KB, MB
Dataset Properties
len(ds) # Number of rows
ds.shape # (rows, cols)
ds.columns # List of column names
ds.dtypes # Series of dtypes
ds.head(n=5) # First n rows as DataFrame
ds.to_dataframe() # Full pandas DataFrame
ds.copy() # Deep copy — fully independent from the original
# (mutations to ds do not affect the copy and vice versa)
Preprocessing Pipeline
Capture all transformations as a reproducible pipeline:
from forecastlib.pipeline import Pipeline
# After any chain of transforms
pipeline = ds.to_pipeline()
pipeline.summary() # Print all steps
# Save to disk
pipeline.save("pipeline.pkl")
# Load and inspect later
loaded = Pipeline.load("pipeline.pkl")
print(loaded.steps)
Train / Test Splitting
from forecastlib.time_series import TimeSeriesSplitter
splitter = TimeSeriesSplitter()
# Simple train/test split
train, test = splitter.train_test_split(ds, test_ratio=0.2)
print(len(train), len(test))
# Walk-forward cross-validation (expanding window)
splitter_cv = TimeSeriesSplitter(n_splits=5)
for train_fold, test_fold in splitter_cv.split(ds):
print(f" train={len(train_fold)}, test={len(test_fold)}")
Data Quality Validation
from forecastlib.time_series import TimeSeriesValidator
validator = TimeSeriesValidator()
report = validator.check(ds, datetime_col="date")
print(report.sorted) # True if sorted chronologically
print(report.has_gaps) # True if there are missing time steps
print(report.has_duplicates) # True if duplicate timestamps exist
Transform Registry
Every operation is recorded and can be audited:
steps = ds._registry.summary()
for step in steps:
print(step) # e.g., {"op": "scale.standard", "cols": ["price"], ...}
Complete Example
from forecastlib.data import Loader
from forecastlib.pipeline import Pipeline
from forecastlib.time_series import TimeSeriesSplitter, TimeSeriesValidator
# 1. Load
ds = Loader.from_csv("sales.csv")
# 2. Assign roles
ds = ds.select(target="sales", datetime="date", group="store")
# 3. Validate before transforming
validator = TimeSeriesValidator()
report = validator.check(ds, datetime_col="date")
if report.has_gaps:
print("Warning: time gaps detected")
# 4. Clean
ds = (
ds
.clean.fix_datetime()
.clean.drop_duplicates()
.clean.sort(by="date")
)
# 5. Fill
ds = ds.fill.smart()
# 6. Encode
ds = ds.categorical().encode.auto()
# 7. Scale features (not target)
ds = ds.numeric().exclude(["sales"]).scale.standard()
# 8. Time-series features
ds = (
ds
.target().lags([1, 7, 14])
.target().rolling.mean([7, 30])
.target().rolling.std([7])
.target().ewm([7, 14])
.target().diffs([1])
.datetime().features.calendar()
)
# 9. Inspect
print(ds.inspect.summary())
# 10. Split
splitter = TimeSeriesSplitter(n_splits=3)
for train, test in splitter.split(ds):
df_train = train.to_dataframe()
df_test = test.to_dataframe()
# ... train your model ...
# 11. Save pipeline
pipeline = ds.to_pipeline()
pipeline.save("sales_pipeline.pkl")
License
MIT — see LICENSE
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file faro_prep-0.1.2.tar.gz.
File metadata
- Download URL: faro_prep-0.1.2.tar.gz
- Upload date:
- Size: 7.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b08962488e75daa7a53eb8cafc2d7d09c838b28d92ecbac3952b46fb7c74b579
|
|
| MD5 |
da4d864c2ac9acac441f4ef0a236a22f
|
|
| BLAKE2b-256 |
bde570aec156b263f5ff0945ced522674ff8d5a415ee6ff3ff75663280091b62
|
File details
Details for the file faro_prep-0.1.2-py3-none-any.whl.
File metadata
- Download URL: faro_prep-0.1.2-py3-none-any.whl
- Upload date:
- Size: 7.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
55347e49e355696838bc8c0cb34ad8e0d3568033475e2771ba838b11d90d5eaf
|
|
| MD5 |
f993d43e79060bd800925bec78073b38
|
|
| BLAKE2b-256 |
02b23d485343585ff2a371d9b69265f8bb2b61f154a03a1caa8d6ebb3eff2054
|