Jerry-Thomas: a stream-first, plugin-friendly data pipeline (mixology-themed CLI)
Datapipeline Runtime
Jerry Thomas is a time-series-first data pipeline runtime. It turns declarative YAML projects into iterators that stream records, engineered features, and model-ready vectors. The CLI lets you preview every stage, build deterministic artifacts, inspect coverage, and scaffold plugins for custom loaders, parsers, transforms, and filters.
Core assumptions
- Every record carries a timezone-aware `time` attribute and a numeric `value`.
- Grouping is purely temporal. Dimensional splits belong in `partition_by`.
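A minimal sketch of the record shape these assumptions imply (hypothetical class and field layout; the runtime's actual `TemporalRecord` lives in the package):

from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass(frozen=True)
class Record:
    time: datetime  # must be timezone-aware
    value: float    # must be numeric

r = Record(time=datetime(2021, 1, 1, tzinfo=timezone.utc), value=3.5)
assert r.time.tzinfo is not None  # naive timestamps violate the core assumption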
Why You Might Use It
- Materialize canonical time-series datasets from disparate sources.
- Preview and debug each stage of the pipeline without writing ad-hoc scripts.
- Enforce coverage/quality gates and publish artifacts (expected IDs, scaler stats) for downstream ML teams.
- Extend the runtime with entry-point driven plugins for domain-specific I/O or feature engineering.
- Consume vectors directly from Python via iterators, Pandas DataFrames, or `torch.utils.data.Dataset`.
Quick Start
# 1. Install in editable mode (with optional dev extras for testing).
pip install -e .[dev]
# 2. Bootstrap a project (scaffolds configs, plugin package, and templates).
jerry plugin init --name my_datapipeline --out .
# 3. Create a source & domain scaffold, then declare a canonical stream.
jerry source add --provider demo --dataset weather --transport fs --format csv
jerry domain add --domain weather
# (edit config/contracts/<alias>.yaml to point at your mapper and policies)
# 4. Configure dataset/postprocess/build files in config/datasets/<name>/.
# Then preview the pipeline and serve a few vectors:
jerry serve --project config/datasets/default/project.yaml --stage 2 --limit 5
jerry serve --project config/datasets/default/project.yaml --output print --limit 3
# 5. Inspect coverage and build artifacts:
jerry inspect report --project config/datasets/default/project.yaml
jerry build --project config/datasets/default/project.yaml
The skeleton project in src/datapipeline/templates/plugin_skeleton/ mirrors the
paths expected by the CLI. Copy it or run jerry plugin init to get a ready-made
layout with config/, src/<package>/, and entry-point stubs.
Pipeline Architecture
raw source ──▶ canonical stream ──▶ record stage ──▶ feature stage ──▶ vector stage
- Raw sources pair a loader with a parser. Loaders fetch bytes (file system, HTTP, synthetic generators). Parsers turn those bytes into typed DTOs. Register them via entry points (`loaders`, `parsers`) and wire them declaratively in `config/sources/*.yaml`.
- Canonical streams decorate raw sources with mappers and per-stream policies. Contract files under `config/contracts/` define record transforms, feature transforms, sort hints, and partitioning.
- Record stage applies canonical policies to DTOs, turning them into `TemporalRecord` instances (tz-aware timestamp + numeric value).
- Feature stage wraps records into `FeatureRecord`s, handling per-feature sorting, optional scaling, and sequence windows (`FeatureRecordSequence`).
- Vector stage merges all feature streams, buckets them using the `group_by` cadence (e.g., `1h`), and emits `(group_key, Vector)` pairs ready for downstream consumers.
The runtime (src/datapipeline/runtime.py) hosts registries for sources,
transforms, artifacts, and postprocess rules. The CLI constructs lightweight
PipelineContext objects to build iterators without mutating global state.
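Conceptually, each stage lazily wraps the previous stage's iterator; a rough sketch of that chaining idea (plain generators, not the runtime's actual API):

from typing import Iterable, Iterator

# Hypothetical stage functions: nothing is materialized until a consumer pulls items.
def record_stage(dtos: Iterable[dict]) -> Iterator[dict]:
    for dto in dtos:
        yield {"time": dto["time"], "value": float(dto["value"])}

def feature_stage(records: Iterable[dict], feature_id: str) -> Iterator[dict]:
    for rec in records:
        yield {"id": feature_id, **rec}

dtos = [{"time": "2021-01-01T00:00:00Z", "value": "3.5"}]
print(next(feature_stage(record_stage(dtos), "temp_c")))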
Configuration Files
All project configuration lives under config/datasets/<name>/ by default.
project.yaml
version: 1
name: default
paths:
  streams: ../../contracts
  sources: ../../sources
  dataset: dataset.yaml
  postprocess: postprocess.yaml
  artifacts: ../../build/datasets/${project_name}
  build: build.yaml
  run: run.yaml
globals:
  start_time: 2021-01-01T00:00:00Z
  end_time: 2023-01-03T23:00:00Z
  split:
    mode: hash   # hash | time
    key: group   # group | feature:<id>
    seed: 42
    ratios: { train: 0.8, val: 0.1, test: 0.1 }
- `name` provides a stable identifier you can reuse inside config files via `${project_name}`.
- `paths.*` are resolved relative to the project file unless absolute; they also support `${var}` interpolation.
- `globals` provide values for `${var}` interpolation across YAML files. Datetime values are normalized to strict UTC `YYYY-MM-DDTHH:MM:SSZ`.
- `split` defines how labels are assigned; the active label is selected by `run.yaml` or the CLI `--keep` flag.
- `paths.run` may point to a single file (default) or a directory. When it is a directory, every `*.yaml` file inside is treated as a run config; `jerry serve` executes them sequentially in alphabetical order unless you pass `--run <name>` (filename stem).
- Label names are free-form: match whatever keys you declare in `split.ratios` (hash) or `split.labels` (time).
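For reference, the strict UTC form mentioned above can be produced with plain standard-library code (a sketch, not the runtime's own normalizer):

from datetime import datetime, timezone

def to_strict_utc(value: str) -> str:
    """Normalize an ISO-8601 timestamp to YYYY-MM-DDTHH:MM:SSZ in UTC."""
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

print(to_strict_utc("2021-01-01T01:00:00+01:00"))  # -> 2021-01-01T00:00:00Z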
run.yaml
version: 1
keep: train # set to any label defined in globals.split (null disables filtering)
output: print # override to 'stream' or a .pt path for binary dumps
limit: 100 # cap vectors per serve run (null = unlimited)
include_targets: false
throttle_ms: null # sleep between vectors (milliseconds)
log_level: INFO # DEBUG=progress bars, INFO=spinner, WARNING=quiet (null inherits CLI)
- `keep` selects the currently served split. This file is referenced by `project.paths.run`.
- `output`, `limit`, `include_targets`, `throttle_ms`, and `log_level` provide defaults for `jerry serve`; CLI flags still win per invocation.
- Override `keep` (and other fields) per invocation via `jerry serve ... --keep val`, etc.
- To manage multiple runs, point `project.paths.run` at a folder (e.g., `config/datasets/default/runs/`) and drop additional `*.yaml` files there. `jerry serve` will run each file in order; pass `--run train` to execute only `runs/train.yaml`.
config/sources/<alias>.yaml
Each file defines a loader/parser pair exposed under <alias> (also the
source_id the rest of the pipeline references).
source_id: demo_weather
loader:
  entrypoint: demo.csv_loader
  args:
    path: data/weather.csv
parser:
  entrypoint: demo.weather_parser
  args:
    timezone: UTC
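The pair referenced above could be implemented roughly like this (illustrative class names and CSV columns; the real base interfaces live in src/datapipeline/sources/):

import csv
import io
from pathlib import Path
from typing import Iterator

class CsvLoader:
    """Hypothetical loader: fetch raw bytes from the file system."""
    def __init__(self, path: str):
        self.path = Path(path)

    def __call__(self) -> bytes:
        return self.path.read_bytes()

class WeatherParser:
    """Hypothetical parser: turn CSV bytes into typed DTO dicts."""
    def __init__(self, timezone: str = "UTC"):
        self.timezone = timezone

    def __call__(self, payload: bytes) -> Iterator[dict]:
        for row in csv.DictReader(io.StringIO(payload.decode("utf-8"))):
            # Column names are assumptions about the demo CSV layout.
            yield {"time": row["time"], "station": row["station"], "temp_c": float(row["temp_c"])}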
config/contracts/<alias>.yaml
Canonical stream contracts describe how the runtime should map and prepare a
source. alias normally matches the source alias; use folders to organize by
domain.
source_id: demo_weather
stream_id: demo_weather
mapper:
  entrypoint: weather.domain.mapper
  args: {}
partition_by: station
sort_batch_size: 50000
record:
  - filter: { operator: ge, field: time, comparand: "${start_time}" }
  - filter: { operator: lt, field: time, comparand: "${end_time}" }
  - floor_time: { resolution: 10m }
stream:
  - ensure_ticks: { tick: 10m }
  - granularity: { mode: mean }
  - fill: { statistic: median, window: 6, min_samples: 2 }
debug:
  - lint: { mode: warn, tick: 10m }
- `record`: ordered record-level transforms (filters, floor/lag, custom transforms registered under the `record` entry-point group).
- `stream`: transforms applied after feature wrapping, still per base feature.
- `debug`: instrumentation-only transforms (linters, assertions).
- `partition_by`: optional keys used to suffix feature IDs (e.g., `temp__station=XYZ`).
- `sort_batch_size`: chunk size used by the in-memory sorter when normalizing order before stream transforms.
dataset.yaml
Defines which canonical streams become features/targets and the vector bucketing.
group_by: 1h
features:
  - id: temp_c
    record_stream: demo_weather
    scale: true
    sequence: { size: 6, stride: 1, tick: 10m }
targets:
  - id: precip
    record_stream: demo_weather
- `group_by` controls the cadence for vector partitioning (accepts `Xm`/`Xmin` or `Xh`, i.e. minutes or hours).
- `scale: true` inserts the standard scaler feature transform (requires a scaler stats artifact or inline statistics).
- `sequence` emits `FeatureRecordSequence` windows (`size`, `stride`, optional cadence enforcement via `tick`).
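A sketch of what `group_by: 1h` bucketing amounts to (plain standard-library code, not the runtime's implementation):

from datetime import datetime, timedelta, timezone

def bucket_key(ts: datetime, cadence: timedelta) -> datetime:
    """Floor a tz-aware timestamp onto the cadence grid; records in the same bucket share a group key."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return ts - ((ts - epoch) % cadence)

ts = datetime(2021, 1, 1, 12, 40, tzinfo=timezone.utc)
print(bucket_key(ts, timedelta(hours=1)))  # 2021-01-01 12:00:00+00:00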
postprocess.yaml
Project-scoped vector transforms that run after assembly and before serving.
- drop_missing:
    required: [temp_c__station=001]
    min_coverage: 0.95
- fill_constant: { value: 0.0 }
- fill_history:
    statistic: median
    window: 48
    min_samples: 6
- fill_horizontal:
    statistic: mean
    min_samples: 2
- Vector transforms rely on artifacts (expected IDs, scaler stats) to decide which features should be present.
- When no transforms are configured the stream passes through unchanged.
build.yaml
Declares which artifacts the build step should materialize.
version: 1
partitioned_ids:
  output: expected.txt
  include_targets: false
scaler:
  enabled: true
  output: scaler.pkl
  include_targets: false
  split_label: train
- `expected.txt` lists every fully partitioned feature ID observed in the latest run (used by vector postprocess transforms).
- `scaler.pkl` is a pickled standard scaler fitted on the requested split.
CLI Reference
All commands live under the jerry entry point (src/datapipeline/cli/app.py).
Pass --help on any command for flags.
Preview Stages
jerry serve --project <project.yaml> --stage <0-7> --limit N [--log-level LEVEL]

- Stage 0: raw DTOs
- Stage 1: domain `TemporalRecord`s
- Stage 2: record transforms applied
- Stage 3: feature records (before sort/regularization)
- Stage 4: feature regularization (post stream transforms)
- Stage 5: feature transforms/sequence outputs
- Stage 6: vectors assembled (no postprocess)
- Stage 7: vectors + postprocess transforms
- Use `--log-level DEBUG` for progress bars, `--log-level INFO` for spinner + prints, or the default (WARNING) for minimal output.
jerry serve --project <project.yaml> --output print|stream|path.pt|path.csv|path.jsonl.gz --limit N [--include-targets] [--log-level LEVEL] [--run name]

- Applies postprocess transforms and the optional dataset split before emitting.
- Set `--log-level DEBUG` (or `log_level: DEBUG` in run.yaml) to reuse the tqdm progress bars when previewing stages.
- When `project.paths.run` is a directory, add `--run val` (filename stem) to target a single config; otherwise every run file is executed sequentially.
- Argument precedence: CLI flags > run.yaml > built-in defaults.
Build & Quality
jerry inspect report --project <project.yaml> [--threshold 0.95] [--include-targets]

- Prints a coverage summary (keep/below lists) and writes coverage.json under the artifacts directory.
- Add --matrix csv|html to persist an availability matrix.

jerry inspect partitions --project <project.yaml> [--include-targets]

- Writes discovered partition suffixes to partitions.json.

jerry inspect expected --project <project.yaml> [--include-targets]

- Writes the full set of observed feature IDs to expected.txt.

jerry build --project <project.yaml> [--force]

- Regenerates the artifacts declared in build.yaml if the configuration hash changed.
Scaffolding & Reference
jerry plugin init --name <package> --out <dir>

- Generates a plugin project (pyproject, package skeleton, config templates).

jerry source add --provider <name> --dataset <slug> --transport fs|url|synthetic --format csv|json|json-lines

- Creates loader/parser stubs, updates entry points, and drops a matching source YAML.

jerry domain add --domain <name>

- Adds a domains/<name>/ package with a model.py stub.

jerry filter create --name <identifier>

- Scaffolds an entry-point-ready filter (helpful for custom record predicates).

jerry list sources|domains

- Introspects configured source aliases or domain packages.
Transform & Filter Library
Record Filters (config/contracts[].record)
- Binary comparisons: `eq`, `ne`, `lt`, `le`, `gt`, `ge` (timezone-aware for ISO or datetime literals).
- Membership: `in`, `nin`.

- filter: { operator: ge, field: time, comparand: "${start_time}" }
- filter: { operator: in, field: station, comparand: [a, b, c] }
Record Transforms
- `floor_time`: snap timestamps down to the nearest resolution (`10m`, `1h`, …).
- `lag`: add lagged copies of records (see src/datapipeline/transforms/record/lag.py for options).
Stream (Feature) Transforms
- `ensure_ticks`: backfill missing ticks with `value=None` records to enforce a strict cadence.
- `granularity`: merge duplicate timestamps using `first|last|mean|median`.
- `fill`: rolling statistic-based imputation within each feature stream.
- Custom transforms can be registered under the `stream` entry-point group.
Feature Transforms
- `scale`: wraps StandardScalerTransform. Reads statistics from the build artifact or accepts inline `statistics`:

scale:
  with_mean: true
  with_std: true
  statistics:
    temp_c__station=001: { mean: 10.3, std: 2.1 }
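Per value, the scaling itself reduces to the familiar standardization formula; a tiny sketch (the real transform reads its statistics from the scaler artifact or the inline block above):

def standard_scale(value: float, mean: float, std: float) -> float:
    """(value - mean) / std, guarding against a zero standard deviation."""
    return (value - mean) / std if std else 0.0

print(standard_scale(12.4, mean=10.3, std=2.1))  # -> 1.0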
Sequence Transforms
- `sequence`: sliding window generator (`size`, `stride`, optional `tick` to enforce gaps). Emits `FeatureRecordSequence` payloads with `.records`.
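The windowing behaviour, sketched with plain generators (illustrative only; the real transform also enforces the optional `tick` cadence):

from typing import Iterable, Iterator, List

def sliding_windows(records: Iterable[dict], size: int, stride: int = 1) -> Iterator[List[dict]]:
    """Emit overlapping windows of `size` records, advancing by `stride`."""
    buffer: List[dict] = []
    for rec in records:
        buffer.append(rec)
        if len(buffer) == size:
            yield list(buffer)
            buffer = buffer[stride:]

ticks = [{"t": i} for i in range(5)]
print(list(sliding_windows(ticks, size=3)))  # windows [0..2], [1..3], [2..4]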
Vector (Postprocess) Transforms
- `drop_missing`: drop vectors that do not meet the required IDs or coverage ratio.
- `fill_constant`: seed absent IDs with a constant.
- `fill_history`: impute using rolling statistics from prior vectors.
- `fill_horizontal`: aggregate sibling partitions at the same timestamp.
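As a rough illustration of the `drop_missing` idea (a sketch over plain dict vectors, assuming the expected-ID universe comes from the expected.txt artifact):

from typing import Dict, Iterable, Iterator, Optional, Set

Vector = Dict[str, Optional[float]]

def drop_missing(vectors: Iterable[Vector], expected_ids: Set[str],
                 min_coverage: float = 0.95) -> Iterator[Vector]:
    """Keep only vectors whose coverage over the expected feature IDs is high enough."""
    for vec in vectors:
        present = sum(1 for fid in expected_ids if vec.get(fid) is not None)
        coverage = present / len(expected_ids) if expected_ids else 1.0
        if coverage >= min_coverage:
            yield vec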
All transforms share a consistent entry-point signature and accept their config
dict as keyword arguments. Register new ones in pyproject.toml under the
appropriate group (record, stream, feature, sequence, vector,
filters, debug).
Artifacts & Postprocess
- `expected.txt`: newline-delimited full feature IDs. Required by drop/fill transforms to know the target feature universe.
- `scaler.pkl`: pickled standard scaler fitted on the configured split. Loaded lazily by feature transforms at runtime.
- Build state is tracked in artifacts/build/state.json; config hashes avoid redundant runs.
If a postprocess transform needs an artifact and it is missing, the runtime will
raise a descriptive error suggesting jerry build.
Splitting & Serving
If project.globals.split is present, jerry serve filters vectors at the
end of the pipeline:
- `mode: hash`: deterministic entity hash using either the group key or a specified feature ID (see the sketch after this section).
- `mode: time`: boundary-based slicing using timestamp labels.
- `run.keep` (or CLI `--keep`) selects the active slice; use any label name defined in your split config.
The split configuration never mutates stored artifacts; it is only applied when serving vectors (either via CLI or the Python integrations).
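A minimal sketch of how a deterministic hash split can assign labels (illustrative only; the runtime's exact hashing scheme may differ):

import hashlib

def hash_label(key: str, ratios: dict, seed: int = 42) -> str:
    """Map an entity key to a split label via its hash bucket; stable across runs for the same seed."""
    digest = hashlib.sha256(f"{seed}:{key}".encode()).hexdigest()
    bucket = int(digest, 16) / 16 ** len(digest)  # uniform float in [0, 1)
    cumulative = 0.0
    for label, ratio in ratios.items():
        cumulative += ratio
        if bucket < cumulative:
            return label
    return label  # guard against floating-point rounding at the boundary

print(hash_label("station=001", {"train": 0.8, "val": 0.1, "test": 0.1}))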
Python Integrations
datapipeline.integrations.ml demonstrates how to reuse the runtime from
application code:
- `VectorAdapter.from_project(project_yaml)`: bootstrap once, then stream vectors or row dicts.
- `stream_vectors(project_yaml, limit=...)`: iterator matching `jerry serve`.
- `iter_vector_rows` / `collect_vector_rows`: handy for Pandas or custom sinks.
- `dataframe_from_vectors`: eager helper that returns a Pandas DataFrame (requires pandas).
- `torch_dataset`: builds a `torch.utils.data.Dataset` that yields tensors. See examples/minimal_project/run_torch.py for usage.
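A quick usage sketch built from the helpers above (the exact yield shape is an assumption based on the `(group_key, Vector)` pairs produced by the vector stage):

from datapipeline.integrations.ml import VectorAdapter, stream_vectors

PROJECT = "config/datasets/default/project.yaml"

# Stream a handful of vectors, mirroring `jerry serve`.
for group_key, vector in stream_vectors(PROJECT, limit=5):
    print(group_key, vector)

# Or bootstrap once and reuse the adapter for repeated reads.
adapter = VectorAdapter.from_project(PROJECT)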
Extending the Runtime
Entry Points
Register custom components in your plugin’s pyproject.toml:
[project.entry-points."datapipeline.loaders"]
demo.csv_loader = "my_datapipeline.loaders.csv:CsvLoader"
[project.entry-points."datapipeline.parsers"]
demo.weather_parser = "my_datapipeline.parsers.weather:WeatherParser"
[project.entry-points."datapipeline.mappers"]
weather.domain.mapper = "my_datapipeline.mappers.weather:DomainMapper"
[project.entry-points."datapipeline.stream"]
weather.fill = "my_datapipeline.transforms.weather:CustomFill"
Loader, parser, mapper, and transform classes should provide a callable
interface (usually __call__) matching the runtime expectations. Refer to the
built-in implementations in src/datapipeline/sources/, src/datapipeline/transforms/,
and src/datapipeline/filters/.
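A sketch of what a custom stream transform such as the `weather.fill` entry point above might look like (the record interface shown is an assumption; mirror the built-ins for the real contract):

from typing import Iterable, Iterator

class CustomFill:
    """Replace missing values in a feature stream with a constant default."""

    def __init__(self, default: float = 0.0):
        self.default = default

    def __call__(self, records: Iterable) -> Iterator:
        for record in records:
            if getattr(record, "value", None) is None:
                record.value = self.default
            yield record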
Scaffolding Helpers
- `datapipeline.services.scaffold.plugin.scaffold_plugin`: invoked by `jerry plugin init`.
- `datapipeline.services.scaffold.source.create_source`: writes loader/parser stubs and updates entry points.
- `datapipeline.services.scaffold.domain.create_domain`: domain DTO skeleton.
- `datapipeline.services.scaffold.filter.create_filter`: custom filter stub.
- `datapipeline.services.scaffold.mappers.attach_source_to_domain`: helper for programmatically wiring sources to domain mappers and emitting stream contracts (useful in custom automation or tests).
Development Workflow
- Install dependencies: `pip install -e .[dev]`.
- Run tests: `pytest`.
- When iterating on configs, use `jerry serve --stage <n>` to peek into problematic stages.
- After tuning transforms, refresh artifacts with `jerry build`.
- Use `jerry inspect report --include-targets` to ensure targets meet coverage gates before handing vectors to downstream consumers.
Additional Resources
- src/datapipeline/analysis/vector_analyzer.py: quality metrics collected by the inspect commands.
- src/datapipeline/pipeline/: pure functions that wire each stage.
- src/datapipeline/services/bootstrap/: runtime initialization and registry population (see core.py).
- examples/minimal_project/: runnable demo showing config layout and Torch integration.
Happy shipping! Build, inspect, and serve consistent time-series features with confidence.