Jerry-Thomas: a stream-first, plugin-friendly data pipeline (mixology-themed CLI)
Project description
Datapipeline Runtime
Named after the famous bartender, Jerry Thomas is a time-series-first data pipeline runtime that mixes disparate data sources into fresh, ready-to-serve vectors using declarative YAML recipes. Everything is on-demand, iterator-first: data streams through the pipeline without pre-batching the whole dataset in memory. Like any good bartender, Jerry obsesses over quality control and service, offering stage-by-stage observability along the way. And no bar is complete without proper tools: deterministic artifacts and plugin scaffolding for custom loaders, parsers, transforms, and filters.
Contributing: PRs welcome on GitHub.
Core assumptions
- Every record carries a timezone-aware
timeattribute and a numericvalue. The time-zone awareness is a quality gate to ensure correct vector assembly.- Grouping is purely temporal. Dimensional splits belong in
partition_by.
Why You Might Use It
- Materialize canonical time-series datasets from disparate sources.
- Preview and debug each stage of the pipeline without writing ad-hoc scripts.
- Enforce coverage/quality gates and publish artifacts (schema, scaler stats) for downstream ML teams.
- Extend the runtime with entry-point driven plugins for domain-specific I/O or feature engineering.
- Consume vectors directly from Python via iterators, Pandas DataFrames, or
torch.utils.data.Dataset.
Quick Start
Serve The Demo Plugin (Recommended)
python -m pip install -U jerry-thomas
jerry demo init
python -m pip install -e demo
jerry serve --dataset demo --limit 3
Note: jerry demo init creates a workspace jerry.yaml. If you later run
jerry plugin init, it won’t overwrite that file. Remove or edit
jerry.yaml (or pass --project) to point at your new plugin.
For example: jerry serve --project lib/my-datapipeline/project.yaml.
Create Your Own Plugin + First Ingest
jerry plugin init my-datapipeline --out lib/
python -m pip install -e lib/my-datapipeline
# One-stop wizard: scaffolds source YAML + DTO/parser + domain + mapper + contract.
jerry inflow create
# Wizard tips (identity vs custom)
#
# - Parser:
# - Choose "Identity parser" when the loader already yields dicts/objects that match your DTO shape,
# and no type conversion is needed.
# - Choose "Create new parser" when you need to parse timestamps, coerce types, rename fields,
# or drop/validate rows.
#
# - Mapper:
# - Choose "Identity mapper" only when your DTO already is the final domain record shape (for example you might have used jerry to output interim datasets):
# `time` is timezone-aware and identity fields are present.
# - Otherwise, choose "Create new mapper" to map DTO -> domain record and add light derived fields.
#
# After scaffolding, you typically still need to:
# - Fill placeholders in `sources/*.yaml` (path/url/headers/etc.).
# - Reference your stream contract id in `dataset.yaml` under `record_stream: <contract_id>` and pick a `field` for each feature.
#
# Reinstall after commands that update entry points (pyproject.toml).
python -m pip install -e lib/my-datapipeline
jerry serve --dataset your-dataset --limit 3
Pipeline Stages (serve --stage)
Stages 0-6 operate on a single stream at a time (per feature/target config). Stages 7-8 assemble full vectors across all configured features.
-
Stage 0 (DTO stream)
- Input: raw source rows (loader transport + decoder)
- Ops: loader -> decoder -> parser (raw -> DTO; return None to drop rows)
- Output: DTO objects yielded by the parser
-
Stage 1 (record stream)
- Input: DTO stream
- Ops: mapper (DTO -> domain TemporalRecord)
- Output: TemporalRecord instances (must have timezone-aware
time)
-
Stage 2 (record transforms)
- Input: TemporalRecord stream
- Ops: contract
record:transforms (e.g. filter, floor_time); per-record only (no history) - Output: TemporalRecord stream (possibly filtered/mutated)
-
Stage 3 (ordered record stream)
- Input: TemporalRecord stream
- Ops:
- sort by
(partition_key, record.time)(batch/in-memory sort; typically the expensive step)
- sort by
- Output: TemporalRecord stream (sorted by partition,time)
-
Stage 4 (stream transforms)
- Input: ordered TemporalRecord stream
- Ops:
- apply contract
stream:transforms (per-partition history; e.g. ensure_cadence, rolling, fill) - apply contract
debug:transforms (validation only; e.g. lint)
- apply contract
- Output: TemporalRecord stream (sorted by partition,time)
-
Stage 5 (feature stream)
- Input: TemporalRecord stream
- Ops: wrap each record as
FeatureRecord(id, record, value);idis derived from:- dataset
id:(base feature id), and - optional
partition_by:fields (entity-specific feature ids) valueis selected fromdataset.yamlviafield: <record_attr>
- dataset
- Output: FeatureRecord stream (sorted by id,time within partitions)
-
Stage 6 (feature transforms)
- Input: FeatureRecord stream (sorted by id,time)
- Ops: dataset-level feature transforms configured per feature (e.g.
scale,sequence) - Output: FeatureRecord or FeatureRecordSequence
-
Stage 7 (vector assembly)
- Input: all features/targets after stage 6
- Ops:
- merge feature streams by time bucket (
group_by) - assemble
Vectorobjects (feature_id -> value or sequence) - assemble
Sample(key, features, targets) - if rectangular mode is on, align to the expected time window keys (missing buckets become empty vectors)
- merge feature streams by time bucket (
- Output: Sample stream (no postprocess, no split)
-
Stage 8 (postprocess)
- Input: Sample stream
- Ops:
- ensure vector schema (fill missing configured feature ids, drop extras)
- apply project
postprocess.yamlvector transforms
- Output: Sample stream (still not split)
Full run (no --stage)
- Runs stages 0-8, then applies the configured train/val/test split and optional throttling, then writes output.
Split timing (leakage note)
- Split is applied after stage 8 in
jerry serve(postprocess runs before split). - Feature engineering runs before split; keep it causal (no look-ahead, no future leakage).
- Scaler statistics are fit by the build task
scaler.yamland are typically restricted to thetrainsplit (configurable viasplit_label).
CLI Cheat Sheet
jerry demo init: scaffolds a standalone demo plugin at./demo/and wires ademodataset.jerry plugin init <name> --out lib/: scaffoldslib/<name>/(writes workspacejerry.yamlwhen missing).jerry.yaml: setsplugin_rootfor scaffolding commands anddatasets/default_datasetso you can omit--project/--dataset.jerry serve [--dataset <alias>|--project <path>] [--limit N] [--stage 0-8] [--skip-build]: streams output; builds required artifacts unless--skip-build.jerry build [--dataset <alias>|--project <path>] [--force]: materializes artifacts (schema, scaler, etc.).jerry inspect report|matrix|partitions [--dataset <alias>|--project <path>]: quality and metadata helpers.jerry inflow create: interactive wizard to scaffold an end-to-end ingest stream (source + parser/DTO + mapper + contract).jerry source create <provider>.<dataset> ...: scaffolds a source YAML (no Python code).jerry domain create <domain>: scaffolds a domain record stub.jerry dto create,jerry parser create,jerry mapper create,jerry loader create: scaffold Python code + register entry points (reinstall after).jerry contract create [--identity]: interactive contract scaffolder (YAML); use for canonical streams or composed streams.jerry list sources|domains|parsers|mappers|loaders|dtos: introspection helpers.pip install -e lib/<name>: rerun after commands that updatelib/<name>/pyproject.toml(entry points), or after manual edits to it.
MLOps & Reproducibility
jerry buildmaterializes deterministic artifacts (schema, scaler, metadata). Builds are keyed by config hashes and skip work when nothing changed unless you pass--force.jerry serveruns are named (task/run) and can write outputs to<out-path>/<run_name>/for auditing, sharing, or downstream training.- Versioning: tag the project config + plugin code in Git and pair with a data versioning tool like DVC for raw sources. With those inputs pinned, interim datasets and artifacts can be regenerated instead of stored.
Concepts
Workspace (jerry.yaml)
datasets: dataset aliases →project.yamlpaths (relative tojerry.yaml).default_dataset: which datasetjerry serve/build/inspectuse when you omit--dataset/--project.plugin_root: where scaffolding commands write Python code (src/<package>/...) and where they look forpyproject.toml.
Plugin Package (Python Code)
These live under lib/<plugin>/src/<package>/:
dtos/*.py: DTO models (raw source shapes).parsers/*.py: raw -> DTO parsers (referenced by source YAML via entry point).domains/<domain>/model.py: domain record models.mappers/*.py: DTO -> domain record mapping functions (referenced by contracts via entry point).loaders/*.py: optional custom loaders (fs/http usually use the built-in core loader).pyproject.toml: entry points for loaders/parsers/mappers/transforms (rerunpip install -e lib/<plugin>after changes).
Loaders & Parsers
- A loader yields raw rows (bytes/dicts) from some transport (FS/HTTP/synthetic/etc.).
- A parser turns each raw row into a typed DTO (or returns
Noneto drop a row). - In most projects, your source YAML uses the built-in loader
core.ioand you only customize itsargs(transport,format, and apath/url). - You typically only implement a custom loader when you need specialized behavior (auth/pagination/rate limits, proprietary formats, or non-standard protocols).
parser.argsare optional and only used when your parser supports configuration; many parsers don’t need any args since filtering etc is supported natively downstream.
DTOs & Domains
- A DTO (Data Transfer Object) mirrors a single source’s schema (columns/fields) and stays “raw-shaped”; it’s what parsers emit.
- A domain record is the canonical shape used across the pipeline. Mappers convert DTOs into domain records so multiple sources can land in the same domain model.
- The base time-series type is
TemporalRecord(time+ metadata fields). Domains add identity fields (e.g.symbol,station_id) that make filtering/partitioning meaningful. timemust be timezone-aware (normalized to UTC); feature values are selected from record fields indataset.yaml(seefield:); remaining fields act as the record’s “identity” (used by equality/deduping and commonly bypartition_by).
Transforms (Record → Stream → Feature → Vector)
- Record transforms run on raw canonical records before sorting or grouping (filters, time flooring, lagging). Each transform operates on one record at a time because order and partitions are not established yet. Configure in
contracts/*.yamlunderrecord:. - Stream transforms run on ordered, per-stream records after record transforms (dedupe, cadence enforcement, rolling fills). These operate across a sequence of records for a partition because they depend on sorted partition/time order and cadence. Configure in
contracts/*.yamlunderstream:. - Feature transforms run after stream regularization and shape the per-feature payload for vectorization (scalers, sequence/windowing). These occur after feature ids are finalized and payloads are wrapped. Configure in
dataset.yamlunder each feature. - Vector (postprocess) transforms operate on assembled vectors (coverage/drop/fill/replace). Configure in
postprocess.yaml. - Debug transforms run after stream transforms for validation only. Configure in
contracts/*.yamlunderdebug:. - Custom transforms are registered in your plugin
pyproject.tomlunder the matching entry-point group:datapipeline.transforms.recorddatapipeline.transforms.streamdatapipeline.transforms.featuredatapipeline.transforms.vectordatapipeline.transforms.debugThen reference them by name in the YAML.
Glossary
- Source alias:
sources/*.yaml:id(referenced by contracts undersource:). - Stream id:
contracts/*.yaml:id(referenced bydataset.yamlunderrecord_stream:). - Partition: dimension keys appended to feature IDs, driven by
contract.partition_by. - Group: vector “bucket” cadence set by
dataset.group_by(controls how records become samples). - Stage: debug/preview level for
jerry serve --stage 0-8(DTOs → domain records → features → vectors). - Fan-out: when multiple features reference the same
record_stream, the pipeline spools records to disk so each feature can read independently (records must be picklable).
Documentation
docs/config.md: config layout, resolution order, and YAML reference.docs/cli.md: CLI reference (beyond the cheat sheet).docs/transforms.md: built-in transforms and filters.docs/artifacts.md: artifacts, postprocess, and split timing.docs/python.md: Python API usage patterns.docs/extending.md: entry points and writing plugins.docs/architecture.md: pipeline diagrams.
Development
See CONTRIBUTING.md.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file jerry_thomas-2.0.0.tar.gz.
File metadata
- Download URL: jerry_thomas-2.0.0.tar.gz
- Upload date:
- Size: 158.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0f0fdc04e0833d7e9133d21bf57a4de0fead4a385beaa13a72cfbc1c1ed1dc85
|
|
| MD5 |
9a6d14ab799d4d18a52d31b4ad0f67ec
|
|
| BLAKE2b-256 |
f7f86bed507d63d771da3cb824a94a3cbf7ff655993b0b7eddffe7624d8ec81a
|
Provenance
The following attestation bundles were made for jerry_thomas-2.0.0.tar.gz:
Publisher:
workflow.yml on mr-lovalova/datapipeline
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
jerry_thomas-2.0.0.tar.gz -
Subject digest:
0f0fdc04e0833d7e9133d21bf57a4de0fead4a385beaa13a72cfbc1c1ed1dc85 - Sigstore transparency entry: 924356177
- Sigstore integration time:
-
Permalink:
mr-lovalova/datapipeline@537126cf11c69283bcc6ad8a2c36b55c89cbc5fc -
Branch / Tag:
refs/tags/v2.0.0 - Owner: https://github.com/mr-lovalova
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
workflow.yml@537126cf11c69283bcc6ad8a2c36b55c89cbc5fc -
Trigger Event:
release
-
Statement type:
File details
Details for the file jerry_thomas-2.0.0-py3-none-any.whl.
File metadata
- Download URL: jerry_thomas-2.0.0-py3-none-any.whl
- Upload date:
- Size: 238.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8e48909ea81d8284eb9ae999120af0d8178986bee81aadb9e88727823494a534
|
|
| MD5 |
ca362bb81a4160e1f5d7769ad7990033
|
|
| BLAKE2b-256 |
d01598127070c55a2762356c9aefab910e496c5056ecaaea95541b0d6e77db7a
|
Provenance
The following attestation bundles were made for jerry_thomas-2.0.0-py3-none-any.whl:
Publisher:
workflow.yml on mr-lovalova/datapipeline
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
jerry_thomas-2.0.0-py3-none-any.whl -
Subject digest:
8e48909ea81d8284eb9ae999120af0d8178986bee81aadb9e88727823494a534 - Sigstore transparency entry: 924356182
- Sigstore integration time:
-
Permalink:
mr-lovalova/datapipeline@537126cf11c69283bcc6ad8a2c36b55c89cbc5fc -
Branch / Tag:
refs/tags/v2.0.0 - Owner: https://github.com/mr-lovalova
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
workflow.yml@537126cf11c69283bcc6ad8a2c36b55c89cbc5fc -
Trigger Event:
release
-
Statement type: