A Zero-Boilerplate Universal Data Gateway and Pipeline Orchestration Platform.


🚀 Incorporator

A schema-free data mapper that turns JSON, XML, or CSV into a unified Python object graph with dot-notation access at runtime.

mypy: strict · Code style: black · Linter: ruff · Pydantic v2 · HTTPX · License: MIT

✨ Highlights

  • Works with unpredictable JSON APIs, and also reads XML, CSV, NDJSON, SQLite, and columnar Parquet, without you writing a single line of schema.
  • Turns raw data into native Python objects instantly, with no manual model definitions or brittle classes.
  • Handles changing JSON structures at runtime, absorbing missing keys or changing data types without throwing validation errors.
  • Uses Pydantic and HTTPX under the hood without forcing you to write data classes, connection poolers, or pagination while-loops.

🎯 Use this when:

  • You are working with evolving, undocumented, or heavily nested JSON APIs.
  • You need a universal bridge to map legacy XML, flat CSVs, or columnar Parquet into the exact same Python object graph.
  • You are exhausted by writing boilerplate models and validation logic just to explore a new data source.
  • You need to extract deeply nested web data, transform it, and pivot it straight into a local SQL database or columnar data lake.

🛠️ How it Works: Zero-Schema Ingestion

Imagine receiving this spacecraft telemetry JSON. Notice how the nested "st" dictionary changes its structure for every subsystem (pos vs sig vs bat vs lvl). A parser bound to one fixed schema would reject most of these records.

The Input (telemetry.json):

[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]

The Incorporator Way: Feed it the unpredictable JSON. Incorporator dynamically unifies the changing structures into a single object graph and gives you instant dot-notation access.

import asyncio
from incorporator import Incorporator

class System(Incorporator): pass     # Subclass; everything else hangs off it.

async def main():
    # 1. Parse unpredictable JSON directly into Python objects. No models defined!
    systems = await System.incorp(
        inc_file="telemetry.json",
        inc_code="id" # Sets 'id' as the O(1) Memory Registry lookup key
    )

    # 2. Instantly access the unified Python object graph via dot-notation
    print(f"Navigation Position: {systems.inc_dict['NAV'].st.pos}")   # Output: [12, 44]
    print(f"Power Battery Level: {systems.inc_dict['PWR'].st.bat}%")  # Output: 92%

    # 3. Interpret and manipulate data effortlessly at runtime
    thr = systems.inc_dict["THR"]
    if not thr.st.ok:
        print(f"⚠️ THRUST FAILURE! Efficiency dropped to {thr.st.lvl}")

asyncio.run(main())
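
To make the idea concrete, here is a minimal standard-library sketch of dot-notation access over schema-free JSON with a keyed registry. It is not Incorporator's implementation (which, per this README, builds on Pydantic v2), and the helper names to_objects and build_registry are hypothetical.

```python
import json
from types import SimpleNamespace

TELEMETRY = """
[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]
"""


def to_objects(text):
    """Parse JSON, turning every JSON object into an attribute-style namespace."""
    return json.loads(text, object_hook=lambda d: SimpleNamespace(**d))


def build_registry(records, key):
    """Index records by one attribute for O(1) lookups (hypothetical helper)."""
    return {getattr(record, key): record for record in records}


registry = build_registry(to_objects(TELEMETRY), key="id")

print(registry["NAV"].st.pos)                    # nested list is kept as a list
print(getattr(registry["COM"].st, "pos", None))  # a missing key falls back to a default
print(bool(registry["THR"].st.ok))               # 0 is falsy, which flags the failure
```

No class describes "st" anywhere in the sketch, so each record simply carries whatever keys it arrived with.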

🤷‍♂️ Wait, what if my data isn't JSON?

It doesn't matter. Incorporator automatically infers the format from the URL or file extension. The syntax never changes.

Out of the box: JSON, NDJSON, CSV, TSV, PSV, XML, SQLite, and HTML (HTML is parse-only). Opt-in extras unlock Apache Parquet, Feather (Arrow IPC), ORC, Apache Avro, and Excel (XLSX), all through the same incorp() / export() surface with no syntax changes.
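
As a rough illustration of extension-based inference, the sketch below maps the path part of a URL or file name to a format label. The FORMATS table and the infer_format name are assumptions made for this example; the library's real lookup is not shown in this README.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse

# Extension -> format label. Illustrative only, not the library's own table.
FORMATS = {
    ".json": "json", ".ndjson": "ndjson", ".csv": "csv", ".tsv": "tsv",
    ".psv": "psv", ".xml": "xml", ".sqlite": "sqlite", ".html": "html",
}


def infer_format(source: str) -> str:
    """Guess the format from the path part of a URL or a plain file name."""
    path = urlparse(source).path  # drops any ?query or #fragment
    suffix = PurePosixPath(path).suffix.lower()
    if suffix not in FORMATS:
        raise ValueError(f"cannot infer a format from {source!r}")
    return FORMATS[suffix]


print(infer_format("telemetry.xml"))
print(infer_format("https://example.com/v1/telemetry.CSV?page=2"))
```

An extension-less URL raises ValueError in this sketch; how Incorporator itself resolves that case is outside what this page documents.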

If that exact same telemetry data comes from a legacy system as XML or CSV:

# The syntax doesn't change for XML...
systems_xml = await System.incorp(inc_file="telemetry.xml", inc_code="id")
print(systems_xml.inc_dict["NAV"].st.pos) # Output: ['12', '44'] (XML text values arrive as strings)

# ...and it works instantly for CSV, TSV, or streaming NDJSON logs!
systems_csv = await System.incorp(inc_file="telemetry.csv", inc_code="id")
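
The XML output above shows '12' and '44' as strings because XML has no numeric type: every text node is text. The standard-library sketch below demonstrates the effect and an explicit conversion. The XML layout is a hypothetical rendering of the NAV record, not a format the library prescribes.

```python
import xml.etree.ElementTree as ET

# Hypothetical XML rendering of the NAV record; a real legacy feed may differ.
TELEMETRY_XML = """
<systems>
  <system>
    <id>NAV</id>
    <st><pos>12</pos><pos>44</pos><ok>1</ok></st>
  </system>
</systems>
"""

root = ET.fromstring(TELEMETRY_XML)
nav = root.find("system")

raw_pos = [element.text for element in nav.findall("st/pos")]
print(raw_pos)  # text nodes come back as str

pos = [int(value) for value in raw_pos]
print(pos)      # numbers only after an explicit conversion
```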

๐Ÿ“ฆ Installation

Built on Pydantic V2 metaprogramming, HTTPX, and Tenacity. No system dependencies.

pip install incorporator

Core dependencies: pydantic (>=2.0), httpx, tenacity.

Opt in to format and performance extras as you need them:

pip install incorporator[speedups]    # orjson + lxml + cramjam (GIL-releasing parsers, Rust compression)
pip install incorporator[parquet]     # pyarrow: unlocks Parquet, Feather, and ORC
pip install incorporator[avro]        # fastavro: Apache Avro binary streams
pip install incorporator[xlsx]        # openpyxl: Excel (.xlsx) read/write
pip install incorporator[orchestrate] # typer + prefect: CLI + Prefect task wrappers
pip install incorporator[all]         # everything except [parquet] (pyarrow is ~30 MB, so opt in explicitly)

🧰 The Verbs

Every method you'll call on an Incorporator subclass, in order of increasing power.

incorp(): fetch, parse, build the object graph

class Launch(Incorporator): pass

launches = await Launch.incorp(inc_url="https://ll.thespacedevs.com/2.2.0/launch/upcoming/")
print(launches[0].name)

→ Tutorial 1: First Steps with Incorporator

test(): let the framework write your incorp() kwargs for you

await Launch.test(inc_url="https://api.unknown.com/v1/users")
# Prints payload tree + suggested inc_code, rec_path, conv_dict.
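
For intuition about what such a profiler has to do, here is a small sketch that prints a payload tree and proposes a lookup key. The payload_tree and suggest_inc_code helpers are hypothetical and their heuristics are assumptions; they are not how test() is implemented.

```python
def payload_tree(node, depth=0):
    """Return one indented line per key, with the Python type of each value."""
    lines = []
    if isinstance(node, dict):
        for key, value in node.items():
            lines.append("  " * depth + f"{key}: {type(value).__name__}")
            lines.extend(payload_tree(value, depth + 1))
    elif isinstance(node, list) and node:
        lines.extend(payload_tree(node[0], depth))  # profile the first element only
    return lines


def suggest_inc_code(records):
    """Return the first key whose values are simple and unique across all records."""
    if not records:
        return None
    for key in records[0]:
        values = [record.get(key) for record in records]
        simple = all(isinstance(value, (str, int)) for value in values)
        if simple and len(set(values)) == len(values):
            return key
    return None


payload = {
    "count": 2,
    "results": [{"name": "x", "id": "a1"}, {"name": "x", "id": "b2"}],
}
tree = payload_tree(payload)
print("\n".join(tree))
print(suggest_inc_code(payload["results"]))
```

Here "name" is skipped because its values repeat, so "id" is the suggested key.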

refresh(): re-fetch live data into existing instances

await Launch.refresh(instance=launches)

The seed call's network context (params, headers, rec_path, conv_dict, payload_list, sql_query, etc.) is auto-replayed on every refresh. Stateful polling against a URL that needed query parameters (CoinGecko's ?vs_currency=usd, paginated SQL, custom POST bodies) therefore works without re-declaring anything. Caller-supplied kwargs win on conflicts.
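
The "replay, but let the caller win" rule can be pictured as a plain dict merge. The Poller class and the example URL below are hypothetical, and the merge is shallow by assumption: an overridden params dict replaces the seed's dict wholesale.

```python
class Poller:
    """Remembers the kwargs of the first fetch so later refreshes can reuse them."""

    def __init__(self, **seed_kwargs):
        self.seed_kwargs = dict(seed_kwargs)

    def refresh_kwargs(self, **overrides):
        # Later keys win in a dict merge, so caller-supplied values beat the seed.
        return {**self.seed_kwargs, **overrides}


poller = Poller(
    inc_url="https://api.example.com/markets",  # placeholder URL
    params={"vs_currency": "usd"},
    headers={"Accept": "application/json"},
)

replayed = poller.refresh_kwargs()
overridden = poller.refresh_kwargs(params={"vs_currency": "eur"})
print(replayed["params"], overridden["params"])
```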

export(): serialise to any format

CSV, JSON, NDJSON, XML, SQLite, Parquet, Feather, ORC, Avro, XLSX. All share the same call.

await Launch.export(instance=launches, file_path="launches.parquet")

→ Formats & compression cheat sheet

stream(): a long-running data pipeline

Periodic fetch + optional stateful refresh + optional periodic export, running as a daemon. The kwargs are the pipeline definition. One Wave per chunk forms the built-in observability stream, which is a developer-experience bonus rather than the purpose.

async for wave in Launch.stream(
    incorp_params={"inc_url": "https://ll.thespacedevs.com/2.2.0/launch/upcoming/"},
    refresh_interval=60,                              # re-fetch every 60s
    export_params={"file_path": "launches.parquet"},
    export_interval=300,                              # flush to disk every 5 min
):
    if wave.failed_sources: print(wave)               # observability bonus

→ Streaming & pagination guide
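
The shape of such a polling loop can be sketched with a plain async generator. Everything here is an assumption for illustration: the Wave dataclass only mimics the failed_sources attribute used above, and poll and make_flaky_fetch are hypothetical names, not the library's internals.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Wave:
    """Hypothetical per-tick report: what arrived and which sources failed."""
    tick: int
    records: list = field(default_factory=list)
    failed_sources: list = field(default_factory=list)


async def poll(fetch, source, refresh_interval, ticks):
    """Call fetch(source) once per interval and yield one Wave per tick."""
    for tick in range(ticks):
        try:
            wave = Wave(tick, records=await fetch(source))
        except Exception:
            # A failed tick is reported to the consumer instead of ending the loop.
            wave = Wave(tick, failed_sources=[source])
        yield wave
        await asyncio.sleep(refresh_interval)


def make_flaky_fetch():
    """Build a fake fetcher whose second call fails, to exercise the error path."""
    calls = 0

    async def fetch(source):
        nonlocal calls
        calls += 1
        if calls == 2:
            raise ConnectionError("simulated outage")
        return [{"source": source, "call": calls}]

    return fetch


async def main():
    fetch = make_flaky_fetch()
    return [wave async for wave in poll(fetch, "demo://launches", 0.01, ticks=3)]


waves = asyncio.run(main())
for wave in waves:
    print(wave.tick, len(wave.records), wave.failed_sources)
```

The second tick fails, yet the third still runs, which is the property a long-running daemon needs.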

fjord(): a multi-source data pipeline

Fans out across N concurrent sources, fuses them through a user-defined outflow(state) function, and exports the combined output.

async for wave in Incorporator.fjord(
    stream_params=[
        {"cls": Coin,  "incorp_params": {"inc_url": "..."}, "refresh_interval": 30},
        {"cls": Order, "incorp_params": {"inc_url": "..."}, "refresh_interval": 5},
    ],
    outflow="outflow.py",                             # outflow(state) -> list[dict] OR dict[name, list[dict]]
    export_params={"file_path": "fusion.parquet"},   # single output
):
    if wave.failed_sources: print(wave)

Two more fjord() patterns:

  • State-aware inflow(state): if inflow.py defines a top-level inflow(state) callable, fjord seeds sources sequentially and feeds each one the prior sources' loaded snapshots. That's how link_to(state["Planet"], …) and link_to_list(state["Film"], …) resolve foreign-key URLs to real Pydantic instances at incorp time.
  • Multi-output fjord: return dict[ClassName, list[dict]] from outflow(state) and fjord builds N derived classes and writes N export files in one tick, with per-class export_params={"JediArchive": {...}, "Demographics": {...}}.

→ Tutorial 7: Multi-Source Fjord
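
A single-output outflow(state) could look roughly like the sketch below. It assumes state maps each class name to a list of plain dicts with symbol, price, and bid fields. All of that is hypothetical, since the real state may hold model instances and the real sources have their own field names.

```python
def outflow(state):
    """Join coins and orders on symbol and emit one spread row per match."""
    prices = {coin["symbol"]: coin["price"] for coin in state["Coin"]}
    rows = []
    for order in state["Order"]:
        reference = prices.get(order["symbol"])
        if reference is None:
            continue  # no reference price, so there is nothing to compare
        rows.append({
            "symbol": order["symbol"],
            "spread": round(order["bid"] - reference, 2),
        })
    return rows


# Hand-made snapshot standing in for what two sources might have loaded.
state = {
    "Coin": [{"symbol": "btc", "price": 100.0}, {"symbol": "eth", "price": 10.0}],
    "Order": [{"symbol": "btc", "bid": 101.5}, {"symbol": "doge", "bid": 0.1}],
}
rows = outflow(state)
print(rows)
```

Returning a list of dicts matches the list[dict] contract noted in the call above; the multi-output variant would return a dict of such lists instead.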

display(): REPL debug print

launches[0].display()   # <Launch id="..." name="...">

stream() and fjord() are the production verbs, and they're what the CLI runs against a pipeline.json.


🚀 From Code to Production: CLI & Docker

The CLI runs the same stream() / fjord() engines from a pipeline.json. No Python required for single- or multi-source ETLs.

| Command | What it does |
| --- | --- |
| incorporator init --type stream | Scaffold a starter pipeline.json (use --type fjord for multi-source + outflow.py). |
| incorporator validate pipeline.json | Structural check before you ship; makes no network calls. |
| incorporator stream pipeline.json | Run a stream pipeline. |
| incorporator fjord pipeline.json | Run a multi-source fjord pipeline. |

incorporator init --type stream --output-dir .
# Edit pipeline.json (inc_url, headers, export_params, ...)
incorporator validate pipeline.json
incorporator stream pipeline.json                # one-shot
# ...or run it as a Dockerised daemon:
cp .env.example .env && mkdir -p config data logs && mv pipeline.json config/
docker compose up -d && docker compose logs -f

Secrets stay out of pipeline.json: use ${API_KEY} for env vars or ${file:/run/secrets/api_key} for Docker / Kubernetes Secrets mounts. Set INCORPORATOR_SECRETS_ROOT=/run/secrets to sandbox ${file:...} references against directory traversal at startup.
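
The two placeholder forms and the sandbox check can be sketched with the standard library as follows. The resolve function and its regex are assumptions for illustration (Python 3.9+ for Path.is_relative_to); the CLI's actual resolver may behave differently.

```python
import re
import tempfile
from pathlib import Path

# Matches ${NAME} and ${file:/some/path}.
PLACEHOLDER = re.compile(r"\$\{(file:)?([^}]+)\}")


def resolve(text, env, secrets_root):
    """Replace ${NAME} from env and ${file:/path} from files under secrets_root."""
    root = Path(secrets_root).resolve()

    def substitute(match):
        is_file, name = match.group(1), match.group(2)
        if not is_file:
            return env[name]  # KeyError if the variable is unset
        target = Path(name).resolve()
        if not target.is_relative_to(root):  # blocks ../ escapes and stray paths
            raise PermissionError(f"{name} is outside {root}")
        return target.read_text().strip()

    return PLACEHOLDER.sub(substitute, text)


with tempfile.TemporaryDirectory() as tmp:
    secret = Path(tmp) / "api_key"
    secret.write_text("s3cr3t\n")
    config = '{"token": "${API_KEY}", "key": "${file:' + str(secret) + '}"}'
    resolved = resolve(config, env={"API_KEY": "abc123"}, secrets_root=tmp)

print(resolved)
```

Resolving both the root and the target before comparing them is what defeats ../ segments and symlinked detours.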

→ CLI reference · Deployment & secrets guide


🛠 Resilience & Batteries Included

  • GIL-releasing parsers and Rust compression via the [speedups] extra (orjson, lxml, cramjam). → Installation
  • Invisible decompression for .gz, .bz2, .lzma, .zip, .tar payloads: automatic, no extra calls. ZIP/TAR member paths are validated against directory-traversal attacks, and decompression is capped at 1 GB to stop decompression bombs. → Formats
  • Connection pooling + retries + dead-letter queue (DLQ): HTTP/2-multiplexed httpx.AsyncClient, Tenacity exponential backoff, failed URLs surfaced via wave.failed_sources. Opt-in block_internal_redirects=True rejects 3xx Locations that point to RFC1918 / loopback / cloud-metadata IPs. → Library reference
  • Atomic writes for monolithic formats: Parquet, Feather, ORC, JSON, XML, and XLSX all build to a sibling tempfile and os.replace() on success, so a crash mid-write never leaves a corrupt-footer file. → Formats
  • Spreadsheet-injection guard: CSV / XLSX cells starting with = / @ / + / - are prefixed with ' on export so consumers in Excel / LibreOffice / Sheets render the literal text instead of evaluating formulas (OWASP-recommended default; opt out via csv_safe_formulas=False).
  • Zero-OOM IncorporatorList backed by a WeakValueDictionary for O(1) lookups without GC pressure. → Streaming
  • Non-blocking observability: subclass LoggedIncorporator; logs flow through a QueueHandler so disk I/O never blocks the event loop. → Library reference
  • Cross-format round-tripping: JSON ↔ Parquet ↔ SQLite ↔ Avro ↔ CSV ↔ XML all share the same export() surface, governed by a small hand-maintained type bridge that turns adding a new format into a 2-row dict change. → Tutorial 2: Universal Formats · Cross-format type bridge
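
Two of the guards above, the sibling-tempfile write and the spreadsheet-injection prefix, are easy to picture with the standard library. This is a sketch under assumptions, not the library's code: guard_cell and atomic_write_csv are hypothetical names, and the sketch only guards string cells, leaving numbers untouched.

```python
import csv
import os
import tempfile

RISKY_PREFIXES = ("=", "@", "+", "-")


def guard_cell(value):
    """Prefix formula-looking strings with ' so spreadsheets show them as text."""
    if isinstance(value, str) and value.startswith(RISKY_PREFIXES):
        return "'" + value
    return value


def atomic_write_csv(path, header, rows):
    """Write to a sibling temp file, then swap it in with os.replace()."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)
            writer.writerow(header)
            writer.writerows([guard_cell(cell) for cell in row] for row in rows)
        os.replace(tmp_path, path)  # atomic when both paths share a filesystem
    except BaseException:
        os.unlink(tmp_path)  # never leave a half-written temp file behind
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "launches.csv")
    atomic_write_csv(target, ["name", "note"],
                     [["Falcon", "=HYPERLINK(1)"], ["Atlas", -5]])
    with open(target, newline="", encoding="utf-8") as handle:
        written = list(csv.reader(handle))
    leftovers = os.listdir(tmp)

print(written)
```

Readers of launches.csv see either the previous complete file or the new complete file, never a partial one, because the swap happens only after the temp file is closed.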

📚 Tutorials (in order)

A focused 1-7 curriculum in increasing difficulty. Each slot introduces one new verb or technique. Runnable code lives under /examples.

  1. 🌱 First Steps with Incorporator: your first incorp() against CoinGecko market data.
  2. 📦 Universal Formats - One Verb, Any File: same call across .json / .csv / .parquet / .sqlite / .xlsx / .avro, with a comparison table.
  3. 🕵️‍♂️ DX Inspector - Let the Framework Write Your Kwargs: test() profiles unknown APIs.
  4. 🚀 Drilling API Graphs - Parent → Child: inc_parent + inc_child for HATEOAS APIs (SpaceX launches → rockets).
  5. 🔄 Keep It Live - Stateful Refresh: refresh() three ways against Binance's live ticker.
  6. 🌊 Streaming Daemons: stream() for long-running pipelines.
  7. 🌊 Multi-Source Fjord (capstone): fjord() fusing CoinGecko + Binance into a live spread metric.

📑 Reference

📎 Appendices

Patterns that earned their keep before the curriculum was reshaped: production-ready, just not on the learning path.


🤝 Philosophy & Contributing

Incorporator is built on strict OOP principles, non-blocking observability, and a forgiving metaprogramming shield. We trap low-level exceptions (JSONDecodeError, httpx.HTTPStatusError) and gracefully recast them as domain errors. Your event loop is safe with us.
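
Recasting a low-level exception as a domain error generally follows the pattern below. PayloadError and parse_payload are hypothetical names chosen for this sketch; the library's real exception classes are not listed on this page.

```python
import json


class PayloadError(Exception):
    """Hypothetical domain error; the library's real exception names may differ."""


def parse_payload(text, source):
    """Parse JSON, translating decoder failures into a domain-level error."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        # "from exc" keeps the original error attached as __cause__ for debugging.
        raise PayloadError(f"{source}: invalid JSON at line {exc.lineno}") from exc


try:
    parse_payload('{"id": "NAV", ', source="telemetry.json")
except PayloadError as err:
    message, cause = str(err), err.__cause__
    print(message)
    print(type(cause).__name__)
```

Callers then handle one error family regardless of which parser or transport failed underneath.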

Contributions: see CONTRIBUTING.md for the dev install, quality bar, and architecture conventions. Security disclosures: see SECURITY.md. Release notes: CHANGELOG.md.
