A Zero-Boilerplate Universal Data Gateway and Pipeline Orchestration Platform.
Incorporator
A schema-free data mapper that turns JSON, XML, or CSV into a unified Python object graph with dot-notation access at runtime.
✨ Highlights
- Works with unpredictable JSON APIs, and effortlessly digests XML, CSV, NDJSON, SQLite, and columnar Parquet, without writing a single line of schema.
- Turns raw data into native Python objects instantly, bypassing the need for manual model definitions or brittle classes.
- Handles changing JSON structures at runtime, absorbing missing keys or mutating data types without throwing validation errors.
- Harnesses Pydantic and HTTPX under the hood without forcing you to write data classes, connection poolers, or pagination while loops.
🎯 Use this when:
- You are working with evolving, undocumented, or heavily nested JSON APIs.
- You need a universal bridge to map legacy XML, flat CSVs, or columnar Parquet into the exact same Python object graph.
- You are exhausted by writing boilerplate models and validation logic just to explore a new data source.
- You need to extract deeply nested web data, transform it, and pivot it straight into a local SQL database or columnar data lake.
🛠️ How it Works: Zero-Schema Ingestion
Imagine receiving this spacecraft telemetry JSON. Notice how the nested "st" dictionary changes its structure completely for every subsystem (pos vs sig vs bat vs lvl). A rigid, hand-written schema would fail validation on the first record that doesn't match it.
The Input (telemetry.json):
[
{"id":"NAV", "st":{"pos":[12,44], "ok":1}},
{"id":"COM", "st":{"sig":78, "ok":1}},
{"id":"PWR", "st":{"bat":92, "ok":1}},
{"id":"THR", "st":{"lvl":63, "ok":0}}
]
The Incorporator Way: Feed it the unpredictable JSON. Incorporator dynamically unifies the changing structures into a single object graph and gives you instant dot-notation access.
import asyncio
from incorporator import Incorporator

class System(Incorporator):
    pass  # Subclass; everything else hangs off it.

async def main():
    # 1. Parse unpredictable JSON directly into Python objects. No models defined!
    systems = await System.incorp(
        inc_file="telemetry.json",
        inc_code="id",  # Sets 'id' as the O(1) Memory Registry lookup key
    )
    # 2. Instantly access the unified Python object graph via dot-notation
    print(f"Navigation Position: {systems.inc_dict['NAV'].st.pos}")  # Output: [12, 44]
    print(f"Power Battery Level: {systems.inc_dict['PWR'].st.bat}%")  # Output: 92%
    # 3. Interpret and manipulate data effortlessly at runtime
    thr = systems.inc_dict["THR"]
    if not thr.st.ok:
        print(f"⚠️ THRUST FAILURE! Efficiency dropped to {thr.st.lvl}")

asyncio.run(main())
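To make the idea concrete without installing anything, here is a minimal standard-library sketch of schema-free dot-notation access over the same telemetry. It is not Incorporator's implementation (the library builds on Pydantic, per the Installation section); the helper names to_namespace and build_registry are hypothetical, and the registry is modeled as a plain dict keyed by the id field.

```python
import json
from types import SimpleNamespace

RAW = """
[
  {"id": "NAV", "st": {"pos": [12, 44], "ok": 1}},
  {"id": "COM", "st": {"sig": 78, "ok": 1}},
  {"id": "PWR", "st": {"bat": 92, "ok": 1}},
  {"id": "THR", "st": {"lvl": 63, "ok": 0}}
]
"""

def to_namespace(text):
    # object_hook runs on every decoded JSON object, innermost first,
    # so nested dicts become attribute-accessible namespaces.
    return json.loads(text, object_hook=lambda d: SimpleNamespace(**d))

def build_registry(records, key):
    # Plain dict keyed by the chosen field: O(1) average lookup.
    return {getattr(record, key): record for record in records}

registry = build_registry(to_namespace(RAW), "id")
print(registry["NAV"].st.pos)
print(registry["PWR"].st.bat)
# A key that is absent for one record can be read with a default.
print(getattr(registry["COM"].st, "bat", None))
```

No record shape is declared anywhere, so each "st" object simply carries whatever keys arrived; the trade-off is that nothing is validated either.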
🤷‍♂️ Wait, what if my data isn't JSON?
It doesn't matter. Incorporator automatically infers the format from the URL or file extension. The syntax never changes.
Out of the box: JSON, NDJSON, CSV, TSV, PSV, XML, SQLite, and HTML (HTML is parse-only). Opt-in extras unlock Apache Parquet, Feather (Arrow IPC), ORC, Apache Avro, and Excel (XLSX), with the same incorp() / export() surface and no syntax changes.
If that exact same telemetry data comes from a legacy system as XML or CSV:
# These calls use await, so run them inside an async function such as main() above.
# The syntax doesn't change for XML...
systems_xml = await System.incorp(inc_file="telemetry.xml", inc_code="id")
print(systems_xml.inc_dict["NAV"].st.pos)  # Output: ['12', '44'] (XML values arrive as strings)
# ...and it works instantly for CSV, TSV, or streaming NDJSON logs!
systems_csv = await System.incorp(inc_file="telemetry.csv", inc_code="id")
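The XML output above is a list of strings rather than integers. The following standard-library sketch shows why: XML has no number type, so element text is always str until something casts it. The XML layout here is an assumption made for illustration; the real telemetry.xml may be shaped differently.

```python
import xml.etree.ElementTree as ET

# Hypothetical XML rendering of the NAV record.
XML = """
<systems>
  <system>
    <id>NAV</id>
    <st><pos>12</pos><pos>44</pos><ok>1</ok></st>
  </system>
</systems>
""".strip()

root = ET.fromstring(XML)
nav = root.find("system")

# Element text is always a string: XML carries no numeric type.
pos = [element.text for element in nav.find("st").findall("pos")]
print(pos)

# Casting is an explicit, separate step.
pos_as_int = [int(value) for value in pos]
print(pos_as_int)
```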
📦 Installation
Built on Pydantic V2 metaprogramming, HTTPX, and Tenacity. No system dependencies.
pip install incorporator
Core dependencies: pydantic (>=2.0), httpx, tenacity.
Opt in to format and performance extras as you need them:
pip install "incorporator[speedups]"     # orjson + lxml + cramjam (GIL-releasing parsers, Rust compression)
pip install "incorporator[parquet]"      # pyarrow: unlocks Parquet, Feather, and ORC
pip install "incorporator[avro]"         # fastavro: Apache Avro binary streams
pip install "incorporator[xlsx]"         # openpyxl: Excel (.xlsx) read/write
pip install "incorporator[orchestrate]"  # typer + prefect: CLI + Prefect task wrappers
pip install "incorporator[all]"          # everything except [parquet] (pyarrow is ~30 MB, so opt in explicitly)
🧰 The Verbs
Every method you'll call on an Incorporator subclass, in order of increasing power.
incorp(): fetch, parse, build the object graph
class Launch(Incorporator): pass
launches = await Launch.incorp(inc_url="https://ll.thespacedevs.com/2.2.0/launch/upcoming/")
print(launches[0].name)
→ Tutorial 1: First Steps with Incorporator
test(): let the framework write your incorp() kwargs for you
await Launch.test(inc_url="https://api.unknown.com/v1/users")
# Prints payload tree + suggested inc_code, rec_path, conv_dict.
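As a rough picture of what a "payload tree" could look like, here is a small standard-library profiler. It is only a sketch of the general technique, not the output format of test(); the function name describe and the choice to sample only the first array element are assumptions.

```python
import json

def describe(node, name="root", depth=0, lines=None):
    """Collect an indented outline of a decoded JSON payload."""
    lines = [] if lines is None else lines
    pad = "  " * depth
    if isinstance(node, dict):
        lines.append(f"{pad}{name}: object")
        for key, value in node.items():
            describe(value, key, depth + 1, lines)
    elif isinstance(node, list):
        lines.append(f"{pad}{name}: array[{len(node)}]")
        if node:
            # Profile only the first element as a representative sample.
            describe(node[0], "[0]", depth + 1, lines)
    else:
        lines.append(f"{pad}{name}: {type(node).__name__}")
    return lines

payload = json.loads('{"results": [{"id": 7, "name": "Ada", "tags": []}], "next": null}')
print("\n".join(describe(payload)))
```

An outline like this is usually enough to spot which key holds the record list and which field looks like a unique identifier.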
refresh(): re-fetch live data into existing instances
await Launch.refresh(instance=launches)
The seed call's network context (params, headers, rec_path, conv_dict, payload_list, sql_query, etc.) is auto-replayed on every refresh, so stateful polling against a URL that needed query parameters (CoinGecko's ?vs_currency=usd, paginated SQL, custom POST bodies) works without re-declaring anything. Caller-supplied kwargs win on conflicts.
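The "caller wins on conflicts" rule can be pictured as an ordinary dict merge. This is a sketch under assumptions: merge_refresh_kwargs is a hypothetical helper, the URL is a placeholder, and whether the library merges nested values shallowly (as here) or deeply is not stated in this document.

```python
def merge_refresh_kwargs(seed, overrides):
    # Later mappings win in a dict merge, so caller overrides take precedence.
    # The merge is shallow: a nested value such as "params" is replaced whole.
    return {**seed, **overrides}

seed = {
    "inc_url": "https://example.invalid/markets",  # placeholder URL
    "params": {"vs_currency": "usd"},
}
merged = merge_refresh_kwargs(seed, {"params": {"vs_currency": "eur"}})
print(merged)
```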
export(): serialise to any format
CSV, JSON, NDJSON, XML, SQLite, Parquet, Feather, ORC, Avro, XLSX. All share the same call.
await Launch.export(instance=launches, file_path="launches.parquet")
→ Formats & compression cheat sheet
stream(): a long-running data pipeline
Periodic fetch + optional stateful refresh + optional periodic export, running as a daemon. The kwargs are the pipeline definition. A Wave per chunk is the built-in observability stream: a DX bonus, not the purpose.
async for wave in Launch.stream(
    incorp_params={"inc_url": "https://ll.thespacedevs.com/2.2.0/launch/upcoming/"},
    refresh_interval=60,  # re-fetch every 60s
    export_params={"file_path": "launches.parquet"},
    export_interval=300,  # flush to disk every 5 min
):
    if wave.failed_sources:
        print(wave)  # observability bonus
→ Streaming & pagination guide
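For readers new to the pattern, a polling daemon that yields one status object per tick can be sketched with a plain async generator. This is an illustration only: poll, fake_fetch, and the dict standing in for a Wave are all hypothetical, and the loop is capped at max_ticks so the example terminates.

```python
import asyncio

async def poll(fetch, refresh_interval, max_ticks):
    """Yield one result per tick; failures are reported, not raised."""
    for tick in range(max_ticks):
        try:
            wave = {"tick": tick, "data": await fetch(), "error": None}
        except Exception as exc:  # keep the loop alive after a bad fetch
            wave = {"tick": tick, "data": None, "error": repr(exc)}
        yield wave
        await asyncio.sleep(refresh_interval)

async def demo():
    calls = 0

    async def fake_fetch():
        # Stand-in for a network call; it fails on the second tick.
        nonlocal calls
        calls += 1
        if calls == 2:
            raise RuntimeError("upstream timeout")
        return [{"name": f"launch-{calls}"}]

    return [wave async for wave in poll(fake_fetch, refresh_interval=0, max_ticks=3)]

waves = asyncio.run(demo())
for wave in waves:
    print(wave)
```

The failed tick surfaces as data in the stream instead of ending the loop, which is the same idea as checking wave.failed_sources in the snippet above.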
fjord(): a multi-source data pipeline
Fans out across N concurrent sources, fuses them through a user-defined outflow(state) function, and exports the combined output.
async for wave in Incorporator.fjord(
    stream_params=[
        {"cls": Coin, "incorp_params": {"inc_url": "..."}, "refresh_interval": 30},
        {"cls": Order, "incorp_params": {"inc_url": "..."}, "refresh_interval": 5},
    ],
    outflow="outflow.py",  # outflow(state) -> list[dict] OR dict[name, list[dict]]
    export_params={"file_path": "fusion.parquet"},  # single output
):
    if wave.failed_sources:
        print(wave)
Two more fjord() patterns:
- State-aware inflow(state): if inflow.py defines a top-level inflow(state) callable, fjord seeds sources sequentially and feeds each one the prior sources' loaded snapshots. That's how link_to(state["Planet"], …) and link_to_list(state["Film"], …) resolve foreign-key URLs to real Pydantic instances at incorp time.
- Multi-output fjord: return dict[ClassName, list[dict]] from outflow(state) and fjord builds N derived classes and writes N export files in one tick, with per-class export_params={"JediArchive": {...}, "Demographics": {...}}.
→ Tutorial 7: Multi-Source Fjord
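A single-output outflow(state) might look roughly like the sketch below. Several things here are assumptions rather than documented behaviour: that state maps a class name to a list of loaded records (suggested by the state["Planet"] indexing above), and that Coin and Order records expose symbol and price attributes. SimpleNamespace objects stand in for real instances so the example runs on its own.

```python
from types import SimpleNamespace

def outflow(state):
    """Fuse two sources into one list of rows (assumed state shape)."""
    # Assumption: state maps a class name to that source's loaded records.
    orders_by_symbol = {order.symbol: order for order in state["Order"]}
    rows = []
    for coin in state["Coin"]:
        order = orders_by_symbol.get(coin.symbol)
        if order is None:
            continue  # no matching order for this coin yet
        rows.append({
            "symbol": coin.symbol,
            "spread": round(order.price - coin.price, 2),
        })
    return rows

# Stand-in records; the attribute names are hypothetical.
state = {
    "Coin": [
        SimpleNamespace(symbol="btc", price=100.0),
        SimpleNamespace(symbol="eth", price=10.0),
    ],
    "Order": [SimpleNamespace(symbol="btc", price=100.5)],
}
print(outflow(state))
```

Returning a list of plain dicts matches the list[dict] contract noted in the fjord() call; the multi-output variant would return a dict of such lists keyed by class name.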
display(): REPL debug print
launches[0].display() # <Launch id="..." name="...">
stream() and fjord() are the production verbs, and they're what the CLI runs against a pipeline.json.
From Code to Production: CLI & Docker
The CLI runs the same stream() / fjord() engines from a pipeline.json. No Python required for single- or multi-source ETLs.
| Command | What it does |
|---|---|
| incorporator init --type stream | Scaffold a starter pipeline.json (use --type fjord for multi-source + outflow.py). |
| incorporator validate pipeline.json | Structural check before you ship; no network calls. |
| incorporator stream pipeline.json | Run a stream pipeline. |
| incorporator fjord pipeline.json | Run a multi-source fjord pipeline. |
incorporator init --type stream --output-dir .
# Edit pipeline.json (inc_url, headers, export_params, ...)
incorporator validate pipeline.json
incorporator stream pipeline.json # one-shot
# ...or run it as a Dockerised daemon:
cp .env.example .env && mkdir -p config data logs && mv pipeline.json config/
docker compose up -d && docker compose logs -f
Secrets stay out of pipeline.json: use ${API_KEY} for env vars or ${file:/run/secrets/api_key} for Docker / Kubernetes Secrets mounts. Set INCORPORATOR_SECRETS_ROOT=/run/secrets to sandbox ${file:...} references against directory traversal at startup.
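The two placeholder forms and the sandbox check can be sketched with the standard library. This is one plausible way to do it, not the CLI's actual resolver: the function name resolve, the regular expression, and the error type are assumptions. Path.is_relative_to needs Python 3.9 or newer.

```python
import os
import re
import tempfile
from pathlib import Path

_PLACEHOLDER = re.compile(r"\$\{(file:)?([^}]+)\}")

def resolve(text, secrets_root=None):
    """Expand ${NAME} from the environment and ${file:PATH} from disk."""
    def _sub(match):
        is_file, target = match.group(1), match.group(2)
        if not is_file:
            return os.environ[target]  # KeyError if the variable is unset
        path = Path(target).resolve()
        if secrets_root is not None:
            root = Path(secrets_root).resolve()
            # Reject anything that escapes the sandbox after resolution,
            # which also catches "../" segments and symlinks.
            if not path.is_relative_to(root):
                raise ValueError(f"{target} is outside {root}")
        return path.read_text().strip()
    return _PLACEHOLDER.sub(_sub, text)

with tempfile.TemporaryDirectory() as tmp:
    secret = Path(tmp) / "api_key"
    secret.write_text("s3cr3t\n")
    os.environ["API_KEY"] = "from-env"
    print(resolve("Bearer ${API_KEY}"))
    print(resolve("${file:" + str(secret) + "}", secrets_root=tmp))
```

Resolving both paths before comparing them is the important design choice: a string-prefix check alone would accept /run/secrets/../etc/passwd.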
→ CLI reference · Deployment & secrets guide
Resilience & Batteries Included
- GIL-releasing parsers via the [speedups] extra (orjson, lxml). → Installation
- Invisible decompression for .gz, .bz2, .lzma, .zip, .tar payloads: automatic, no extra calls; ZIP/TAR member paths are validated against directory-traversal attacks, and a 1 GB cap guards against decompression bombs. → Formats
- Connection pooling + retries + DLQ: HTTP/2-multiplexed httpx.AsyncClient, Tenacity exponential backoff, failed URLs surfaced via wave.failed_sources. Opt-in block_internal_redirects=True rejects 3xx Locations to RFC1918 / loopback / cloud-metadata IPs. → Library reference
- Atomic writes for monolithic formats: Parquet, Feather, ORC, JSON, XML, and XLSX all build to a sibling tempfile and os.replace() on success, so a crash mid-write never leaves a corrupt-footer file. → Formats
- Spreadsheet-injection guard: CSV / XLSX cells starting with = / @ / + / - are prefixed with ' on export so consumers in Excel / LibreOffice / Sheets render the literal text instead of evaluating formulas (OWASP-recommended default; opt out via csv_safe_formulas=False).
- Zero-OOM IncorporatorList backed by a WeakValueDictionary for O(1) lookups without GC pressure. → Streaming
- Non-blocking observability: subclass LoggedIncorporator; logs flow through a QueueHandler so disk I/O never blocks the event loop. → Library reference
- Cross-format round-tripping: JSON → Parquet → SQLite → Avro → CSV → XML, all sharing the same export() surface, governed by a small hand-maintained type bridge that turns adding a new format into a 2-row dict change. → Tutorial 2: Universal Formats · Cross-format type bridge
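Two of the items above, atomic writes and the spreadsheet-injection guard, are general techniques that fit in a few lines of standard-library Python. The sketch below illustrates them for CSV only; guard_cell and atomic_write_csv are hypothetical names, and the library's own implementation may differ in detail.

```python
import csv
import os
import tempfile
from pathlib import Path

FORMULA_PREFIXES = ("=", "@", "+", "-")

def guard_cell(value):
    # Prefix risky strings with an apostrophe so spreadsheets show literal text.
    # Only str values are touched; real numbers pass through unchanged.
    if isinstance(value, str) and value.startswith(FORMULA_PREFIXES):
        return "'" + value
    return value

def atomic_write_csv(path, rows):
    """Write rows to a sibling temp file, then swap it into place."""
    path = Path(path)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)
            for row in rows:
                writer.writerow([guard_cell(cell) for cell in row])
        os.replace(tmp_name, path)  # atomic when both paths share a filesystem
    except BaseException:
        # Remove the partial temp file; the target is left untouched.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "out.csv"
    atomic_write_csv(target, [["name", "note"], ["Ada", "=SUM(A1:A9)"], ["Bob", -5]])
    print(target.read_text())
```

Creating the temp file in the same directory as the target keeps both on one filesystem, which is what lets os.replace() swap them in a single step.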
Tutorials (in order)
A focused 1-7 curriculum in increasing difficulty. Each slot introduces
one new verb or technique. Runnable code lives under /examples.
- 🌱 First Steps with Incorporator: your first incorp() against CoinGecko market data.
- 📦 Universal Formats - One Verb, Any File: same call across .json / .csv / .parquet / .sqlite / .xlsx / .avro, with a comparison table.
- 🕵️‍♂️ DX Inspector - Let the Framework Write Your Kwargs: test() profiles unknown APIs.
- Drilling API Graphs - Parent → Child: inc_parent + inc_child for HATEOAS APIs (SpaceX launches → rockets).
- Keep It Live - Stateful Refresh: refresh() three ways against Binance's live ticker.
- Streaming Daemons: stream() for long-running pipelines.
- Multi-Source Fjord (capstone): fjord() fusing CoinGecko + Binance into a live spread metric.
Reference
- Library Reference (pdoc): every public class, method, converter, and paginator, rendered from the source docstrings.
- 🩺 Production Debugging with get_error(): LoggedIncorporator + structured error logs + DLQ retry loops.
- 📦 Formats & Compression Cheat Sheet: every format kwarg, compression rules.
- Streaming & Pagination Deep Dive: paginator family for files / endpoints too big for RAM.
- 🐳 CLI & Configuration Guide: running pipelines from pipeline.json without writing Python.
- ⚡ Performance Characteristics: measured throughput per format + automatic engine optimisations.
Appendices
Patterns that earned their keep before the curriculum was reshaped: production-ready, just not on the learning path.
- 🧬 Pokémon ETL: array reductions with calc / sum_attributes.
- 🚨 Shady Jimmy's XML Audit: XML ingestion + declarative bulk POST + fraud audit.
- 🕸️ Crypto Graph Mapping (static): link_to-based in-memory join across CoinGecko + Binance. Tutorial 7 covers the same fusion as a live daemon.
- NASCAR Fantasy - Graph-Map Fjord (advanced): six-source fjord with state-aware inflow(state), multi-output outflow(state), and sentinel-ID filtering. Builds on Tutorial 7.
- Data Lake Pivot (legacy): original JSON → Avro/SQLite walkthrough; the headline pattern is now in Tutorial 2.
🤝 Philosophy & Contributing
Incorporator is built on strict OOP principles, non-blocking observability, and a forgiving metaprogramming shield. We trap standard library exceptions (JSONDecodeError, httpx.HTTPStatusError) and gracefully recast them as domain errors. Your event loop is safe with us.
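The "trap and recast" idea is ordinary exception chaining. Here is a minimal sketch of the pattern using only the standard library; PayloadError and parse_payload are hypothetical names chosen for illustration, and the library's real domain-error classes are documented in its reference, not here.

```python
import json

class PayloadError(Exception):
    """Hypothetical domain error; the library's real class names may differ."""

def parse_payload(text, source):
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        # Chain with "from" so the original traceback stays available.
        raise PayloadError(f"{source}: invalid JSON at line {exc.lineno}") from exc

try:
    parse_payload("{not json}", source="telemetry.json")
except PayloadError as err:
    print(err)
    print(type(err.__cause__).__name__)
```

Callers then catch one domain-level exception type, while the underlying cause remains reachable through __cause__ for debugging.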
Contributions: see CONTRIBUTING.md for the dev install, quality bar, and architecture conventions. Security disclosures: see SECURITY.md. Release notes: CHANGELOG.md.