microtrade
Turn monthly drops of zipped fixed-width (FWF) trade microdata into Hive-partitioned Parquet datasets, one per trade type.
microtrade streams each raw zip directly from its archive (no extraction,
bounded memory), slices columns according to a versioned YAML spec, and writes
year=YYYY/month=MM/part-0.parquet atomically under a per-type dataset root.
Monthly runs reprocess all months YTD of the current year; prior years are
frozen.
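As a rough illustration of the streaming idea (not microtrade's actual code), the sketch below reads a fixed-width member straight out of a zip with the standard library and slices each line by offset. The `COLUMNS` layout, the `iter_rows` helper, and the single-member assumption are all hypothetical; the real layout comes from the YAML spec.

```python
import io
import zipfile
from typing import Iterator

# Hypothetical layout: (name, start offset, length), 0-based.
COLUMNS = [("period", 0, 6), ("hs_code", 6, 4), ("value", 10, 5)]


def iter_rows(zip_source, encoding: str = "cp1252") -> Iterator[dict]:
    """Yield one dict per FWF line, reading the member directly from the archive."""
    with zipfile.ZipFile(zip_source) as zf:
        member = zf.namelist()[0]  # assumption: one FWF member per zip
        with zf.open(member) as raw:  # decompressed lazily, nothing extracted to disk
            for line in io.TextIOWrapper(raw, encoding=encoding, newline=""):
                line = line.rstrip("\r\n")
                # cp1252 is single-byte, so character offsets equal byte offsets.
                yield {
                    name: line[start:start + length].strip()
                    for name, start, length in COLUMNS
                }


# Build a tiny in-memory zip so the sketch runs without any real data.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("IMP_202404N.TXT", "2024040101  120\n2024040202   75\n")
buf.seek(0)
rows = list(iter_rows(buf))
```

Because the generator yields one row at a time, memory use stays bounded no matter how large the member is; a real pipeline would batch rows before writing.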
Raw filenames don't have to follow any fixed convention — each committed spec
carries its own filename_pattern regex, so workbooks from different upstream
generations (SHEET002_202404N.TXT.zip, XYZ12345_Im202404.zip, etc.) can
coexist as long as each pattern captures year and month (and optionally
flag) from the filename.
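To make the named-group requirement concrete, here is a minimal sketch using Python's `re` module and the first pattern from the config example further down. The `parse_period` helper is hypothetical and only illustrates how year, month, and the optional flag can be read out of a match.

```python
import re

# Pattern copied from the microtrade.yaml example in this README.
PATTERN = re.compile(r"^XYZ12345_Im(?P<year>\d{4})(?P<month>\d{2})\.zip$")


def parse_period(filename: str):
    """Return (year, month, flag) or None when the filename does not match."""
    match = PATTERN.match(filename)
    if match is None:
        return None
    # `flag` is optional: this pattern has no such group, so .get() yields None.
    flag = match.groupdict().get("flag")
    return int(match["year"]), int(match["month"]), flag


result = parse_period("XYZ12345_Im202404.zip")
```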
Three trade types are supported, each with its own distinct schema:
- imports
- exports_us
- exports_nonus
Requirements
- Python 3.12+
- uv for environment and dependency management
Install
uv sync
This resolves and installs runtime + dev dependencies into .venv/ based on
pyproject.toml and uv.lock.
Usage
Write a project config (microtrade.yaml)
The authoritative schema lives in an Excel workbook, but the wiring
between workbooks, trade types, period windows, and raw filenames lives in
a project config. microtrade import-spec reads it to produce each YAML
spec; the ingest pipeline never touches this file.
# microtrade.yaml
workbooks:
  XYZ12345_Record_Layout.xls:
    workbook_id: XYZ12345 # optional; defaults to filename prefix
    effective_from: 2020-01
    effective_to: 2023-12 # optional; absent = open-ended
    sheets:
      Imports:
        trade_type: imports # optional; defaults to positional (sheet index -> TRADE_TYPES)
        filename_pattern: '^XYZ12345_Im(?P<year>\d{4})(?P<month>\d{2})\.zip$'
        routing_column: year_month # per-row Date column; used to partition output
        cast:
          year_month: Date # workbook ships as Char; promote to Date
        parse:
          year_month: yyyymm_to_date
  Schedule_Record_Layout_2024.xlsx:
    effective_from: 2024-01
    sheets:
      Imports:
        trade_type: imports
        filename_pattern: '^IMP_(?P<year>\d{4})(?P<month>\d{2})(?P<flag>[NC])\.TXT\.zip$'
        routing_column: period
        # Upstream renamed `business_number` -> `business_number_9_digit`;
        # keep the stable name in the combined dataset.
        rename:
          business_number_9_digit: business_number
      ExportsUS:
        trade_type: exports_us
        filename_pattern: '^EXUS_(?P<year>\d{4})(?P<month>\d{2})(?P<flag>[NC])\.TXT\.zip$'
        routing_column: period
      ExportsNonUS:
        trade_type: exports_nonus
        filename_pattern: '^EXNONUS_(?P<year>\d{4})(?P<month>\d{2})(?P<flag>[NC])\.TXT\.zip$'
        routing_column: period
Required regex groups: year (4 digits) and month (2 digits). Optional:
flag — when upstream publishes both N and C copies of the same period,
N wins at discovery time.
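The "N wins" rule can be pictured as a small selection step over the matched files. The sketch below is an assumption about how such a rule might look, not the library's discovery code; `pick_per_period` is a hypothetical helper and the pattern is the imports pattern from the config above.

```python
import re

PATTERN = re.compile(
    r"^IMP_(?P<year>\d{4})(?P<month>\d{2})(?P<flag>[NC])\.TXT\.zip$"
)


def pick_per_period(filenames):
    """Keep one file per (year, month), preferring the N copy over the C copy."""
    chosen = {}
    for name in sorted(filenames):
        match = PATTERN.match(name)
        if match is None:
            continue  # unmatched files are ignored
        key = (int(match["year"]), int(match["month"]))
        current = chosen.get(key)
        if current is None or (match["flag"] == "N" and current[1] != "N"):
            chosen[key] = (name, match["flag"])
    return {key: name for key, (name, _flag) in chosen.items()}


picked = pick_per_period(
    ["IMP_202404C.TXT.zip", "IMP_202404N.TXT.zip", "IMP_202405C.TXT.zip", "notes.txt"]
)
```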
routing_column (defaults to period) names the per-row Date column that
the pipeline uses to bucket rows into year=YYYY/month=MM partitions. It
must resolve to a Date column in the final spec — either because the
workbook already declares it that way, or because cast + parse promote
it. Upstream schemas name this column differently (period, year_month,
ref_month, …) so the field is per-sheet rather than a hardcoded
convention.
Match the value against the logical (post-rename) column name. If you
rename foo_physical → bar_logical, set routing_column: bar_logical.
Columns with no rename match on their physical name.
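Conceptually, routing means grouping rows by the year and month of their routing column value. The following sketch shows that grouping on plain dicts; `partition_key` and `bucket_rows` are hypothetical names, and the real pipeline works on Arrow record batches rather than Python lists.

```python
from collections import defaultdict
from datetime import date


def partition_key(value: date) -> str:
    """Map a date to its Hive-style partition directory."""
    return f"year={value.year:04d}/month={value.month:02d}"


def bucket_rows(rows, routing_column: str = "period"):
    """Group rows by the partition derived from their routing column."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[partition_key(row[routing_column])].append(row)
    return dict(buckets)


rows = [
    {"year_month": date(2024, 3, 1), "value": 10},
    {"year_month": date(2024, 4, 1), "value": 20},
    {"year_month": date(2024, 4, 1), "value": 30},
]
buckets = bucket_rows(rows, routing_column="year_month")
```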
rename (optional, per sheet) maps a workbook's physical column name
(the Description cell) to the logical name that shows up in the combined
dataset. Each Spec stores both: ingest slices FWF bytes under
physical_name and emits the parquet column under logical_name, so
consumers see one stable column even as upstream drifts.
cast (optional, per sheet) overrides the workbook's declared dtype per
column. Upstream FWF specs routinely label numeric or date fields as
Char; use cast to promote them at import time, e.g.
cast: {value_usd: Int64, year_month: Date}. Allowed targets: Utf8,
Int64, Float64, Date.
parse (optional, per sheet) overrides the parser for a Date column.
The default is yyyymmdd_to_date; set
parse: {year_month: yyyymm_to_date} for YYYYMM fields. Only
meaningful for columns whose final dtype is Date.
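For intuition, the two parser names could behave roughly like the functions below. These are stand-ins written for this README, not the library's implementations; in particular, mapping a YYYYMM value to the first day of the month is an assumption.

```python
from datetime import date, datetime


def yyyymmdd_to_date(text: str) -> date:
    """Parse an 8-digit YYYYMMDD string; raises ValueError on bad input."""
    text = text.strip()
    if len(text) != 8 or not text.isdigit():
        raise ValueError(f"expected YYYYMMDD, got {text!r}")
    return datetime.strptime(text, "%Y%m%d").date()


def yyyymm_to_date(text: str) -> date:
    """Parse a 6-digit YYYYMM string (assumed to map to day 1 of the month)."""
    text = text.strip()
    if len(text) != 6 or not text.isdigit():
        raise ValueError(f"expected YYYYMM, got {text!r}")
    return datetime.strptime(text + "01", "%Y%m%d").date()
```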
computed (optional, per sheet) builds new parquet columns from other
columns at ingest time. Keyed by the output column's name; values
declare a named operation (kind) and its sources. Available
operations:
- concat_to_date: sources: [period_date_col, day_col] → Date column combining a YYYYMM Date and a day-of-month (Int64 or Utf8; '02' parses the same as 2) into a full YYYYMMDD date. Row-level failures (e.g. Feb 30, unparseable day) go to the quality log like any other parse error.
- concat_text: sources: [col_a, col_b, ...] → Utf8 column that joins N Utf8 sources with separator (default " "), skipping null or blank/whitespace-only sources and collapsing every run of whitespace (including tabs/newlines) in the joined result to a single space. A row with every source blank emits null. Override the separator with separator: "-" (or any string).
computed:
  full_name:
    kind: concat_text
    sources: [first_name, last_name]
    separator: " " # optional; default is a single space

computed:
  entry_date:
    kind: concat_to_date
    sources: [period, day_of_month]
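The row-level semantics of the two operations can be sketched in plain Python. These functions are illustrative stand-ins that follow the description above, not the library's code; here a failure surfaces as `ValueError`, where the pipeline would instead record it in the quality log.

```python
import re
from datetime import date


def concat_text(values, separator: str = " "):
    """Join non-blank sources, then collapse whitespace runs to one space."""
    parts = [v for v in values if v is not None and v.strip()]
    if not parts:
        return None  # every source was null or blank
    return re.sub(r"\s+", " ", separator.join(parts))


def concat_to_date(period: date, day) -> date:
    """Combine a YYYYMM date with a day-of-month given as int or text."""
    # int("02") == 2, so Utf8 and Int64 day columns behave the same.
    return date(period.year, period.month, int(day))


full_name = concat_text(["Ada", None, "  ", "Lovelace\tByron"])
entry_date = concat_to_date(date(2024, 4, 1), "02")
```

An impossible date such as day 30 in February raises `ValueError` from the `date` constructor, which is the kind of row-level failure the quality log is meant to capture.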
drop (optional, per sheet) omits named columns from the parquet
output. Runs after cast/rename/computed, so a dropped column can still
feed a computed column and disappear afterward:
drop: [day_of_month] # used by `entry_date`, then dropped from the output
A worked example lives at examples/microtrade.yaml,
paired with examples/microdata-layout.xls.
Import the schema workbook (once per schema version)
Each sheet's field table is autodetected by looking for a row containing
Position, Description, Length, and Type; rows with Description = Blank are FWF padding and are skipped.
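A simplified picture of the autodetection, operating on rows already read from a sheet as lists of cell values. `find_fields` and the output keys are hypothetical, and the exact matching rules (case sensitivity, trimming) are assumptions.

```python
REQUIRED = {"Position", "Description", "Length", "Type"}


def find_fields(rows):
    """Locate the header row, then collect field rows below it, skipping padding."""
    for header_index, row in enumerate(rows):
        cells = ["" if c is None else str(c).strip() for c in row]
        if REQUIRED <= set(cells):
            break
    else:
        raise ValueError("no row contains Position, Description, Length and Type")

    col = {name: cells.index(name) for name in REQUIRED}
    fields = []
    for row in rows[header_index + 1:]:
        description = row[col["Description"]]
        if description is None or str(description).strip() in ("", "Blank"):
            continue  # FWF padding or an empty row
        fields.append({
            "position": int(row[col["Position"]]),
            "name": str(description).strip(),
            "length": int(row[col["Length"]]),
            "dtype": str(row[col["Type"]]).strip(),
        })
    return fields


sheet = [
    ["Record layout: Imports", None, None, None],
    ["Position", "Description", "Length", "Type"],
    [1, "period", 6, "Char"],
    [7, "Blank", 2, "Char"],
    [9, "hs_code", 4, "Char"],
]
fields = find_fields(sheet)
```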
Convert a workbook to versioned YAML specs:
uv run microtrade import-spec XYZ12345_Record_Layout.xls
The importer looks up the workbook in microtrade.yaml (override location
with --config PATH) and writes one YAML per trade type under
src/microtrade/specs/<trade_type>/v<effective_from>.yaml. Each file is a
self-contained runtime contract — review and commit. Re-run with --force
to replace an existing version. When a new workbook lands, add a second
entry to the config with its own period window; the pipeline picks the
appropriate spec per period automatically, and a column-level diff against
the previous version is printed.
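Picking a spec per period amounts to finding the version whose window contains that period. The sketch below shows one way to express it; `SpecWindow` and this `resolve` are hypothetical and only mirror the idea, using "YYYY-MM" strings, which compare correctly as text.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SpecWindow:
    version: str
    effective_from: str                 # "YYYY-MM"
    effective_to: Optional[str] = None  # None means open-ended


def resolve(specs, period: str) -> SpecWindow:
    """Return the single spec whose [effective_from, effective_to] contains period."""
    matches = [
        s for s in specs
        if s.effective_from <= period
        and (s.effective_to is None or period <= s.effective_to)
    ]
    if len(matches) != 1:
        raise LookupError(f"{len(matches)} specs cover {period}")
    return matches[0]


specs = [
    SpecWindow("v2020-01", "2020-01", "2023-12"),
    SpecWindow("v2024-01", "2024-01"),
]
chosen = resolve(specs, "2024-04")
```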
Ingest raw monthly zips
uv run microtrade ingest \
--input /path/to/raw_zips \
--output /path/to/datasets
Discovery walks every committed spec's filename_pattern and routes each
file to the spec that matches; files that match nothing are silently
ignored, and files that match more than one spec raise (ambiguous
config — tighten the regexes). microtrade.yaml is not consulted at
ingest time; everything the pipeline needs is already baked into the YAML
specs.
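The routing rule described above (no match is ignored, more than one match is an error) can be sketched as follows. The `route` helper and the spec labels are hypothetical; the patterns are taken from the config example.

```python
import re

# Hypothetical mapping of spec label -> filename_pattern.
PATTERNS = {
    "imports/v2020-01": r"^XYZ12345_Im(?P<year>\d{4})(?P<month>\d{2})\.zip$",
    "imports/v2024-01": r"^IMP_(?P<year>\d{4})(?P<month>\d{2})(?P<flag>[NC])\.TXT\.zip$",
    "exports_us/v2024-01": r"^EXUS_(?P<year>\d{4})(?P<month>\d{2})(?P<flag>[NC])\.TXT\.zip$",
}


def route(filename: str, patterns=PATTERNS):
    """Return the one matching spec label, None if nothing matches."""
    hits = [label for label, pattern in patterns.items() if re.match(pattern, filename)]
    if not hits:
        return None  # unmatched files are skipped
    if len(hits) > 1:
        raise ValueError(f"{filename} matches several specs: {hits}")
    return hits[0]


routed = route("EXUS_202404N.TXT.zip")
```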
Defaults: year-to-date of the current calendar year, all three trade types, zstd-compressed Parquet. Common flags:
| Flag | Default | Purpose |
|---|---|---|
| --type imports | all | Repeat for multiple; limits processing |
| --year 2024 | unset | Process a single year (disables YTD logic) |
| --month 4 | unset | Combine with --year for one-shot re-ingest |
| --all | off | Process every year present under --input |
| --chunk-rows 250000 | 250000 | Rows per Parquet row group / memory batch |
| --compression zstd | zstd | Parquet compression codec |
| --encoding cp1252 | cp1252 | Text encoding of the inner FWF (pass --encoding utf-8 for UTF-8 data) |
Per-partition outcomes are logged as JSON lines under
<output>/_manifests/<trade_type>/<run_id>.jsonl, and a one-line summary is
printed at the end. The exit code is non-zero if any partition failed; other
partitions in the same run still complete.
Output layout
output/
  imports/
    year=2024/month=01/part-0.parquet
    year=2024/month=02/part-0.parquet
    ...
  exports_us/
    year=2024/month=01/part-0.parquet
    ...
  exports_nonus/
    ...
  _manifests/
    imports/<run_id>.jsonl
    exports_us/<run_id>.jsonl
    exports_nonus/<run_id>.jsonl
Partition columns (year, month) are encoded in the directory path only,
not duplicated inside each Parquet file. Read with any Hive-aware scanner:
import polars as pl
df = pl.scan_parquet("output/imports", hive_partitioning=True).collect()
Or with DuckDB:
SELECT * FROM read_parquet('output/imports/**/*.parquet', hive_partitioning=1);
Ops: cron-driven runs
microtrade ops run is a thin planner on top of microtrade ingest for
unattended environments (k8s CronJob, typically). It hashes workbooks +
raw zips, compares them against per-file JSON manifests on disk, and
re-runs only the parts that actually changed. Use it when you want the
"every N minutes, do the minimum work" loop; use microtrade ingest
directly for one-shot manual reprocessing.
Two stages per run:
- Stage 1: import-spec. For each workbook under workbooks_dir whose content hash (or the paired microtrade.yaml hash) differs from its last manifest, re-run microtrade import-spec into specs_dir.
- Stage 2: ingest. Group raw zips under raw_dir by (trade_type, year) via each sheet's filename_pattern. If any raw in a year is dirty (new, changed, or the paired microtrade.yaml changed), re-run microtrade.pipeline.run for that whole year (since each raw is a YTD snapshot, the year is the reprocessing unit, not the month).
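The dirty check in both stages boils down to comparing a content hash with the one recorded in a per-file manifest. The sketch below illustrates that idea only: the manifest file name, its `sha256` key, and the helper names are assumptions, and the paired `microtrade.yaml` hash is left out for brevity.

```python
import hashlib
import json
from pathlib import Path


def sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large zips do not need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def manifest_path(path: Path, manifest_dir: Path) -> Path:
    return manifest_dir / f"{path.name}.json"  # hypothetical naming scheme


def is_dirty(path: Path, manifest_dir: Path) -> bool:
    """A file is dirty when it has no manifest or its hash changed."""
    manifest = manifest_path(path, manifest_dir)
    if not manifest.exists():
        return True
    recorded = json.loads(manifest.read_text())
    return recorded.get("sha256") != sha256_file(path)


def mark_clean(path: Path, manifest_dir: Path) -> None:
    """Record the current hash; called only after successful processing."""
    manifest_dir.mkdir(parents=True, exist_ok=True)
    manifest_path(path, manifest_dir).write_text(
        json.dumps({"sha256": sha256_file(path)})
    )
```

Skipping `mark_clean` for a failed item is what makes the next run pick it up again without extra bookkeeping.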
Paths + directories live in a separate config.yaml loaded by
microtrade.ops.settings (env overrides: MT_RAW_DIR=/data/raw, etc.):
# config.yaml
microtrade_yaml: /app/microtrade.yaml
workbooks_dir: /data/workbooks
raw_dir: /data/raw
specs_dir: /data/specs
processed_dir: /data/processed
spec_manifests_dir: /data/manifests/specs
raw_manifests_dir: /data/manifests/raw
upstream_raw_dir: /mnt/upstream/raw # where the provider drops files
raw_remote_dir: /mnt/remote/raw # our durable archive
processed_remote_dir: /mnt/remote/processed # remote Parquet destination
manifests_remote_dir: /mnt/remote/manifests # shared dirty-check state
Run:
uv run microtrade ops run --config config.yaml
Exit code is 0 on clean completion (including "nothing to do") and
non-zero if any year or workbook failed; the failed items simply have
no manifest update, so the next cronjob run replans them automatically.
loguru handles logging (no custom sinks).
The manifests under spec_manifests_dir / raw_manifests_dir are the
only stateful artifact the ops layer owns. Wire the pull_manifests
and push_manifests transport hooks (below) to a shared backend and
multiple operators — different pods, different clusters, different
people — converge on the same "what's already clean" view without
needing to share the rest of the PV.
A transport seam wraps the ordering contract:
sequenceDiagram
    participant U as upstream
    participant R as remote
    participant L as local (pod)
    R->>L: pull_manifests
    U->>R: mirror
    R->>L: pull_workbooks (xls → workbooks_dir, once)
    Note over L: Stage 1 (see below)
    loop per dirty (trade_type, year)
        R->>L: pull_raws_for_year (zip → raw_dir)
        Note over L: ingest_year (see below)
        L->>R: push_processed
        Note over L: cleanup_local_year (rm raws + parquet)
    end
    L->>R: push_manifests
Inside the Stage 1 and ingest_year steps, pod-local data flows through two stages:
flowchart TB
    WB[workbooks_dir<br/>*.xls*] -->|stage 1<br/>import_spec| SP[specs_dir<br/>v<effective>.yaml]
    RD[raw_dir<br/>*.zip] --> ST2{{stage 2<br/>ingest_year}}
    SP --> ST2
    ST2 --> PD[processed_dir<br/>year=YYYY/month=MM/<br/>part-N.parquet]
pull_manifests fetches the shared dirty-check state before planning
so a pod that doesn't have the previous run's PV still skips already-
done work. push_manifests publishes the updated state at the end of
the run (even if some years failed — partial progress is worth
sharing). Path routing is owned by the library and driven by
config.yaml; only the per-file transfer primitive is injectable:
import sys
from pathlib import Path

from microtrade.ops.runner import run
from microtrade.ops.settings import load_settings

settings = load_settings(Path("config.yaml"))

# Default: shutil.copy2 (local disk / mounted PV). No kwarg needed.
sys.exit(run(settings))

# Alternative: swap in your own one-file-at-a-time primitive (kubectl cp,
# S3 put_object, etc.) by replacing the call above with the one below.
# It MUST preserve mtime or the skip-if-unchanged check misfires and
# every file re-copies next run.
# sys.exit(run(settings, copy_file=my_kubectl_cp_wrapper))
Point your k8s CronJob's command at this wrapper instead of
microtrade ops run when you need a custom copy primitive. See
examples/ops_demo.py for a runnable walkthrough.
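As a rough sketch of what a custom primitive might look like, the function below copies one file and then restores the source's timestamps. It assumes `copy_file` is called as `copy_file(src, dst)` like `shutil.copy2`, which the text above implies but does not spell out; `copy_preserving_mtime` is a hypothetical name, and a real kubectl or S3 wrapper would replace the `shutil.copyfile` call.

```python
import os
import shutil
from pathlib import Path


def copy_preserving_mtime(src, dst) -> None:
    """Copy one file, keep the source mtime, and publish it with a rename."""
    src, dst = Path(src), Path(dst)
    dst.parent.mkdir(parents=True, exist_ok=True)
    tmp = dst.with_name(dst.name + ".part")
    shutil.copyfile(src, tmp)  # stand-in for the real transfer
    stat = src.stat()
    # Restore timestamps so a later skip-if-unchanged comparison still holds.
    os.utime(tmp, ns=(stat.st_atime_ns, stat.st_mtime_ns))
    os.replace(tmp, dst)  # a rename does not alter mtime
```

Writing to a `.part` file first means a transfer that dies midway never leaves a truncated file under the final name.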
See CLAUDE.md for the full list of invariants (dirty-check logic,
manifest schemas, k8s deployment guidance, what the ops layer
explicitly does not do).
Architecture
config.load_config(yaml) -> ProjectConfig (import-spec only)
excel_spec.read_workbook -> Spec per sheet, with filename_pattern baked in
discover.scan(input_dir) -> list[RawInput] (by matching each committed pattern)
schema.resolve(specs, period) -> Spec whose [effective_from, effective_to] contains period
ingest.iter_record_batches -> pyarrow.RecordBatch stream (bounded memory)
write.PartitionWriter -> year=/month=/part-0.parquet.tmp, atomic rename
pipeline.run -> orchestrates the above + JSONL manifest
ops.planner + ops.runner -> cron-driven dirty-check + dispatch on top of pipeline.run
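The ".tmp then atomic rename" step used by the writer is a general pattern; a minimal stdlib sketch follows. It writes raw bytes rather than Parquet to stay dependency-free, and `write_atomically` is a hypothetical helper, not `write.PartitionWriter` itself.

```python
import os
from pathlib import Path


def write_atomically(final_path: Path, payload: bytes) -> None:
    """Write to <name>.tmp, then rename so readers never see a partial file."""
    final_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = final_path.with_name(final_path.name + ".tmp")
    try:
        with open(tmp, "wb") as fh:
            fh.write(payload)
            fh.flush()
            os.fsync(fh.fileno())  # make sure bytes hit disk before the rename
        os.replace(tmp, final_path)  # atomic on the same filesystem
    except BaseException:
        tmp.unlink(missing_ok=True)  # leave no stray temp file behind
        raise
```

Re-running the same write simply replaces the previous file, which is what makes a partition rewrite idempotent.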
Key invariants:
- Excel + microtrade.yaml are the upstream source of truth; committed YAML under src/microtrade/specs/ is the runtime contract. The pipeline never reads Excel, and consults the project config only via import-spec.
- Each partition write is idempotent: re-running YTD cleanly replaces the current year's partitions, leaving prior years untouched.
- The zip is decompressed on the fly via zipfile.ZipFile.open(); the raw FWF is never extracted to disk and never fully materialized in memory.
- Per-partition failures are recorded in the manifest but do not abort the run; one bad month will not block the rest.
- validate-specs flags overlapping or gapped [effective_from, effective_to] windows so silent ambiguities in schema.resolve don't reach production.
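The overlap and gap check in the last invariant can be illustrated on bare windows. This is a sketch of the idea, not what `validate-specs` actually runs; `check_windows` and `next_month` are hypothetical, and windows are `(effective_from, effective_to)` pairs of "YYYY-MM" strings with `None` for open-ended.

```python
def next_month(year_month: str) -> str:
    """Return the month following a 'YYYY-MM' string."""
    year, month = map(int, year_month.split("-"))
    return f"{year + month // 12:04d}-{month % 12 + 1:02d}"


def check_windows(windows):
    """Report overlaps and gaps between consecutive effective windows."""
    problems = []
    ordered = sorted(windows, key=lambda w: w[0])
    for (from_a, to_a), (from_b, _to_b) in zip(ordered, ordered[1:]):
        if to_a is None or from_b <= to_a:
            problems.append(f"overlap: window from {from_a} and window from {from_b}")
        elif from_b != next_month(to_a):
            problems.append(f"gap: nothing covers {next_month(to_a)} up to {from_b}")
    return problems


clean = check_windows([("2020-01", "2023-12"), ("2024-01", None)])
gapped = check_windows([("2020-01", "2023-12"), ("2024-03", None)])
```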
Development
uv run pytest # full suite with coverage
uv run pytest tests/test_pipeline.py::test_name # single test
uv run ruff format # auto-format
uv run ruff check # lint
uv run mypy src # strict type check
uv run pre-commit run --all-files # all pre-commit hooks
Tests build synthetic Excel workbooks, YAML specs, and FWF zips on the fly in
tests/_helpers.py rather than checking in binary fixtures, so the exercised
code paths match the real production workflow end-to-end.
Status
The pipeline is feature-complete: scaffolding, project config, Excel → YAML,
discover + ingest + write, the orchestrated CLI subcommands (ingest,
import-spec, inspect, validate-specs), and the cron-driven ops layer
(microtrade ops run) are all landed and covered.
Reference YAML specs ship under src/microtrade/specs/ but predate the
filename_pattern field (tracked in issue #16) — replace them by writing a
microtrade.yaml and running microtrade import-spec against the real
schema workbook. A new workbook goes into the config as a second entry with
its own effective_from/effective_to; the pipeline picks the right spec
per period automatically. Run microtrade validate-specs after importing to
catch dtype conflicts and window overlaps/gaps between versions.
License
MIT (see LICENSE).