Enterprise-grade multi-SKU time-series forecasting and preprocessing library

faro-core

Enterprise-grade time-series forecasting and preprocessing library.

faro-core ships two modules in a single install:

Module              Import from       Purpose
Forecasting engine  forecasting_core  Train multiple model families per SKU, get forecasts, inventory recs, scenarios
Preprocessing       forecastlib       Load, clean, encode, scale, engineer time-series features with a fluent API

pip install faro-core

Table of Contents

  1. Forecasting Engine (forecasting_core)
  2. Preprocessing (forecastlib)

Forecasting Engine

from forecasting_core import ForecastEngine

Trains and evaluates multiple model families simultaneously per SKU/group, with automatic feature engineering, walk-forward validation, inventory optimization, and what-if scenario analysis.

Available models:

Name      Type         Best for
lightgbm  ML           Large datasets, many features, fast training
xgboost   ML           General purpose, robust to outliers
prophet   Statistical  Trend + seasonality + calendars, tolerates missing data
arima     Statistical  Short univariate series, well-understood patterns
ets       Statistical  Exponential smoothing, fast, no regressors needed
sarimax   Statistical  Seasonal patterns + external regressors
croston   Statistical  Intermittent / sparse demand (many zeros)

Constructors

# Start empty and configure step by step
engine = ForecastEngine()

# From a JSON config file
engine = ForecastEngine.from_config("session_config.json")

# From a Python dict (for API integrations)
engine = ForecastEngine.from_dict({
    "data":     {"path": "sales.csv"},
    "columns":  {"target": "sales", "date": "date", "group": "item_id"},
    "models":   {"lightgbm": {}, "prophet": {}},
    "features": {"lags": [1, 7, 14], "rolling": [7, 14], "calendar": True},
    "training": {"walk_forward": True, "wfv_splits": 3},
    "forecast": {"horizon": 14},
})

# Replace the full config on an existing engine
engine.set_config(config_dict)

Load Data

engine.load_data("sales.csv")         # CSV (auto-detected)
engine.load_data("sales.xlsx")        # Excel
engine.load_data("sales.parquet")     # Parquet

import pandas as pd
engine.load_data(pd.read_csv("sales.csv"))  # pandas DataFrame

Inspect Before Configuring

Run these after load_data() to understand the dataset before setting column roles.

# Full column metadata + auto-detected roles
profile = engine.get_profile()
print(profile["recommended"])
# {"date": "order_date", "target": "sales_qty", "group": "sku_id"}

# Candidate columns per role (for building dropdowns in a UI)
options = engine.get_column_options()
# {"date_candidates": [...], "target_candidates": [...], ...}

# Per-column transform suggestions based on data characteristics
suggestions = engine.get_transform_suggestions()
for s in suggestions:
    print(s["column"], "→", s["suggested_spec"], "|", s["reasons"])
# sales  → {"scale": "log"}      | ["skewness=3.8 → log transform improves fit"]
# region → {"encode": "one_hot"} | ["5 categories → one-hot encoding"]

# Full schema of all configurable parameters with defaults
schema = engine.get_config_schema()

# All supported model names
models = engine.get_available_models()
# ["lightgbm", "xgboost", "prophet", "arima", "ets", "sarimax", "croston"]

Configure Columns

engine.choose_columns(
    target="sales",                     # Column to forecast — required
    date="date",                        # Date/timestamp column — required
    sku="item_id",                      # Group key (SKU, store, product) — optional
    exogenous=["price", "promo_flag"],  # External regressors for Prophet/SARIMAX — optional
)

Data Quality and Routing

# Per-SKU health score and demand pattern classification
quality = engine.get_data_quality_report()
# {
#   "SKU_A": {"quality_score": 0.92, "series_type": "regular",      "warnings": []},
#   "SKU_B": {"quality_score": 0.61, "series_type": "intermittent", "warnings": ["60% zeros"]},
# }

# Which models will be assigned to which SKUs (before training)
routing = engine.get_routing_plan()
# {
#   "SKU_A": {"models": ["lightgbm", "prophet"], "flags": ["regular", "seasonal"]},
#   "SKU_B": {"models": ["croston"],              "flags": ["intermittent"]},
# }

Configure Features

Feature engineering applies to ML models (LightGBM, XGBoost). Statistical models receive the raw series.

engine.configure_features(
    lags=[1, 7, 14, 28],    # Lag features — "what were sales 1, 7, 14, 28 days ago?"
    rolling=[7, 14, 28],    # Rolling mean + std over these windows
    diffs=[1, 7],           # Day-over-day and week-over-week change
    calendar=True,          # Month, DOW, week, quarter, sin/cos cyclical, Colombia holidays
    ewm_spans=[7, 14],      # Exponential weighted mean spans
)

Choosing lag values: Match your seasonal period — for daily/weekly data use [1, 7, 14, 28], for monthly use [1, 3, 6, 12].

Configure Data Transforms

Per-column imputation, encoding, and scaling applied before feature engineering. If the target column is scaled, forecasts are automatically inverted to the original scale.

engine.configure_transforms({
    "sales":      {"impute": "median", "scale": "log"},
    "price":      {"scale": "minmax"},
    "region":     {"encode": "label"},
    "channel":    {"impute": "mode",   "encode": "one_hot"},
    "promo_flag": {"impute": "zero"},
})

Parameter  Options
impute     none, mean, median, mode, forward, interpolate, zero, smart
encode     none, label, one_hot, ordinal, binary, auto
scale      none, standard, minmax, robust, log, power

Auto-suggest transforms from the data:

suggestions = engine.get_transform_suggestions()
specs = {s["column"]: s["suggested_spec"] for s in suggestions if s["auto_apply"]}
engine.configure_transforms(specs, auto_apply=True)

Configure Training

engine.configure_training(
    train_ratio=0.8,       # Fraction used for training (rest = validation)
    walk_forward=True,     # Walk-forward validation — strongly recommended
    wfv_splits=3,          # Number of folds
    min_history=20,        # Minimum rows required per SKU
    seasonal_period=7,     # 7 = weekly cycle (daily data), 12 = yearly cycle (monthly data), 52 = yearly cycle (weekly data)
)

Walk-forward validation trains on data up to a cutoff and validates on the next window, repeating wfv_splits times. Each fold sees only past data, so it simulates production forecasting without look-ahead bias.
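
The fold layout described above can be sketched in plain Python. This is a minimal illustration of expanding-window splitting, not faro-core's implementation: the helper name walk_forward_splits and the fixed test_size parameter are assumptions made for the example.

```python
def walk_forward_splits(n_rows, n_splits, test_size):
    """Yield (train_indices, test_indices) for expanding-window validation.

    Hypothetical helper for illustration; not part of faro-core.
    """
    first_cutoff = n_rows - n_splits * test_size
    if first_cutoff <= 0:
        raise ValueError("not enough rows for the requested splits")
    for k in range(n_splits):
        cutoff = first_cutoff + k * test_size
        train = list(range(0, cutoff))                   # everything before the cutoff
        test = list(range(cutoff, cutoff + test_size))   # the next window
        yield train, test


folds = list(walk_forward_splits(n_rows=10, n_splits=3, test_size=2))
for train, test in folds:
    print(f"train 0..{train[-1]}  ->  test {test[0]}..{test[-1]}")
```

Every training index is smaller than every test index in the same fold, which is what rules out look-ahead.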

Select Models

engine.select_models(
    models=["lightgbm", "xgboost", "prophet", "ets"],
    hyperparams={
        "lightgbm": {"n_estimators": 200, "learning_rate": 0.05, "num_leaves": 64},
        "xgboost":  {"n_estimators": 150, "max_depth": 6, "subsample": 0.8},
        "prophet":  {"changepoint_prior_scale": 0.5, "seasonality_mode": "multiplicative"},
    }
)

Configure Forecast and Business Rules

engine.configure_forecast(
    horizon=14,
    quantiles=[0.1, 0.5, 0.9],   # Quantile levels for the prediction interval
)

engine.configure_business(
    service_level=0.95,            # Target cycle service level (0.95 = stock-outs in ≤5% of cycles)
    lead_time_days=7,              # Days between placing and receiving an order
    holding_cost_pct=0.20,         # Annual holding cost as % of inventory value
    stockout_cost_multiplier=3.0,  # How much more a stock-out costs vs. holding one unit
)
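
To show how service_level and lead_time_days typically feed an inventory recommendation, here is the textbook reorder-point calculation under normally distributed daily demand. It is a sketch only: the README does not document the formula faro-core uses, and the function name and demand figures below are made up for the example.

```python
from math import sqrt
from statistics import NormalDist


def reorder_point(mean_daily_demand, std_daily_demand, lead_time_days, service_level):
    """Textbook reorder point under normally distributed daily demand.

    Illustrative only; faro-core's internal formula may differ.
    """
    z = NormalDist().inv_cdf(service_level)                  # z-score for the service level
    safety_stock = z * std_daily_demand * sqrt(lead_time_days)
    expected_lead_time_demand = mean_daily_demand * lead_time_days
    return expected_lead_time_demand + safety_stock, safety_stock


rop, ss = reorder_point(mean_daily_demand=12.0, std_daily_demand=4.0,
                        lead_time_days=7, service_level=0.95)
print(f"safety stock = {ss:.1f}, reorder point = {rop:.1f}")
```

Raising service_level increases the z-score and therefore the safety stock; a longer lead time increases both terms.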

Train

engine.train()

# With live progress callbacks
def on_progress(event):
    print(f"[{event['pct']:3d}%] {event['message']}")

engine.train(on_progress=on_progress)

The pipeline runs: DataTransformer → DataQualityChecker → ModelRouter → FeatureEngineer → Trainer (walk-forward) → WeightedEnsemble → Registry.

Read Results

# Training metrics per model/SKU
metrics = engine.get_metrics()
# {
#   "rows": [{"sku": "A", "model": "lightgbm", "mae": 12.3, "rmse": 15.1, "wape": 0.08}],
#   "by_model": {"lightgbm": {"avg_mae": 12.3, "avg_rmse": 15.1, "avg_wape": 0.08}},
#   "shap": {"SKU_A": {"lightgbm": {"price": 0.42, "sales_lag7": 0.35, ...}}}
# }

# Point forecasts as a pandas DataFrame
forecast_df = engine.predict(horizon=14)
# Columns: sku, model, date, forecast, p90_lo, p90_hi, step

# Single SKU
sku_df = engine.predict_by_sku("SKU_A", horizon=14)

# JSON-serializable dict (dates as ISO strings) — same format as REST API response
forecast_json = engine.get_forecast()
# {"rows": [...], "n_skus": 5, "horizon": 14}

# Nested dict {sku: {model: [{date, value, lower, upper}]}}
forecast_dict = engine.get_forecast_dict()

# Inventory recommendations
inventory = engine.get_inventory_report()
# {"recommendations": [{"sku": "A", "reorder_point": 120, "safety_stock": 35, ...}]}

# Full report (metrics + inventory + config)
report = engine.generate_report()
print(report["run_id"])

predict() tries: cached forecast → re-generate from fitted models → full pipeline re-run.
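
For readers unfamiliar with the mae, rmse, and wape fields returned by get_metrics(), the sketch below uses the common definitions of these error measures. It assumes faro-core follows them, which the README does not state explicitly; the function name and sample numbers are illustrative.

```python
from math import sqrt


def forecast_errors(actual, forecast):
    """Return MAE, RMSE and WAPE for two equal-length sequences."""
    errors = [a - f for a, f in zip(actual, forecast)]
    mae = sum(abs(e) for e in errors) / len(errors)
    rmse = sqrt(sum(e * e for e in errors) / len(errors))
    # WAPE: total absolute error relative to total actual volume
    wape = sum(abs(e) for e in errors) / sum(abs(a) for a in actual)
    return {"mae": mae, "rmse": rmse, "wape": wape}


scores = forecast_errors(actual=[100, 120, 80, 100], forecast=[90, 125, 85, 100])
print(scores)
```

WAPE is scale-free, so it is the easier of the three to compare across SKUs with very different volumes.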

Time-Series Analysis

# Full statistical analysis for one SKU
# Covers: stationarity (ADF+KPSS), STL decomposition, seasonality (FFT+ACF),
# trend (Mann-Kendall + Sen's slope + change points), autocorrelation, outliers, distribution
analysis = engine.analyze(sku="SKU_A")

# Summary DataFrame — all SKUs in one table
# Columns: sku, n, mean, cv, zero_pct, stationarity, seasonal_strength,
#          trend_direction, dominant_period, suggested_ar_order, is_white_noise, ...
summary_df = engine.get_analysis_summary()

# STL decomposition chart data (trend + seasonal + residual with real dates)
decomp = engine.get_decomposition_chart(sku="SKU_A")
# {"dates": [...], "original": [...], "trend": [...], "seasonal": [...],
#  "residual": [...], "trend_strength": 0.82, "seasonal_strength": 0.67}

# Seasonal indices (how demand at each cycle position compares to the average)
seasonality = engine.get_seasonality_chart(sku="SKU_A")
# {"indices": [0.85, 0.90, 1.02, 1.08, 1.15, 1.25, 0.75],
#  "labels":  ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"],
#  "grand_mean": 98.4}
# index > 1.0 = above-average demand at that position in the cycle
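
A seasonal index of this kind can be computed as the average value at each cycle position divided by the overall mean. The sketch below shows that calculation on made-up data; it is an assumption about the general technique, not a description of how get_seasonality_chart() is implemented.

```python
from statistics import mean


def seasonal_indices(values, period):
    """Average value at each cycle position divided by the overall mean."""
    grand_mean = mean(values)
    indices = []
    for position in range(period):
        at_position = values[position::period]      # every period-th observation
        indices.append(mean(at_position) / grand_mean)
    return indices, grand_mean


# Two weeks of made-up daily demand, Monday first
demand = [80, 90, 100, 110, 120, 130, 70,
          90, 90, 100, 110, 120, 130, 60]
indices, grand_mean = seasonal_indices(demand, period=7)
print([round(i, 2) for i in indices], grand_mean)
```

When the series covers whole cycles, the indices average to 1.0.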

What-If Scenarios

Adjust forecasts without retraining, filtering by SKU, model, and date range.

# +10% across all SKUs, never below 0
result = engine.apply_scenario([
    {"multiplier": 1.10},
    {"floor": 0.0},
])

# +25% for SKU_A in June only
result = engine.apply_scenario([
    {"sku": "SKU_A", "date_start": "2025-06-01", "date_end": "2025-06-30",
     "multiplier": 1.25, "label": "June promotion"},
])

# -15% for LightGBM only, capped at 200 units
result = engine.apply_scenario([
    {"model": "lightgbm", "multiplier": 0.85, "ceiling": 200.0}
])

# Apply inplace (replaces the engine's active forecast)
engine.apply_scenario([{"multiplier": 1.10}], inplace=True)

# Return the scenario result as a nested dict, same format as get_forecast_dict()
scenario_dict = engine.get_scenario_dict([{"sku": "SKU_A", "multiplier": 1.10}])

ScenarioRule fields:

Field                  Type          Description
sku                    str           Filter to a specific SKU. Omit for all.
model                  str           Filter to a specific model. Omit for all.
date_start / date_end  "YYYY-MM-DD"  Date range filter.
multiplier             float         Scale by factor: 1.10 = +10%, 0.85 = −15%.
offset                 float         Add a fixed amount to each value.
floor                  float         Minimum allowed value.
ceiling                float         Maximum allowed value.
label                  str           Human-readable name for this rule.
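
To make the rule semantics concrete, the sketch below applies a single rule to a list of forecast rows. The order of operations (multiplier, then offset, then floor, then ceiling) and the row layout are assumptions for illustration; the README does not specify how apply_scenario() orders them internally.

```python
def apply_rule(rows, rule):
    """Apply one scenario rule to forecast rows (dicts with sku, model, date, forecast).

    Assumed order: multiplier, then offset, then floor, then ceiling.
    """
    adjusted = []
    for row in rows:
        # ISO "YYYY-MM-DD" strings compare correctly as plain strings
        matches = (
            rule.get("sku") in (None, row["sku"])
            and rule.get("model") in (None, row["model"])
            and rule.get("date_start", "0000-01-01") <= row["date"]
            and row["date"] <= rule.get("date_end", "9999-12-31")
        )
        value = row["forecast"]
        if matches:
            value = value * rule.get("multiplier", 1.0) + rule.get("offset", 0.0)
            if "floor" in rule:
                value = max(value, rule["floor"])
            if "ceiling" in rule:
                value = min(value, rule["ceiling"])
        adjusted.append({**row, "forecast": value})
    return adjusted


rows = [
    {"sku": "SKU_A", "model": "lightgbm", "date": "2025-06-10", "forecast": 100.0},
    {"sku": "SKU_A", "model": "lightgbm", "date": "2025-07-01", "forecast": 100.0},
    {"sku": "SKU_B", "model": "lightgbm", "date": "2025-06-10", "forecast": 300.0},
]
june_promo = apply_rule(rows, {"sku": "SKU_A", "date_start": "2025-06-01",
                               "date_end": "2025-06-30", "multiplier": 1.25})
capped = apply_rule(rows, {"multiplier": 0.85, "ceiling": 200.0})
print([r["forecast"] for r in june_promo])
print([r["forecast"] for r in capped])
```

Rows that do not match the filters pass through unchanged, and the input list is never modified, which mirrors the non-inplace behavior described above.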

Drift Detection

drift = engine.detect_drift("new_data.csv")
# Or: engine.detect_drift(new_dataframe)

print(drift["has_drift"])           # True / False
print(drift["n_drifted_features"])  # How many columns drifted
print(drift["alerts"])
# ["price: PSI=0.28 (HIGH drift — recommend retraining)"]

print(drift["feature_drift"])
# {"price": {"psi": 0.28, "psi_level": "HIGH", "ks_p_value": 0.001, "drift": True}, ...}

PSI thresholds: < 0.10 = LOW (no concern) · 0.10–0.25 = MEDIUM (monitor) · > 0.25 = HIGH (retrain).
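
The Population Stability Index compares the binned distribution of a feature in new data against a reference sample: PSI = Σ (a_i − e_i) · ln(a_i / e_i), where e_i and a_i are the reference and new shares in bin i. The sketch below is one way to compute it; the equal-width binning, the bin count, and the eps guard are assumptions, since the README does not describe faro-core's binning.

```python
from math import log


def psi(expected, actual, n_bins=10, eps=1e-6):
    """Population Stability Index between a reference and a new sample.

    Bins are equal-width over the reference range; eps avoids log(0).
    """
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / n_bins or 1.0

    def shares(sample):
        counts = [0] * n_bins
        for x in sample:
            idx = int((x - lo) / width)
            counts[min(max(idx, 0), n_bins - 1)] += 1   # clamp out-of-range values
        return [max(c / len(sample), eps) for c in counts]

    e, a = shares(expected), shares(actual)
    return sum((ai - ei) * log(ai / ei) for ei, ai in zip(e, a))


def psi_level(value):
    """Map a PSI value to the thresholds listed above."""
    if value < 0.10:
        return "LOW"
    if value <= 0.25:
        return "MEDIUM"
    return "HIGH"


reference = [10 + (i % 20) for i in range(200)]   # made-up prices spread over 10..29
shifted = [x + 8 for x in reference]              # same shape, moved up by 8
print(psi_level(psi(reference, reference)), psi_level(psi(reference, shifted)))
```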

Save and Load

engine.save("models/session_jan2025.joblib")

engine = ForecastEngine.load("models/session_jan2025.joblib")
forecast_df = engine.predict(horizon=14)   # No retraining needed

Configuration Files

{
  "data":     {"path": "sales.csv"},
  "columns":  {"target": "sales", "date": "date", "group": "item_id",
               "exogenous": ["price", "promo_flag"]},
  "models":   {"lightgbm": {"n_estimators": 200}, "prophet": {}, "ets": {}},
  "features": {"lags": [1,7,14,28], "rolling": [7,14,28], "calendar": true, "ewm_spans": [7,14]},
  "training": {"train_ratio": 0.8, "walk_forward": true, "wfv_splits": 3, "seasonal_period": 7},
  "forecast": {"horizon": 14, "quantiles": [0.1, 0.5, 0.9]},
  "business": {"service_level": 0.95, "lead_time_days": 7,
               "holding_cost_pct": 0.20, "stockout_cost_multiplier": 3.0},
  "transforms": {"sales": {"impute": "median", "scale": "log"},
                 "price": {"scale": "minmax"}, "region": {"encode": "label"}}
}

engine = ForecastEngine.from_config("session_config.json")
engine.train()
engine.export_config("output/reproducible_config.json")

Preprocessing

from forecastlib.data import Loader

Fluent, chainable preprocessing API. Every transformation is recorded and can be saved as a reproducible Pipeline for production use.

Loading Data

The Loader auto-detects dtypes, attempts to parse date columns, and warns about data quality issues (duplicates, high null rates, large files).

from forecastlib.data import Loader

# Files
ds = Loader.from_csv("sales.csv")
ds = Loader.from_csv("sales.csv", sep=";", encoding="latin-1")
ds = Loader.from_excel("sales.xlsx")
ds = Loader.from_excel("sales.xlsx", sheet_name="Ventas")
ds = Loader.from_parquet("sales.parquet")
ds = Loader.from_json("sales.json")

# pandas DataFrame
import pandas as pd
ds = Loader.from_dataframe(pd.read_csv("sales.csv"))

# SQL databases — requires the matching driver (psycopg2, pymysql, pyodbc)
ds = Loader.from_sql(
    db="postgresql",          # "postgresql" | "mysql" | "sqlite" | "mssql"
    host="localhost",
    port=5432,                # optional — defaults per db type
    database="sales_db",
    user="admin",
    password="secret",
    table="transactions",     # either table= or query=
)

# Custom SQL query (filter/join before loading)
ds = Loader.from_sql(
    db="postgresql", host="localhost", database="sales_db", user="u", password="p",
    query="SELECT * FROM sales WHERE year >= 2023",
)

# Large tables — read in chunks to avoid memory issues
ds = Loader.from_sql(
    db="postgresql", host="localhost", database="sales_db", user="u", password="p",
    table="transactions", chunk_size=100_000,
)

Assign Column Roles

ds = ds.select(
    target="sales",   # Column to forecast — required for feature engineering methods
    datetime="date",  # Date/timestamp column — required for calendar features and sorting
    group="store",    # Group key (SKU, store, product) — optional, for panel datasets
)

After .select(), methods like .target().lags() and .datetime().features.calendar() know which columns to use automatically.

Cleaning

# Parse string dates to datetime64
ds = ds.clean.fix_datetime()                       # Auto-detect format
ds = ds.clean.fix_datetime(format="%d/%m/%Y")      # Explicit format

# Remove duplicate rows
ds = ds.clean.drop_duplicates()                             # All columns
ds = ds.clean.drop_duplicates(subset=["date", "store"])    # Check only these columns
ds = ds.clean.drop_duplicates(keep="last")                 # "first" (default) | "last" | False

# Drop rows that have missing values
ds = ds.clean.drop_nulls()                         # Any null → drop row
ds = ds.clean.drop_nulls(subset=["sales"])         # Only if target is null
ds = ds.clean.drop_nulls(thresh=5)                 # Keep rows with at least 5 non-null values

# Drop columns that have a single unique value (no information for the model)
ds = ds.clean.drop_constant()

# Clamp values to a range (caps extreme outliers instead of dropping rows)
ds = ds.cols(["sales"]).clean.clip(lower=0)            # No negative sales
ds = ds.cols(["age"]).clean.clip(lower=0, upper=120)   # Range clip

# Strip leading/trailing whitespace from string columns (run before encoding)
ds = ds.categorical().clean.strip()

# Auto-cast dtypes: numeric strings → float, low-cardinality strings → category
ds = ds.clean.fix_dtypes()

# Rename columns (updates schema roles automatically if renamed column has a role)
ds = ds.clean.rename({"Fecha": "date", "Ventas": "sales"})

# Sort rows — required before building lag/rolling features
ds = ds.clean.sort()                              # Sort by configured datetime column
ds = ds.clean.sort(by="date")                     # Explicit column
ds = ds.clean.sort(by=["store", "date"])          # Multi-column
ds = ds.clean.sort(by="date", ascending=False)    # Descending

Filling Missing Values

Dropping rows in time-series data creates gaps that corrupt lag features — always prefer filling.

# ── Smart auto-fill (recommended starting point) ──────────────────────────────
# Numeric <5% nulls → median | Numeric ≥5% → interpolate | Categorical → mode | Datetime → ffill
ds = ds.fill.smart()

# ── Panel-aware fill (recommended for multi-SKU data) ────────────────────────
# Fills within each group independently — prevents data from one SKU polluting another.
# After ffill: remaining leading nulls → bfill → 0 for numeric.
ds = ds.fill.time_series()

# ── Statistical fills ─────────────────────────────────────────────────────────
ds = ds.fill.mean()           # Column mean — sensitive to outliers
ds = ds.fill.median()         # Column median — robust (preferred over mean)
ds = ds.fill.mode()           # Most frequent value — works for any dtype
ds = ds.fill.constant(0)      # Fixed constant — use when 0 means "no activity"

# ── Temporal fills ────────────────────────────────────────────────────────────
ds = ds.fill.forward()              # Carry last known value forward (LOCF)
ds = ds.fill.forward(limit=3)       # Forward fill at most 3 consecutive NaNs
ds = ds.fill.backward()             # Carry next known value backward
ds = ds.fill.backward(limit=3)

ds = ds.fill.interpolate()                         # Linear interpolation
ds = ds.fill.interpolate(method="time")            # Time-weighted interpolation
ds = ds.fill.interpolate(method="polynomial")      # Polynomial interpolation
ds = ds.fill.interpolate(method="spline")          # Cubic spline

# ── KNN imputation ────────────────────────────────────────────────────────────
# Imputes from the most similar rows (nearest neighbors); useful when missing values can be inferred from other numeric columns
ds = ds.numeric().fill.knn()               # 5 neighbors (default)
ds = ds.numeric().fill.knn(n_neighbors=3)

# ── Apply fill to specific columns ────────────────────────────────────────────
ds = ds.cols(["sales"]).fill.forward()
ds = ds.cols(["price", "promo"]).fill.constant(0)
ds = ds.categorical().fill.mode()
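
The panel-aware behaviour described for fill.time_series() (forward fill within each group, then backward fill for leading nulls, then 0) can be sketched without pandas. This is an illustration of the idea only; fill_series and fill_by_group are hypothetical names, and rows are assumed to be sorted by date already.

```python
def fill_series(values):
    """Forward fill, then backward fill leading gaps, then 0 if nothing is known."""
    filled, last = [], None
    for v in values:                      # forward fill
        last = v if v is not None else last
        filled.append(last)
    # only leading gaps remain; they take the first known value (or 0)
    first_known = next((v for v in filled if v is not None), 0)
    return [first_known if v is None else v for v in filled]


def fill_by_group(rows, group_key, value_key):
    """Fill each group's series independently (modifies the row dicts in place)."""
    by_group = {}
    for row in rows:
        by_group.setdefault(row[group_key], []).append(row)
    for group_rows in by_group.values():
        filled = fill_series([r[value_key] for r in group_rows])
        for r, v in zip(group_rows, filled):
            r[value_key] = v
    return rows


rows = [
    {"store": "A", "sales": None},
    {"store": "B", "sales": 50},
    {"store": "A", "sales": 10},
    {"store": "B", "sales": None},
]
fill_by_group(rows, group_key="store", value_key="sales")
print([r["sales"] for r in rows])
```

A plain forward fill over the interleaved rows would have given store B's last row the value 10 from store A; grouping first keeps each series separate.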

Column Selection

Narrow which columns a transformation applies to. All selectors chain into .scale, .encode, .fill, and .clean.

ds.numeric()                       # All numeric columns (int, float)
ds.categorical()                   # All object / category columns
ds.target()                        # Target column only (requires .select())
ds.datetime()                      # Datetime column only (requires .select())
ds.cols(["price", "promo"])        # Explicit column list
ds.regex("price|promo")            # Columns matching a regex pattern

# Exclude specific columns from any selection
ds.numeric().exclude(["sales"])              # All numeric except the target
ds.cols(["a", "b", "c"]).exclude(["b"])     # ["a", "c"]

Encoding

Always encode categorical columns before scaling or feature engineering — ML models require numeric inputs.

# Auto: one-hot for ≤15 categories, label for 16–200, binary for >200
ds = ds.categorical().encode.auto()

# One-hot: creates <col>_<value> binary columns, drops original
ds = ds.categorical().encode.one_hot()
ds = ds.categorical().encode.one_hot(drop_first=True)   # Avoid multicollinearity in linear models

# Label: replace each category with an integer code 0..n-1
# Good for tree-based models (LightGBM, XGBoost), NOT for linear models
ds = ds.categorical().encode.label()

# Ordinal: encode with a specific natural order
ds = ds.cols(["size"]).encode.ordinal()

# Binary (hash-based): for very high cardinality (>200 unique values)
ds = ds.cols(["product_id"]).encode.binary()

# Apply to specific columns
ds = ds.cols(["region", "channel"]).encode.one_hot()
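
As a rough picture of what these encoders produce, the sketch below mirrors the cardinality thresholds quoted for encode.auto() and shows label and one-hot output on a tiny column. The helper names are hypothetical, and assigning label codes in sorted order is an assumption; the library may order categories differently.

```python
def choose_encoding(n_categories):
    """Pick an encoder from the cardinality thresholds quoted above."""
    if n_categories <= 15:
        return "one_hot"
    if n_categories <= 200:
        return "label"
    return "binary"


def label_encode(values):
    """Replace each category with an integer code 0..n-1 (sorted order assumed)."""
    codes = {cat: i for i, cat in enumerate(sorted(set(values)))}
    return [codes[v] for v in values]


def one_hot_encode(values, column):
    """Create one <col>_<value> indicator list per category."""
    categories = sorted(set(values))
    return {f"{column}_{cat}": [int(v == cat) for v in values] for cat in categories}


regions = ["north", "south", "north", "east"]
print(choose_encoding(len(set(regions))))
print(label_encode(regions))
print(one_hot_encode(regions, "region"))
```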

Scaling

Scale numeric features so that gradient-based models converge faster and no single column dominates. Tree-based models (LightGBM, XGBoost) are largely insensitive to monotonic scaling, so scaling matters less for them. Exclude the target from scaling, or, if you do scale it, invert the scaling on predictions.

# Z-score normalization: (x - mean) / std — general default
ds = ds.numeric().scale.standard()

# Scale to [0, 1] — sensitive to outliers; use robust if outliers exist
ds = ds.numeric().scale.minmax()

# Median-centered, IQR-scaled — outlier-resistant (best for retail/supply chain data with spikes)
ds = ds.numeric().scale.robust()

# Natural log: log(x + 1) — reduces right skew in sales/revenue/count data
# Requires non-negative values; use clip(lower=0) first if needed
ds = ds.cols(["sales"]).scale.log()

# Yeo-Johnson power transform — handles negatives, finds optimal normalization automatically
ds = ds.numeric().scale.power()

# Best practice: scale features, leave target untouched
ds = ds.numeric().exclude(["sales"]).scale.robust()
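
The difference between min-max and robust scaling is easiest to see on a column with one spike. The sketch below uses the standard definitions, (x − min) / (max − min) and (x − median) / IQR; the quartile method chosen here is an assumption and may not match the library's exactly.

```python
from statistics import median, quantiles


def robust_scale(values):
    """Center on the median and divide by the interquartile range."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    center, iqr = median(values), q3 - q1
    return [(v - center) / iqr for v in values]


def minmax_scale(values):
    """Map the minimum to 0 and the maximum to 1."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]


sales = [1, 2, 3, 4, 100]   # one extreme spike
print(robust_scale(sales))
print([round(v, 3) for v in minmax_scale(sales)])
```

With min-max scaling the spike squeezes the four typical values into the bottom few percent of the range, while robust scaling keeps them evenly spread and leaves the spike as a visibly large value.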

Time-Series Features

These require .select() to have been called. Apply after cleaning and filling — lags computed on data with nulls will propagate NaNs into all derived features.

# Lag features — "what were sales k days ago?"
ds = ds.target().lags([1, 7, 14, 28])
# Creates: sales_lag1, sales_lag7, sales_lag14, sales_lag28

# Rolling mean — captures the recent trend (smoothed signal)
ds = ds.target().rolling.mean([7, 14, 30])
# Creates: sales_rollmea7, sales_rollmea14, sales_rollmea30

# Rolling std — measures volatility / demand uncertainty
ds = ds.target().rolling.std([7])
# Creates: sales_rollstd7

# Rolling min / max
ds = ds.target().rolling.min([7, 14])
ds = ds.target().rolling.max([7, 14])

# Exponential weighted mean — weights recent values more heavily
# With the pandas span convention (alpha = 2 / (span + 1)), span=7 puts ~87% of the total weight on the most recent 7 periods
ds = ds.target().ewm([7, 14])
# Creates: sales_ewm7, sales_ewm14

# Differencing — models the change rather than the level
ds = ds.target().diffs([1, 7])
# Creates: sales_diff1 (day-over-day), sales_diff7 (week-over-week)
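
How much weight an exponential weighted mean places on recent observations depends on how span maps to the smoothing factor. The sketch below assumes the pandas convention, alpha = 2 / (span + 1), with the recursive (adjust=False) form; whether faro-core uses the same convention is not stated in this README.

```python
def ewm_mean(values, span):
    """Recursive exponential weighted mean with alpha = 2 / (span + 1)."""
    alpha = 2 / (span + 1)
    smoothed, current = [], None
    for v in values:
        current = v if current is None else alpha * v + (1 - alpha) * current
        smoothed.append(current)
    return smoothed


def recent_weight_share(span, k):
    """Share of total weight carried by the most recent k observations."""
    alpha = 2 / (span + 1)
    return 1 - (1 - alpha) ** k


print(ewm_mean([10, 20, 30], span=3))
print(round(recent_weight_share(span=7, k=7), 3))
```

Under this convention the most recent span observations always carry roughly 1 − e⁻² of the weight, so a larger span smooths more but reacts more slowly.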

Choosing lag values: Use multiples of your natural seasonal period. Daily/weekly data: [1, 7, 14, 28]. Monthly: [1, 3, 6, 12].

Calendar Features

ds = ds.datetime().features.calendar()

Creates the following columns (prefixed with the datetime column name, e.g., date_*):

Column                  Description                                    Range
date_year               Calendar year                                  2020, 2021, …
date_month              Month                                          1–12
date_day                Day of month                                   1–31
date_dow                Day of week (0 = Monday)                       0–6
date_week               ISO week number                                1–53
date_quarter            Quarter                                        1–4
date_is_weekend         1 if Sat or Sun                                0 or 1
date_sin_month          Cyclical sin of month                          −1 … +1
date_cos_month          Cyclical cos of month                          −1 … +1
date_sin_dow            Cyclical sin of day-of-week                    −1 … +1
date_cos_dow            Cyclical cos of day-of-week                    −1 … +1
date_days_to_easter     Days until (+) or since (−) Easter             integer
date_days_to_christmas  Days until (+) or since (−) Christmas          integer

Why cyclical encodings? Month 12 and month 1 are consecutive, but 12 − 1 = 11 implies they are far apart. The sin/cos encoding maps the cycle onto a unit circle so December and January are correctly adjacent.
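
The adjacency claim can be checked numerically. The sketch below encodes months on the unit circle and compares distances; the phase used here (month − 1, so January sits at angle 0) is an assumption, and any fixed offset gives the same distances.

```python
from math import cos, dist, pi, sin


def encode_month(month):
    """Map month 1..12 to a point on the unit circle."""
    angle = 2 * pi * (month - 1) / 12
    return sin(angle), cos(angle)


dec_to_jan = dist(encode_month(12), encode_month(1))
jan_to_feb = dist(encode_month(1), encode_month(2))
jun_to_dec = dist(encode_month(6), encode_month(12))
print(round(dec_to_jan, 4), round(jan_to_feb, 4), round(jun_to_dec, 4))
```

December to January is exactly as far as January to February, while months half a year apart sit on opposite sides of the circle.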

Why holiday distances? A binary is_holiday flag misses the demand ramp-up before a holiday and the hangover after. The distance feature captures the temporal proximity effect.

Inspection

# Full summary: dtype, null count, null %, min, max, mean, unique count
summary = ds.inspect.summary()

# Only null information — sorted by null %
nulls = ds.inspect.nulls()

# Column types and inferred roles (target, datetime, group, feature)
types = ds.inspect.types()

# Memory usage per column
memory = ds.inspect.memory(verbose=False)
# Columns: column, KB, MB

Dataset Properties

len(ds)            # Number of rows
ds.shape           # Tuple (rows, cols)
ds.columns         # List of column names
ds.dtypes          # pandas Series of dtypes
ds.head(n=5)       # First n rows as pandas DataFrame
ds.to_dataframe()  # Full pandas DataFrame — use this when done chaining

ds.copy()
# Fully independent deep copy — mutations to the copy do not affect the original.
# Use before branching into two different preprocessing paths from the same base.

Pipeline

Every transformation is silently recorded. .to_pipeline() packages all steps into a serializable Pipeline that can be replayed on new data — guaranteeing that production preprocessing is identical to training.

from forecastlib.pipeline import Pipeline

pipeline = ds.to_pipeline()
pipeline.summary()
# Step 1: clean.fix_datetime on ['date']
# Step 2: fill.time_series on ['sales', 'price']
# Step 3: encode.one_hot on ['channel', 'region']
# Step 4: scale.robust on ['price', 'promo']
# Step 5: target.lags([1, 7, 14]) on sales
# Step 6: calendar on date

pipeline.save("models/sales_pipeline.pkl")

loaded = Pipeline.load("models/sales_pipeline.pkl")
print(f"{len(loaded.steps)} steps recorded")

Best practice: Save the pipeline alongside the trained model. At inference time, load both, apply the pipeline to raw incoming data, then pass the result to the model.

Train/Test Splitting

from forecastlib.time_series import TimeSeriesSplitter

splitter = TimeSeriesSplitter()

# Simple chronological split — NOT a random shuffle
train, test = splitter.train_test_split(ds, test_ratio=0.2)
df_train = train.to_dataframe()
df_test  = test.to_dataframe()

# Walk-forward expanding-window cross-validation
# Each fold: all data up to cutoff → train, next window → test
splitter_cv = TimeSeriesSplitter(n_splits=5)
for fold_n, (train_fold, test_fold) in enumerate(splitter_cv.split(ds)):
    df_train = train_fold.to_dataframe()
    df_test  = test_fold.to_dataframe()
    # train your model on df_train, evaluate on df_test

Walk-forward CV avoids look-ahead bias. Standard shuffled k-fold places future observations in the training folds, so models score unrealistically well on time-series problems.

Data Quality Validation

from forecastlib.time_series import TimeSeriesValidator

validator = TimeSeriesValidator()
report = validator.check(ds, datetime_col="date")

print(report.sorted)          # True if rows are chronologically ordered
print(report.has_gaps)        # True if time steps are missing (e.g., no row for 2024-03-15)
print(report.has_duplicates)  # True if the same (date, group) pair appears more than once

Run this before building lag features — if has_gaps is True, lag-1 will point to the wrong row.
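
To see what a gap means in practice, the sketch below lists the dates missing from a daily series using only the standard library. It illustrates the kind of check has_gaps represents; the helper name and the fixed daily step are assumptions, not the validator's implementation.

```python
from datetime import date, timedelta


def missing_dates(dates, step=timedelta(days=1)):
    """Return the dates absent from a sorted series with a fixed step."""
    present = set(dates)
    missing, current = [], dates[0]
    while current < dates[-1]:
        current += step
        if current not in present:
            missing.append(current)
    return missing


observed = [date(2024, 3, 13), date(2024, 3, 14), date(2024, 3, 16)]
gaps = missing_dates(observed)
print(gaps)
```

Here a positional lag-1 for 2024-03-16 would pick up the value from 2024-03-14, two days earlier, because the row for 2024-03-15 does not exist.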


Complete Example

from forecasting_core import ForecastEngine
from forecastlib.data import Loader
from forecastlib.pipeline import Pipeline
from forecastlib.time_series import TimeSeriesSplitter

# ── 1. Preprocess with forecastlib ───────────────────────────────────────────
ds = (
    Loader.from_csv("sales.csv")
    .select(target="sales", datetime="date", group="store")
    .clean.fix_datetime()
    .clean.drop_duplicates()
    .clean.sort()
    .fill.time_series()
    .categorical().clean.strip()
    .categorical().encode.auto()
    .numeric().exclude(["sales"]).scale.robust()
    .target().lags([1, 7, 14, 28])
    .target().rolling.mean([7, 14, 30])
    .target().rolling.std([7])
    .target().ewm([7, 14])
    .target().diffs([1, 7])
    .datetime().features.calendar()
)

pipeline = ds.to_pipeline()
pipeline.save("models/pipeline.pkl")
df = ds.to_dataframe()

# ── 2. Forecast with forecasting_core ────────────────────────────────────────
engine = (
    ForecastEngine()
    .load_data("sales.csv")
    .choose_columns(target="sales", date="date", sku="store")
    .configure_features(lags=[1, 7, 14], rolling=[7, 14], calendar=True)
    .configure_training(walk_forward=True, wfv_splits=3)
    .configure_forecast(horizon=14)
    .configure_business(service_level=0.95, lead_time_days=7)
    .select_models(["lightgbm", "prophet", "ets"])
    .train()
)

print(engine.get_metrics()["by_model"])
forecast = engine.predict(horizon=14)
inventory = engine.get_inventory_report()

engine.save("models/engine.joblib")

License

MIT — see LICENSE
