Jerry-Thomas: a stream-first, plugin-friendly data pipeline (mixology-themed CLI)
Datapipeline Runtime
Jerry Thomas is a time-series-first data pipeline runtime. It turns declarative YAML projects into iterators that stream records, engineered features, and model-ready vectors. The CLI lets you preview every stage, build deterministic artifacts, inspect quality, and scaffold plugins for custom loaders, parsers, transforms, and filters.
Core assumptions
- Every record carries a timezone-aware `time` attribute and a numeric `value`.
- Grouping is purely temporal. Dimensional splits belong in `partition_by`.
Why You Might Use It
- Materialize canonical time-series datasets from disparate sources.
- Preview and debug each stage of the pipeline without writing ad-hoc scripts.
- Enforce coverage/quality gates and publish artifacts (expected IDs, scaler stats) for downstream ML teams.
- Extend the runtime with entry-point driven plugins for domain-specific I/O or feature engineering.
- Consume vectors directly from Python via iterators, Pandas DataFrames, or `torch.utils.data.Dataset`.
Quick Start
Serve The Example
```shell
pip install jerry-thomas
jerry plugin init my-datapipeline --out lib/
jerry serve --limit 3
```
Create Your Own Stream
This assumes you already ran `jerry plugin init ...` in this workspace (it writes `jerry.yaml`, which the CLI uses for defaults and scaffolding paths).
The scaffolding commands below write YAML into the dataset selected by `default_dataset` in `jerry.yaml` (`example` by default).
```shell
jerry source add demo weather -t fs -f csv
jerry domain add weather
jerry contract
pip install -e lib/my-datapipeline
```
CLI Cheat Sheet
- `jerry plugin init <name> --out lib/`: scaffolds `lib/<name>/` and writes workspace `jerry.yaml`.
- `jerry.yaml` (created by `plugin init`): sets `plugin_root` for scaffolding commands and `datasets`/`default_dataset` so you can omit `--project`/`--dataset`.
- `jerry serve [--dataset <alias>|--project <path>] [--limit N] [--stage 0-7] [--skip-build]`: streams output; builds required artifacts unless `--skip-build`.
- `jerry build [--dataset <alias>|--project <path>] [--force]`: materializes artifacts (schema, scaler, expected IDs, etc.).
- `jerry inspect report|matrix|partitions|expected [--dataset <alias>|--project <path>]`: quality and metadata helpers.
- `jerry source add <provider> <dataset> -t fs|http|synthetic -f csv|json|json-lines|pickle [--identity]`: scaffolds a source YAML and (unless `--identity`) a parser + entry point.
- `jerry domain add <domain>`: scaffolds domain models under `src/<package>/domains/<domain>/`.
- `jerry contract [--identity]`: interactive contract scaffolder; most users pick `[1] Ingest (source → stream)` (use `[2] Composed` for derived streams, e.g. air_density from temp + pressure).
- `pip install -e lib/<name>`: rerun after commands that update `lib/<name>/pyproject.toml` (entry points), or after manual edits to it.
Concepts
Workspace (jerry.yaml)
- `datasets`: dataset aliases → `project.yaml` paths (relative to `jerry.yaml`).
- `default_dataset`: which dataset `jerry serve/build/inspect` use when you omit `--dataset`/`--project`.
- `plugin_root`: where scaffolding commands write Python code (`src/<package>/...`) and where they look for `pyproject.toml`.
Plugin Package (Python Code)
These live under `lib/<plugin>/src/<package>/`:
- `sources/<provider>/<dataset>/dto.py` + `parser.py`: source DTO + parser (created by `jerry source add` unless `--identity`).
- `domains/<domain>/model.py`: domain records (created by `jerry domain add`).
- `mappers/<provider>/<dataset>/to_<domain>.py`: DTO → domain record mapping (usually created by `jerry contract`).
- `pyproject.toml`: entry points for loaders/parsers/mappers/transforms (rerun `pip install -e lib/<plugin>` after changes).
Loaders & Parsers
- A loader yields raw rows (bytes/dicts) from some transport (FS/HTTP/synthetic/etc.).
- A parser turns each raw row into a typed DTO (or returns `None` to drop a row).
- In most projects, your source YAML uses the built-in loader `core.io` and you only customize its `args` (`transport`, `format`, and a `path`/`url`).
- You typically only implement a custom loader when you need specialized behavior (auth/pagination/rate limits, proprietary formats, or non-standard protocols).
- `parser.args` are optional and only used when your parser supports configuration; many parsers don't need any args, since filtering and similar concerns are handled natively downstream.
DTOs & Domains
- A DTO (Data Transfer Object) mirrors a single source’s schema (columns/fields) and stays “raw-shaped”; it’s what parsers emit.
- A domain record is the canonical shape used across the pipeline. Mappers convert DTOs into domain records so multiple sources can land in the same domain model.
- The base time-series type is `TemporalRecord` (`time` + `value`). Domains typically add identity fields (e.g. `symbol`, `station_id`) that make filtering/partitioning meaningful.
- `time` must be timezone-aware (normalized to UTC); `value` is the measurement you engineer features from; all other fields act as the record's "identity" (used by equality/deduping and commonly by `partition_by`).
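To make the record contract concrete, here is a minimal sketch of what a domain record following this convention could look like. The `StationReading` class and its `station_id` field are illustrative, not the package's actual types; only the `time`/`value` convention comes from the docs above.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class StationReading:
    """Hypothetical domain record: `time` + `value` plus identity fields."""
    time: datetime       # must be timezone-aware; normalized to UTC below
    value: float         # the measurement features are engineered from
    station_id: str      # identity field: equality/deduping, partition_by

    def __post_init__(self) -> None:
        if self.time.tzinfo is None:
            raise ValueError("time must be timezone-aware")
        # Normalize to UTC, mirroring the runtime's stated convention.
        object.__setattr__(self, "time", self.time.astimezone(timezone.utc))


r = StationReading(datetime(2021, 1, 1, tzinfo=timezone.utc), 10.5, "XYZ")
```

Because the record is a frozen dataclass, two readings with the same time, value, and identity compare equal, which is what makes deduping and partitioning well defined.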
Glossary
- Source alias: `sources/*.yaml:id` (referenced by contracts under `source:`).
- Stream id: `contracts/*.yaml:id` (referenced by `dataset.yaml` under `record_stream:`).
- Partition: dimension keys appended to feature IDs, driven by `contract.partition_by`.
- Group: vector "bucket" cadence set by `dataset.group_by` (controls how records become samples).
- Stage: debug/preview level for `jerry serve --stage 0-7` (DTOs → domain records → features → vectors).
Dataset Project (YAML Config)
These live under the dataset “project root” directory (the folder containing project.yaml):
- `project.yaml`: paths + globals (single source of truth).
- `sources/*.yaml`: raw sources (loader + parser wiring).
- `contracts/*.yaml`: canonical streams (ingest or composed).
- `dataset.yaml`: feature/target declarations.
- `postprocess.yaml`: vector-level transforms.
- `tasks/*.yaml`: serve presets and artifact task configs.
Configuration & Resolution Order
Defaults are layered so you can set global preferences once, keep dataset/run
files focused on per-project behavior, and still override anything from the CLI.
For both jerry serve and jerry build, options are merged in the following
order (highest precedence first):
- CLI flags – anything you pass on the command line always wins.
- Project task files – `kind: serve` specs (under `project.paths.tasks`) supply serve defaults; artifact tasks in the same directory drive `jerry build`.
- `jerry.yaml` command blocks – settings under `jerry.serve` and `jerry.build`.
- `jerry.yaml.shared` – shared fallbacks for visuals/progress/log-level style settings.
- Built-in defaults – runtime hard-coded defaults.
YAML Config Reference
All dataset configuration is rooted at a single project.yaml file. Other YAML files are discovered via project.paths.* (relative to project.yaml unless absolute).
project.yaml
```yaml
version: 1
name: default

paths:
  streams: ./contracts
  sources: ./sources
  dataset: dataset.yaml
  postprocess: postprocess.yaml
  artifacts: ../artifacts/${project_name}/v${version}
  tasks: ./tasks

globals:
  start_time: 2021-01-01T00:00:00Z
  end_time: 2023-01-03T23:00:00Z
  split:
    mode: hash   # hash | time
    key: group   # group | feature:<id>
    seed: 42
    ratios: { train: 0.8, val: 0.1, test: 0.1 }
```
- `name` provides a stable identifier you can reuse inside config files via `${project_name}`.
- `paths.*` are resolved relative to the project file unless absolute; they also support `${var}` interpolation.
- `globals` provide values for `${var}` interpolation across YAML files. Datetime values are normalized to strict UTC `YYYY-MM-DDTHH:MM:SSZ`.
- `split` config defines how labels are assigned; serve tasks or CLI flags pick the active label via `keep`.
- `paths.tasks` points to a directory of task specs. Each `*.yaml` file declares `kind: ...` (`scaler`, `schema`, `metadata`, `serve`, …). Artifact tasks drive `jerry build`; command tasks (currently `kind: serve`) provide presets for `jerry serve`. When multiple serve tasks exist, `jerry serve --run <name>` selects by `name`/filename stem.
- Label names are free-form: match whatever keys you declare in `split.ratios` (hash) or `split.labels` (time).
Serve Tasks (tasks/serve.<name>.yaml)
```yaml
kind: serve
name: train           # defaults to filename stem when omitted
keep: train           # select active split label (null disables filtering)
output:
  transport: stdout   # stdout | fs
  format: print       # print | json-lines | json | csv | pickle
limit: 100            # cap vectors per serve run (null = unlimited)
throttle_ms: null     # milliseconds to sleep between emitted vectors
# Optional overrides:
# log_level: INFO     # DEBUG=progress bars, INFO=spinner, WARNING=quiet
# visuals: AUTO       # AUTO | TQDM | RICH | OFF
# progress: AUTO      # AUTO | SPINNER | BARS | OFF
```
- Each serve task lives alongside artifact tasks under `paths.tasks`. Files are independent—no special directory structure required.
- `output`, `limit`, `throttle_ms`, and `log_level` provide defaults for `jerry serve`; CLI flags still win per invocation (see Configuration & Resolution Order).
- For filesystem outputs, set `transport: fs`, `directory: /path/to/root`, and omit file names—each run automatically writes to `<directory>/<run_name>/<run_name>.<ext>` unless you override the entire `output` block with a custom `filename`.
- Override `keep` (and other fields) per invocation via `jerry serve ... --keep val` etc.
- Visuals backend: set `visuals: AUTO|TQDM|RICH|OFF` in the task or use `--visuals`. Pair with `progress: AUTO|SPINNER|BARS|OFF` or `--progress` to control progress layouts.
- Add additional `kind: serve` files to the tasks directory for other splits (val/test/etc.); `jerry serve` runs each enabled file unless you pass `--run <name>`.
- Use `jerry.yaml` next to the project or workspace root to define shared defaults (visuals/progress/log level/output); CLI flags still take precedence.
Workspace Defaults (jerry.yaml)
Create an optional jerry.yaml in the directory where you run the CLI to share settings across commands. The CLI walks up from the current working directory to find the first jerry.yaml.
```yaml
plugin_root: lib/my-datapipeline   # plugin workspace (relative to this file)

# Dataset aliases for --dataset; values may be dirs (auto-append project.yaml).
datasets:
  example: lib/my-datapipeline/example/project.yaml
default_dataset: example

shared:
  visuals: AUTO    # AUTO | TQDM | RICH | OFF
  progress: BARS   # AUTO | SPINNER | BARS | OFF
  log_level: INFO

serve:
  limit: null
  stage: null
  output:
    transport: stdout
    format: print   # print | json-lines | json | csv | pickle
    # directory: artifacts/serve   # Required when transport=fs

build:
  mode: AUTO   # AUTO | FORCE | OFF
```
jerry.yaml sits near the root of your workspace, while dataset-specific overrides still live in individual tasks/serve.*.yaml files as needed.
<project_root>/sources/<alias>.yaml
Each file defines a loader/parser pair exposed under <alias>. Files may live in nested
subdirectories under <project_root>/sources/; discovery is recursive.
```yaml
# Source identifier (commonly `provider.dataset`). Contracts reference this under `source:`.
id: stooq.ohlcv

parser:
  # Parser entry point name (registered in your plugin's pyproject.toml).
  entrypoint: stooq.ohlcv

loader:
  # Most common loader: core.io (supports fs/http via args.transport + args.format).
  entrypoint: core.io
  args:
    transport: http
    format: csv
    url: "https://stooq.com/q/d/l/?s=aapl.us&i=d"
```
- `id`: the source alias; referenced by contracts under `source:`.
- `parser.entrypoint`: which parser to use; `parser.args` are optional.
- `loader.entrypoint`: which loader to use; `core.io` is the default for fs/http and is configured via `loader.args`.
Fan-out Sources (core.foreach)
Use core.foreach to expand any inner loader spec across a list without duplicating YAML. It interpolates string args and optionally injects the foreach value into each row.
```yaml
loader:
  entrypoint: core.foreach
  args:
    foreach:
      symbol: [AAPL, MSFT]
    inject_field: symbol
    loader:
      entrypoint: core.io
      args:
        transport: http
        format: csv
        url: "https://stooq.com/q/d/l/?s=${symbol}&i=d"
```
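Conceptually, `core.foreach` materializes one inner loader per foreach value, substituting `${var}` placeholders in the inner string args. A rough sketch of that expansion (illustrative only, not the runtime's actual implementation; `expand_foreach` is a made-up helper name):

```python
from string import Template


def expand_foreach(foreach: dict, inner_args: dict) -> list[dict]:
    """Produce one concrete loader-args dict per foreach value,
    interpolating ${var} placeholders in string-valued args."""
    key, values = next(iter(foreach.items()))  # e.g. "symbol" -> ["AAPL", "MSFT"]
    expanded = []
    for v in values:
        args = {
            k: Template(val).safe_substitute({key: v}) if isinstance(val, str) else val
            for k, val in inner_args.items()
        }
        expanded.append(args)
    return expanded


specs = expand_foreach(
    {"symbol": ["AAPL", "MSFT"]},
    {"transport": "http", "format": "csv",
     "url": "https://stooq.com/q/d/l/?s=${symbol}&i=d"},
)
```

Each resulting args dict would then drive a separate `core.io` loader, with `inject_field` optionally stamping the foreach value onto every emitted row.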
<project_root>/contracts/<stream_id>.yaml
Canonical stream contracts describe how the runtime should map and prepare a raw source. Use folders to organize by domain if you like.
```yaml
kind: ingest
id: equity.ohlcv      # stream identifier (domain.dataset[.variant])
source: stooq.ohlcv   # references sources/<alias>.yaml:id

mapper:
  entrypoint: equity.ohlcv
  args: {}

partition_by: station
sort_batch_size: 50000

record:
  - filter: { operator: ge, field: time, comparand: "${start_time}" }
  - filter: { operator: lt, field: time, comparand: "${end_time}" }
  - floor_time: { cadence: 10m }

stream:
  - ensure_cadence: { cadence: 10m }
  - granularity: { mode: mean }
  - fill: { statistic: median, window: 6, min_samples: 2 }

debug:
  - lint: { mode: warn, tick: 10m }
```
- `record`: ordered record-level transforms (filters, floor/lag, custom transforms registered under the `record` entry-point group).
- `stream`: transforms applied after feature wrapping, still per base feature.
- `debug`: instrumentation-only transforms (linters, assertions).
- `partition_by`: optional keys used to suffix feature IDs (e.g., `temp__@station_id:XYZ`).
- `sort_batch_size`: chunk size used by the in-memory sorter when normalizing order before stream transforms.
Composed Streams (Engineered Domains)
Define engineered streams that depend on other canonical streams directly in contracts. The runtime builds each input to stage 4 (ordered + regularized), stream‑aligns by partition + timestamp, runs your composer, and emits fresh records for the derived stream.
```yaml
# <project_root>/contracts/air_density.processed.yaml
kind: composed
id: air_density.processed

inputs:
  - pressure.processed
  - t=temp_dry.processed

partition_by: station_id
sort_batch_size: 20000

mapper:
  # Function or class via dotted path; entry points optional
  entrypoint: mypkg.domains.air_density:compose_to_record
  args:
    driver: pressure.processed   # optional; defaults to first input

# Optional post-compose policies (run after composition like any stream)
# record: [...]
# stream: [...]
# debug: [...]
```
Dataset stays minimal — features only reference the composed stream:
```yaml
# dataset.yaml
group_by: 1h
features:
  - id: air_density
    record_stream: air_density.processed
```
Notes:
- Inputs always reference canonical stream ids (not raw sources).
- The composed source outputs records; its own `record`/`stream`/`debug` rules still apply afterward.
- Partitioning for the engineered domain is explicit via `partition_by` on the composed contract.
dataset.yaml
Defines which canonical streams become features/targets and the vector bucketing.
```yaml
group_by: 1h
features:
  - id: close
    record_stream: equity.ohlcv
    scale: true
    sequence: { size: 6, stride: 1 }
targets:
  - id: returns_1d
    record_stream: equity.ohlcv
```
- `group_by` controls the cadence for vector partitioning (accepts `Xm|min|Xh` — minutes or hours).
- `scale: true` inserts the standard scaler feature transform (requires scaler stats artifact or inline statistics).
- Downstream consumers can load the `scaler.pkl` artifact and call `StandardScaler.inverse_transform` (or `StandardScalerTransform.inverse`) to undo scaling.
- `sequence` emits `FeatureRecordSequence` windows (`size`, `stride`, optional cadence enforcement via `tick`).
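For readers unfamiliar with standard scaling: the forward transform is `z = (x - mean) / std`, so undoing it (what `inverse_transform` does) is `x = z * std + mean`. A tiny sketch of just that arithmetic (these helper functions are illustrative, not the package's classes):

```python
def scale(x: float, mean: float, std: float) -> float:
    """Standard scaling: center on the mean, divide by the std."""
    return (x - mean) / std


def inverse_scale(z: float, mean: float, std: float) -> float:
    """Inverse of standard scaling; recovers the original value."""
    return z * std + mean


z = scale(12.4, mean=10.3, std=2.1)        # scaled value
x = inverse_scale(z, mean=10.3, std=2.1)   # round-trips back to 12.4
```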
postprocess.yaml
Project-scoped vector transforms that run after assembly and before serving.
```yaml
- drop:
    axis: horizontal
    payload: features
    threshold: 0.95
- fill:
    statistic: median
    window: 48
    min_samples: 6
- replace:
    payload: targets
    value: 0.0
```
- Each transform receives a `Sample`; set `payload: targets` when you want to mutate label vectors, otherwise the feature vector is used.
- Vector transforms rely on the schema artifact (for expected IDs/cadence) and scaler stats when scaling is enabled. When no transforms are configured, the stream passes through unchanged.
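To illustrate what a horizontal `drop` does, here is a minimal sketch of the underlying idea: keep a vector only when the fraction of non-missing entries meets the threshold. The function names and the list-of-lists representation are assumptions for illustration, not the runtime's actual `Sample` API:

```python
def coverage(values: list) -> float:
    """Fraction of non-missing entries in one vector."""
    if not values:
        return 0.0
    present = sum(1 for v in values if v is not None)
    return present / len(values)


def drop_sparse(vectors: list[list], threshold: float = 0.95) -> list[list]:
    """Horizontal drop: discard whole vectors whose coverage < threshold."""
    return [v for v in vectors if coverage(v) >= threshold]


kept = drop_sparse([[1.0, 2.0, 3.0], [1.0, None, None]], threshold=0.95)
```

Vertical mode applies the same thresholding idea along the other axis (per feature/partition across vectors), which is why it needs the `metadata.json` statistics.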
Task Specs (tasks/*.yaml)
Declare artifact and command tasks under project.paths.tasks (default tasks/).
Artifact specs are optional; if you omit them, Jerry falls back to built-in defaults.
Add a YAML file only when you need to override paths or other parameters.
tasks/scaler.yaml
```yaml
kind: scaler
output: scaler.pkl
split_label: train
enabled: true
```
- `scaler.pkl` is a pickled standard scaler fitted on the requested split.
- `schema.json` (from the `schema` task) enumerates the discovered feature/target identifiers (including partitions), their kinds (scalar/list), and cadence hints used to enforce ordering downstream.
- Configure the `schema` task to choose a cadence strategy (currently `max`). Per-feature overrides will be added later; for now every list-valued feature records the max observed length as its enforcement target.
- `metadata.json` (from the `metadata` task) captures heavier statistics—present/null counts, inferred value types, list-length histograms, per-partition timestamps, and the dataset window. Configure `metadata.window_mode` with `union|intersection|strict|relaxed` (default `intersection`) to control how start/end bounds are derived. `union` considers base features, `intersection` uses their overlap, `strict` intersects every partition, and `relaxed` unions partitions independently.
- Command tasks (`kind: serve`) live alongside artifact tasks; `jerry serve` reads them directly.
- Shared run/build defaults (visuals/progress/log level/build mode) live in `jerry.yaml`.
CLI Reference
All commands live under the jerry entry point (src/datapipeline/cli/app.py).
Pass --help on any command for flags.
All commands that take a project accept either --project <path/to/project.yaml> or --dataset <alias> (from jerry.yaml datasets:).
Preview Stages
- `jerry serve --project <project.yaml> --stage <0-7> --limit N [--log-level LEVEL] [--visuals auto|tqdm|rich|off] [--progress auto|spinner|bars|off]`
  - Stage 0: raw DTOs
  - Stage 1: domain `TemporalRecord`s
  - Stage 2: record transforms applied
  - Stage 3: feature records (before sort/regularization)
  - Stage 4: feature regularization (post stream transforms)
  - Stage 5: feature transforms/sequence outputs
  - Stage 6: vectors assembled (no postprocess)
  - Stage 7: vectors + postprocess transforms
  - Use `--log-level DEBUG` for progress bars, `--log-level INFO` for spinner + prints, or the default (WARNING) for minimal output.
  - Ensures build artifacts are current before streaming; the build step only runs when the configuration hash changes, unless you pass `--stage` 0-5 (auto-skip) or opt out with `--skip-build`.
- `jerry serve --project <project.yaml> --out-transport stdout --out-format json-lines --limit N [--include-targets] [--log-level LEVEL] [--visuals ...] [--progress ...] [--run name]`
  - Applies postprocess transforms and optional dataset split before emitting.
  - Use `--out-transport fs --out-format json-lines --out-path build/serve` (or `csv`, `pickle`, etc.) to write artifacts to disk instead of stdout; files land under `<out-path>/<run_name>/`.
  - `--out-payload vector` emits only the vector payload with features/targets flattened into schema-ordered lists (no identifier keys) when you don't need the group key or metadata. Default is `sample`.
  - Set `--log-level DEBUG` (or set your serve task `log_level: DEBUG`) to reuse the tqdm progress bars when previewing stages.
  - When multiple serve tasks exist, add `--run val` (task name or filename stem) to target a single config; otherwise every enabled task is executed sequentially.
  - Argument precedence follows the order described under Configuration & Resolution Order.
  - Combine with `--skip-build` when you already have fresh artifacts and want to jump straight into streaming.
Build & Quality
- `jerry inspect report --project <project.yaml> [--threshold 0.95] [--include-targets]`
  - Prints coverage summary (keep/below lists) and writes `coverage.json` under the artifacts directory.
  - Add `--matrix csv|html` to persist an availability matrix.
- `jerry inspect partitions --project <project.yaml> [--include-targets]`
  - Writes discovered partition suffixes to `partitions.json`.
- `jerry inspect expected --project <project.yaml> [--include-targets]`
  - Writes the full set of observed feature IDs to `expected.txt` (for external tooling; runtime uses `schema.json`).
- `jerry build --project <project.yaml> [--force] [--visuals ...] [--progress ...]`
  - Regenerates artifact tasks declared under `project.paths.tasks` when the configuration hash changes.
Scaffolding & Reference
- `jerry plugin init <package> --out <dir>` (also supports `-n`/`--name`)
  - Generates a plugin project (pyproject, package skeleton, config templates).
- `jerry source add <provider> <dataset> --transport fs|http|synthetic --format csv|json|json-lines|pickle`
  - Also supports `<provider>.<dataset>` via `--alias` or as the first positional.
  - Flag form remains available: `--provider`/`--dataset`.
  - Creates loader/parser stubs, updates entry points, and drops a matching source YAML.
- `jerry domain add <name>` (also supports `-n`/`--name`)
  - Adds a `domains/<name>/` package with a `model.py` stub.
- `jerry filter create --name <identifier>`
  - Scaffolds an entry-point-ready filter (helpful for custom record predicates).
- `jerry list sources|domains`
  - Introspect configured source aliases or domain packages.
Transform & Filter Library
Record Filters (<project_root>/contracts/*.yaml:record)
- Binary comparisons: `eq`, `ne`, `lt`, `le`, `gt`, `ge` (timezone-aware for ISO or datetime literals).
- Membership: `in`, `nin`.

```yaml
- filter: { operator: ge, field: time, comparand: "${start_time}" }
- filter: { operator: in, field: station, comparand: [a, b, c] }
```
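The filter vocabulary above maps naturally onto a small operator table. A sketch of how such a `filter:` spec could be evaluated against a record (this is an illustration of the semantics, not the runtime's actual filter code; `matches` is a made-up helper):

```python
import operator

# Comparison operators named as in the filter specs.
OPS = {
    "eq": operator.eq, "ne": operator.ne,
    "lt": operator.lt, "le": operator.le,
    "gt": operator.gt, "ge": operator.ge,
    "in": lambda a, b: a in b,
    "nin": lambda a, b: a not in b,
}


def matches(record: dict, spec: dict) -> bool:
    """Evaluate one `filter:` spec against a record-like mapping."""
    return OPS[spec["operator"]](record[spec["field"]], spec["comparand"])


ok = matches(
    {"station": "a"},
    {"operator": "in", "field": "station", "comparand": ["a", "b", "c"]},
)
```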
Record Transforms
- `floor_time`: snap timestamps down to the nearest cadence (`10m`, `1h`, …).
- `lag`: add lagged copies of records (see `src/datapipeline/transforms/record/lag.py` for options).
Stream (Feature) Transforms
- `ensure_cadence`: backfill missing ticks with `value=None` records to enforce a strict cadence.
- `granularity`: merge duplicate timestamps using `first|last|mean|median`.
- `dedupe`: drop exact duplicate records (same id, timestamp, and payload) from an already sorted feature stream.
- `fill`: rolling statistic-based imputation within each feature stream.
- Custom transforms can be registered under the `stream` entry-point group.
Feature Transforms
- `scale`: wraps `StandardScalerTransform`. Reads statistics from the build artifact or accepts inline `statistics`:

```yaml
scale:
  with_mean: true
  with_std: true
  statistics:
    temp_c__station=001: { mean: 10.3, std: 2.1 }
```
Sequence Transforms
- `sequence`: sliding window generator (`size`, `stride`, optional `cadence` to enforce contiguous windows). Emits `FeatureRecordSequence` payloads with `.records`.
Vector (Postprocess) Transforms
- `drop`: apply coverage thresholds along the horizontal axis (vectors) or vertical axis (features/partitions) using `axis: horizontal|vertical` and `threshold`. Vertical mode requires the optional `metadata.json` artifact and internally prunes weak partitions.
- `fill`: impute using rolling statistics from prior vectors (history-based).
- `replace`: seed missing IDs with a constant or literal value. (Jerry automatically enforces the `schema.json` vector schema—ordering + cadence—before any configured vector transforms run.)
All transforms share a consistent entry-point signature and accept their config dict as keyword arguments. Register new ones in `pyproject.toml` under the appropriate group (`record`, `stream`, `feature`, `sequence`, `vector`, `filters`, `debug`).
Artifacts & Postprocess
- `expected.txt`: newline-delimited full feature IDs, generated on demand via `jerry inspect expected`. Not required at runtime; transforms derive the expected universe from `schema.json`.
- `schema.json`: output of the `schema` task. Jerry automatically enforces this schema during postprocess to impose deterministic ordering and list cadence metadata (targets appear whenever the dataset defines them). Window metadata now lives in `metadata.json`.
- `scaler.pkl`: pickled standard scaler fitted on the configured split. Loaded lazily by feature transforms at runtime.
- Build state is tracked in `artifacts/build/state.json`; config hashes avoid redundant runs.
If a postprocess transform needs an artifact and it is missing, the runtime will
raise a descriptive error suggesting jerry build.
Splitting & Serving
If project.globals.split is present, jerry serve filters vectors at the
end of the pipeline:
- `mode: hash` – deterministic entity hash using either the group key or a specified feature ID.
- `mode: time` – boundary-based slicing using timestamp labels.
- `run.keep` (or CLI `--keep`) selects the active slice; use any label name defined in your split config.
The split configuration never mutates stored artifacts; it is only applied when serving vectors (either via CLI or the Python integrations).
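To make hash-based splitting concrete, here is a sketch of the general technique: hash a seed-qualified entity key into a uniform bucket in [0, 1) and walk the cumulative ratios. This illustrates the idea behind `mode: hash`; the actual runtime's hashing scheme may differ, and `split_label` is a made-up helper:

```python
import hashlib


def split_label(key: str, ratios: dict[str, float], seed: int = 42) -> str:
    """Deterministically map an entity key to a split label.

    The same (key, ratios, seed) always yields the same label, so
    splits are stable across runs without storing any state.
    """
    digest = hashlib.sha256(f"{seed}:{key}".encode()).hexdigest()
    bucket = int(digest, 16) / float(1 << 256)  # uniform in [0, 1)
    cumulative = 0.0
    for label, ratio in ratios.items():
        cumulative += ratio
        if bucket < cumulative:
            return label
    return label  # fall through to the last label on float rounding


ratios = {"train": 0.8, "val": 0.1, "test": 0.1}
labels = {split_label(k, ratios) for k in ("station-a", "station-b", "station-c")}
```

Because the assignment is a pure function of the key, a given station (or group) never straddles train/val/test boundaries, which is exactly what you want for leakage-free entity splits.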
Python Integrations
datapipeline.integrations.ml demonstrates how to reuse the runtime from
application code:
- `VectorAdapter.from_project(project_yaml)` – bootstrap once, then stream vectors or row dicts.
- `stream_vectors(project_yaml, limit=...)` – iterator matching `jerry serve`.
- `iter_vector_rows` / `collect_vector_rows` – handy for Pandas or custom sinks.
- `dataframe_from_vectors` – eager helper that returns a Pandas DataFrame (requires `pandas`).
- `torch_dataset` – builds a `torch.utils.data.Dataset` that yields tensors. See `examples/minimal_project/run_torch.py` for usage.
Extending the Runtime
Entry Points
Register custom components in your plugin’s pyproject.toml:
```toml
[project.entry-points."datapipeline.loaders"]
demo.csv_loader = "my_datapipeline.loaders.csv:CsvLoader"

[project.entry-points."datapipeline.parsers"]
demo.weather_parser = "my_datapipeline.parsers.weather:WeatherParser"

[project.entry-points."datapipeline.mappers"]
time.ticks = "my_datapipeline.mappers.synthetic.ticks:map"

[project.entry-points."datapipeline.stream"]
weather.fill = "my_datapipeline.transforms.weather:CustomFill"
```
Loader, parser, mapper, and transform classes should provide a callable interface (usually `__call__`) matching the runtime expectations. Refer to the built-in implementations in `src/datapipeline/sources/`, `src/datapipeline/transforms/`, and `src/datapipeline/filters/`.
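As a rough illustration of that shape, a stream transform could look like the sketch below: YAML `args` become constructor kwargs, and the instance is called with a record iterator and yields transformed records. This is an assumed simplification over plain floats, not the runtime's actual protocol; check the built-in transforms for the real signatures:

```python
from typing import Iterable, Iterator


class CustomFill:
    """Hypothetical stream transform: mean-impute missing values
    from a rolling window of recent observations."""

    def __init__(self, statistic: str = "mean", window: int = 6):
        # Config dict from YAML arrives as keyword arguments.
        self.statistic = statistic
        self.window = window

    def __call__(self, records: Iterable) -> Iterator:
        recent: list[float] = []
        for value in records:
            if value is None and recent:
                value = sum(recent) / len(recent)  # simple mean imputation
            if value is not None:
                recent = (recent + [value])[-self.window:]
            yield value


out = list(CustomFill(window=3)([1.0, None, 3.0]))
```

Registering `CustomFill` under the `datapipeline.stream` entry-point group (as in the TOML above) is what lets a contract's `stream:` list reference it by name.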
Scaffolding Helpers
- `datapipeline.services.scaffold.plugin.scaffold_plugin` – invoked by `jerry plugin init`.
- `datapipeline.services.scaffold.source.create_source` – writes loader/parser stubs and updates entry points.
- `datapipeline.services.scaffold.domain.create_domain` – domain record skeleton.
- `datapipeline.services.scaffold.filter.create_filter` – custom filter stub.
- `datapipeline.services.scaffold.mappers.attach_source_to_domain` – helper for programmatically wiring sources to domain mappers and emitting stream contracts (useful in custom automation or tests).
Development Workflow
- Install dependencies: `pip install -e .[dev]`.
- Run tests: `pytest`.
- When iterating on configs, use `jerry serve --stage <n>` to peek into problematic stages.
- After tuning transforms, refresh artifacts: `jerry build`.
- Use `jerry inspect report --include-targets` to ensure targets meet coverage gates before handing vectors to downstream consumers.
Additional Resources
- `src/datapipeline/analysis/vector_analyzer.py` – quality metrics collected by the inspect commands.
- `src/datapipeline/pipeline/` – pure functions that wire each stage.
- `src/datapipeline/services/bootstrap/` – runtime initialization and registry population (see `core.py`).
- `examples/minimal_project/` – runnable demo showing config layout and Torch integration.
Pipeline Architecture (WIP)
raw source ──▶ loader/parser DTOs ──▶ canonical stream ──▶ record policies
└──▶ feature wrapping ──▶ stream regularization ──▶ feature transforms/sequence
└──▶ vector assembly ──▶ postprocess transforms
- Loader/parser (Stage 0) – raw bytes become typed DTOs. Loaders fetch from FS/HTTP/synthetic sources; parsers map bytes to DTOs. Register them via entry points (`loaders`, `parsers`) and wire them in `<project_root>/sources/*.yaml`.
- Canonical stream mapping (Stage 1) – mappers attach domain semantics and partition keys, producing domain `TemporalRecord`s.
- Record policies (Stage 2) – contract `record` rules (filters, floor, lag) prune and normalize DTO-derived records.
- Feature wrapping (Stage 3) – records become `FeatureRecord`s before sort/regularization.
- Stream regularization (Stage 4) – contract `stream` rules ensure cadence, deduplicate timestamps, and impute where needed.
- Feature transforms/sequence (Stage 5) – dataset transforms (scale, sequence windows) produce per-feature tensors or windows.
- Vector assembly (Stage 6) – features merge by `group_by` cadence into `(group_key, Vector)` pairs, prior to postprocess tweaks.
- Postprocess (Stage 7) – optional vector transforms (fill/drop/etc.) run before results are emitted to the configured output.
Visual Flowchart
```mermaid
flowchart TB
  subgraph CLI & Project config
    cliSource[jerry source add]
    cliDomain[jerry domain add]
    cliContract[jerry contract]
    cliServe[jerry serve]
    project[[project.yaml]]
    sourcesCfg[sources/*.yaml]
    contractsCfg[contracts/*.yaml]
    datasetCfg[dataset.yaml]
    postprocessCfg[postprocess.yaml]
  end
  cliSource --> sourcesCfg
  cliDomain --> domainPkg
  cliContract --> contractsCfg
  cliServe --> vectorSamples
  project -.->|paths.sources| sourcesCfg
  project -.->|paths.streams| contractsCfg
  project -.->|paths.dataset| datasetCfg
  project -.->|paths.postprocess| postprocessCfg
  subgraph Plugin code
    domainPkg[domains/*]
    mappersPkg[mappers/*]
  end
  cliContract --> mappersPkg
  domainPkg -. domain models .-> mappersPkg
  subgraph Registries
    registrySources[sources]
    registryStreamSources[stream_sources]
    registryMappers[mappers]
    registryRecordOps[record_ops]
    registryStreamOps[stream_ops]
    registryDebugOps[debug_ops]
  end
  subgraph Source wiring
    rawData[(external data)]
    transportSpec[transport + format]
    loaderEP[loader ep]
    parserEP[parser ep]
    sourceArgs[loader args]
    sourceNode[Source]
    dtoStream[(DTOs)]
  end
  sourcesCfg --> transportSpec
  sourcesCfg --> loaderEP
  sourcesCfg --> parserEP
  sourcesCfg --> sourceArgs
  transportSpec -. select fs/http/synth .-> loaderEP
  loaderEP -. build loader .-> sourceNode
  parserEP -. build parser .-> sourceNode
  sourceArgs -. paths/creds .-> sourceNode
  rawData --> sourceNode --> dtoStream
  sourcesCfg -. build_source_from_spec .-> registrySources
  contractsCfg -. stream_id + source .-> registryStreamSources
  registrySources -. alias -> Source .-> registryStreamSources
  subgraph Canonical stream
    mapperEP[mapper ep]
    recordRules[record rules]
    streamRules[stream rules]
    debugRules[debug rules]
    canonical[DTO -> record]
    domainRecords((TemporalRecord))
    recordStage[record xforms]
    featureWrap[record -> feature]
    featureRecords((FeatureRecord))
    regularization[stream xforms]
  end
  dtoStream --> canonical --> domainRecords --> recordStage --> featureWrap --> featureRecords --> regularization
  contractsCfg --> mapperEP
  mappersPkg -. ep target .-> mapperEP
  mapperEP -. build_mapper_from_spec .-> registryMappers
  registryMappers --> canonical
  contractsCfg --> recordRules
  contractsCfg --> streamRules
  contractsCfg --> debugRules
  registryRecordOps --> recordRules
  registryStreamOps --> streamRules
  registryDebugOps --> debugRules
  recordRules --> recordStage
  streamRules --> regularization
  debugRules --> regularization
  subgraph Dataset shaping
    featureSpec[feature cfg]
    groupBySpec[group_by]
    streamRefs[record_stream ids]
    featureTrans[feature/seq xforms]
    sequenceStream((seq/features))
    vectorStage[vector assembly]
    vectorSamples((samples))
  end
  datasetCfg --> featureSpec
  datasetCfg --> groupBySpec
  datasetCfg --> streamRefs
  streamRefs -.->|build_feature_pipeline| registryStreamSources
  registryStreamSources -.->|open_source_stream| sourceNode
  featureRecords --> regularization --> featureTrans --> sequenceStream --> vectorStage --> vectorSamples
  featureSpec -. scale/sequence .-> featureTrans
  groupBySpec -. cadence .-> vectorStage
  subgraph Postprocess
    vectorTransforms[vector xforms]
    postprocessNode[postprocess]
  end
  postprocessCfg --> vectorTransforms -. drop/fill .-> postprocessNode
  vectorStage --> postprocessNode
```