Schema-free ETL mapper with in-process graph orchestration: turn any REST API, CSV, Parquet, or XML into typed Python objects, then coordinate multi-source pipelines in a bounded time window with Tideweaver.

🚀 Incorporator

A schema-free data mapper that turns JSON, XML, or CSV into a unified Python object graph with dot-notation access at runtime, plus an in-process orchestrator (Tideweaver) for multi-source pipelines.

License: MIT

✨ Highlights

  • Works with unpredictable JSON APIs, and digests XML, CSV, NDJSON, SQLite, and Parquet without a line of schema. You get native Python objects immediately, with no manual model definitions.
  • Handles changing structures at runtime: missing keys and mutating types are absorbed without validation errors.
  • Pydantic + HTTPX under the hood: no data classes, connection poolers, or pagination loops to write.

🛠️ How it Works: Zero-Schema Ingestion

Imagine this telemetry JSON. The nested "st" dictionary changes structure for every subsystem (pos vs sig vs bat). A parser bound to one fixed schema would reject it or fail on the missing keys.

[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]

Feed it the unpredictable JSON. Incorporator unifies the changing structures into a single object graph:

import asyncio
from incorporator import Incorporator

class System(Incorporator): pass     # Subclass; everything else hangs off it.

async def main():
    systems = await System.incorp(inc_file="telemetry.json", inc_code="id")
    print(systems.inc_dict["NAV"].st.pos)   # [12, 44]
    print(systems.inc_dict["PWR"].st.bat)   # 92

    thr = systems.inc_dict["THR"]
    if not thr.st.ok:
        print(f"⚠️ THRUST FAILURE! Efficiency dropped to {thr.st.lvl}")

asyncio.run(main())
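To make the dot-notation idea concrete without the library, here is a minimal stdlib sketch that wraps the same telemetry payload so nested keys read as attributes. It is not Incorporator's implementation (which, per this README, builds on Pydantic); `to_namespace` and `index_by` are hypothetical helper names chosen for illustration.

```python
import json
from types import SimpleNamespace


def to_namespace(value):
    """Recursively wrap dicts so nested keys can be read as attributes."""
    if isinstance(value, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_namespace(item) for item in value]
    return value


def index_by(records, code_key):
    """Build a registry keyed by one field, similar in spirit to inc_dict."""
    return {rec[code_key]: to_namespace(rec) for rec in records}


TELEMETRY = json.loads("""
[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]
""")

registry = index_by(TELEMETRY, "id")
print(registry["NAV"].st.pos)                     # [12, 44]
# Each record keeps only the keys it actually carried, so absent keys
# have to be read defensively in this sketch.
print(getattr(registry["COM"].st, "bat", None))   # None
```

The point of the sketch is only that no per-subsystem model is declared up front; each record's shape is taken from the data itself.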

The format is inferred from the URL or file extension. The syntax never changes for XML, CSV, NDJSON, SQLite, Parquet, Feather, ORC, Avro, or XLSX: the incorp() / export() surface stays the same.
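Extension-based inference can be sketched in a few lines. The table, the compression-suffix handling, and the JSON fallback for extension-less URLs below are assumptions made for illustration; the library's actual lookup rules are not shown in this README.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse

# Hypothetical extension table for illustration; not the library's registry.
KNOWN_FORMATS = {
    ".json": "json", ".ndjson": "ndjson", ".xml": "xml", ".csv": "csv",
    ".sqlite": "sqlite", ".parquet": "parquet", ".avro": "avro", ".xlsx": "xlsx",
}
COMPRESSION_SUFFIXES = {".gz", ".bz2", ".lzma"}


def infer_format(source, default="json"):
    """Guess a format name from a URL or file path, ignoring query strings."""
    path = urlparse(source).path if "://" in source else source
    suffixes = [s.lower() for s in PurePosixPath(path).suffixes]
    # Peel off compression suffixes so "dump.ndjson.gz" resolves to ndjson.
    while suffixes and suffixes[-1] in COMPRESSION_SUFFIXES:
        suffixes.pop()
    if suffixes and suffixes[-1] in KNOWN_FORMATS:
        return KNOWN_FORMATS[suffixes[-1]]
    return default  # assumption: extension-less endpoints are treated as JSON


print(infer_format("telemetry.json"))
print(infer_format("https://example.com/data/export.csv?page=2"))
print(infer_format("dump.ndjson.gz"))
```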


📦 Installation

Built on Pydantic V2 metaprogramming, HTTPX, and Tenacity. No system dependencies.

pip install incorporator                    # core
pip install "incorporator[speedups]"        # orjson + lxml + cramjam
pip install "incorporator[parquet]"         # pyarrow: Parquet, Feather, ORC
pip install "incorporator[avro]"            # fastavro
pip install "incorporator[orchestrate]"     # typer + prefect: CLI + Prefect wrappers
pip install "incorporator[all]"             # everything except [parquet]

🧰 The Verbs + One Orchestrator

Every method you'll call on an Incorporator subclass, plus the windowed orchestrator.

incorp(): fetch, parse, build the object graph

class Launch(Incorporator): pass

launches = await Launch.incorp(inc_url="https://ll.thespacedevs.com/2.2.0/launch/upcoming/")
print(launches[0].name)

→ Tutorial 1: First Steps

test(): let the framework write your incorp() kwargs

await Launch.test(inc_url="https://api.unknown.com/v1/users")
# Prints payload tree + suggested inc_code, rec_path, conv_dict.
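As a rough idea of what "suggesting an inc_code" could involve, the sketch below scans a list of records for top-level keys that are present everywhere and hold unique scalar values. This heuristic and the name `suggest_code_keys` are assumptions for illustration only; how test() actually profiles a payload is not documented here.

```python
def suggest_code_keys(records):
    """Return top-level keys that look usable as a unique record identifier."""
    candidates = []
    for key in (records[0] if records else {}):
        values = [rec[key] for rec in records if key in rec]
        all_present = len(values) == len(records)
        # Only plain strings and integers qualify; bool is excluded explicitly
        # because it is a subclass of int.
        all_scalar = all(
            isinstance(v, (str, int)) and not isinstance(v, bool) for v in values
        )
        if all_present and all_scalar and len(set(values)) == len(values):
            candidates.append(key)
    return candidates


sample = [
    {"id": "NAV", "st": {"pos": [12, 44], "ok": 1}},
    {"id": "COM", "st": {"sig": 78, "ok": 1}},
    {"id": "PWR", "st": {"bat": 92, "ok": 1}},
]
print(suggest_code_keys(sample))   # ['id']
```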

refresh(): re-fetch live data into existing instances

await Launch.refresh(instance=launches)

The seed call's network context (params, headers, rec_path, conv_dict, ...) auto-replays.

export(): serialise to any format

await Launch.export(instance=launches, file_path="launches.parquet")

Supported formats: JSON, NDJSON, CSV, XML, SQLite, Parquet, Feather, ORC, Avro, XLSX.

→ Formats & compression

stream(): paginated bulk export, O(1) memory

For paginated APIs or local files too big for RAM, stream() fetches one page at a time, exports it, releases the page, and moves on, so peak memory stays at roughly one chunk regardless of dataset size. The Wave yielded per chunk is the built-in observability stream.

from incorporator.io.pagination import PageNumberPaginator

async for wave in Launch.stream(
    incorp_params={
        "inc_url": "https://api.example.com/launches",
        "inc_page": PageNumberPaginator(page_param="page"),
    },
    refresh_params=None,                                 # chunking: opt out of per-chunk refresh
    export_params={"file_path": "launches.ndjson", "if_exists": "append"},
):
    if wave.failed_sources: print(wave)

Use an append-friendly format (.ndjson / .csv / .sqlite / .avro). Parquet, Feather, ORC, and Excel rebuild the whole file every wave and aren't safe for per-tick streams. For live dashboards that keep a registry hot across many sources, reach for fjord() instead.
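The fetch, export, release loop and the reason append-friendly formats matter can be sketched with the standard library alone. `fetch_page` is a hypothetical stand-in for one HTTP request, and `drain_to_ndjson` is an illustrative name; neither is part of the library.

```python
import json
import os
import tempfile


def fetch_page(page, page_size=3, total=8):
    """Stand-in for one HTTP call; returns [] once the source is exhausted."""
    start = (page - 1) * page_size
    return [{"id": i} for i in range(start, min(start + page_size, total))]


def drain_to_ndjson(path, fetch=fetch_page):
    """Yield (page_number, rows_written) per chunk, holding one page at a time."""
    page = 1
    while True:
        rows = fetch(page)
        if not rows:
            return
        # Append mode: earlier chunks are never re-read or rewritten,
        # which is what keeps peak memory at roughly one page.
        with open(path, "a", encoding="utf-8") as fh:
            for row in rows:
                fh.write(json.dumps(row) + "\n")
        yield page, len(rows)
        page += 1


out_path = os.path.join(tempfile.mkdtemp(), "launches.ndjson")
chunks = list(drain_to_ndjson(out_path))
print(chunks)   # [(1, 3), (2, 3), (3, 2)]
```

A columnar format would need the whole file rebuilt on every chunk to stay valid, which is why the paragraph above steers per-chunk appends toward line-oriented or database targets.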

→ Streaming & pagination · Tutorial 8

fjord(): a multi-source data pipeline

Fans out across N concurrent sources, fuses them through a user-defined outflow(state), and exports the combined output.

async for wave in Incorporator.fjord(
    stream_params=[
        {"cls": Coin,  "incorp_params": {"inc_url": "..."}, "refresh_interval": 30},
        {"cls": Order, "incorp_params": {"inc_url": "..."}, "refresh_interval": 5},
    ],
    outflow="outflow.py",
    export_params={"file_path": "fusion.parquet"},
):
    if wave.failed_sources: print(wave)
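What an outflow function might contain is easiest to see with a toy fusion step. The shape of state is not specified in this README, so the sketch assumes a plain dict mapping a source name to a list of objects; the field names (symbol, price, qty) are also invented for illustration.

```python
from types import SimpleNamespace


def outflow(state):
    """Hypothetical fusion step: join each coin to its open orders by symbol."""
    orders_by_symbol = {}
    for order in state.get("Order", []):
        orders_by_symbol.setdefault(order.symbol, []).append(order)

    fused = []
    for coin in state.get("Coin", []):
        orders = orders_by_symbol.get(coin.symbol, [])
        fused.append({
            "symbol": coin.symbol,
            "price": coin.price,
            "open_orders": len(orders),
            "notional": sum(o.qty for o in orders) * coin.price,
        })
    return fused


# Assumed state layout: one list of records per source class name.
state = {
    "Coin": [SimpleNamespace(symbol="BTC", price=100.0),
             SimpleNamespace(symbol="ETH", price=10.0)],
    "Order": [SimpleNamespace(symbol="BTC", qty=2),
              SimpleNamespace(symbol="BTC", qty=3)],
}
fused = outflow(state)
print(fused[0]["notional"])   # 500.0
```

Keeping the fusion step a pure function of the current state makes it easy to unit-test separately from any network activity.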

→ Tutorial 10: Multi-Source Fjord

Tideweaver: orchestrate multiple feeds on independent intervals

When you need several sources at different cadences inside a single time window, with dependency edges gating downstream work until upstreams produce fresh data, build a Watershed and hand it to Tideweaver:

from incorporator import Tideweaver, Watershed, Stream, Fjord

watershed = Watershed.diamond(
    window=(start, end),
    head=Stream(name="binance", cls=BinanceBook, interval=15, incorp_params={...}),
    middle=[Stream(name="coinbase", cls=CoinbaseTicker, interval=30, incorp_params={...}),
            Stream(name="kraken",   cls=KrakenTicker,   interval=30, incorp_params={...})],
    tail=Fjord(name="best_market", cls=BestMarket, interval=30,
               export_params={"file_path": "arb_signals.ndjson"}),
    outflow="arb_outflow.py",
)
async for tide in Tideweaver(watershed).run():
    print(tide.tide_number, tide.fired, tide.skipped)

Four shape helpers (parallel, chain, fanout, diamond) cover the common graphs, plus custom for explicit edges. A declarative watershed.json config and the incorporator tideweaver run / validate CLI commands mirror the stream / fjord workflow.
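The idea of dependency edges gating downstream work on fresh upstream data can be simulated in a few lines. This is not Tideweaver's scheduler: the freshness rule (a node fires only if every upstream has fired since the node last fired) and the intervals used below are assumptions chosen so that the gating is visible.

```python
def simulate(nodes, ticks):
    """Simulate a gated graph.

    nodes: dict of name -> (interval_seconds, [upstream names]), listed
    upstream-first. Returns a list of (time, fired, skipped) tuples.
    """
    last_fired = {name: None for name in nodes}
    history = []
    for now in ticks:
        fired, skipped = [], []
        for name, (interval, upstreams) in nodes.items():
            if now % interval:
                continue  # not due on this tick
            fresh = all(
                last_fired[up] is not None
                and (last_fired[name] is None or last_fired[up] > last_fired[name])
                for up in upstreams
            )
            if fresh:
                last_fired[name] = now
                fired.append(name)
            else:
                skipped.append(name)  # due, but an upstream has nothing new
        history.append((now, fired, skipped))
    return history


# Diamond shape: one head, two middles, one tail (intervals are illustrative).
diamond = {
    "binance": (30, []),
    "coinbase": (15, ["binance"]),
    "kraken": (15, ["binance"]),
    "best_market": (15, ["coinbase", "kraken"]),
}
for row in simulate(diamond, ticks=[0, 15, 30]):
    print(row)
```

At the 15-second tick the head is not due, so both middle nodes and the tail are skipped rather than recomputed on stale data.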

→ Tutorial 11: Tideweaver

When to reach for which long-running verb

| Verb | Sources | Shape | Reach for it when… |
| --- | --- | --- | --- |
| stream() | one | paginated chunks, O(1) memory | bulk drain a paginated API or massive local file into a warehouse / archive |
| fjord() | many | stateful in-memory registry, live refresh | keep a hot multi-source object graph synchronised and snapshot it on a cadence |
| Tideweaver | many | windowed graph of streams + fjords with dependency edges | run several feeds at independent intervals inside a single time window, with downstream work gated on fresh upstream data |

display(): REPL debug print, e.g. launches[0].display()


🚀 From Code to Production: CLI & Docker

The CLI runs the same engines from declarative config. No Python required.

| Command | What it does |
| --- | --- |
| incorporator init --type {stream,fjord,tideweaver} | Scaffold a starter pipeline.json or watershed.json (+ outflow.py for fjord). |
| incorporator validate <config>.json | Structural check before you ship, with no network calls. Auto-detects type. |
| incorporator stream pipeline.json | Run a single-source stream pipeline. |
| incorporator fjord pipeline.json | Run a multi-source fjord pipeline. |
| incorporator tideweaver run watershed.json | Run a windowed orchestration graph. |

incorporator init --type stream --output-dir .
incorporator validate pipeline.json && incorporator stream pipeline.json   # ...or: docker compose up -d

Secrets stay out of config: ${API_KEY} for env vars, ${file:/run/secrets/api_key} for Docker / Kubernetes Secrets mounts. Set INCORPORATOR_SECRETS_ROOT=/run/secrets to sandbox ${file:...} against directory traversal.
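The two placeholder forms and the sandboxing idea can be sketched as follows. The function name `resolve_secrets`, its parameters, and its error behaviour are assumptions for illustration; the library's own resolver may differ in every detail.

```python
import os
import re
import tempfile
from pathlib import Path

_PLACEHOLDER = re.compile(r"\$\{(file:)?([^}]+)\}")


def resolve_secrets(text, env=None, secrets_root=None):
    """Expand ${VAR} from env and ${file:/path} from disk, inside an optional root."""
    env = os.environ if env is None else env
    root = Path(secrets_root).resolve() if secrets_root else None

    def substitute(match):
        is_file, name = match.group(1), match.group(2)
        if not is_file:
            return env[name]  # raises KeyError if the variable is unset
        path = Path(name).resolve()  # collapses ".." before the containment check
        # Path.is_relative_to requires Python 3.9 or newer.
        if root is not None and not path.is_relative_to(root):
            raise ValueError(f"{name} escapes secrets root {root}")
        return path.read_text(encoding="utf-8").strip()

    return _PLACEHOLDER.sub(substitute, text)


# Demo against a throwaway directory standing in for /run/secrets.
demo_root = Path(tempfile.mkdtemp())
(demo_root / "api_key").write_text("s3cr3t\n", encoding="utf-8")
config_line = '{"token": "${file:%s}", "user": "${API_USER}"}' % (demo_root / "api_key")
resolved = resolve_secrets(config_line, env={"API_USER": "ops"}, secrets_root=demo_root)
print(resolved)
```

Resolving the path before checking containment is the step that defeats `../` sequences; checking the raw string instead would not.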

→ CLI reference · Deployment & secrets


🛠 Resilience & Batteries Included

  • GIL-free hyperthreading via the [speedups] extra. → Installation
  • Invisible decompression for .gz, .bz2, .lzma, .zip, .tar. ZIP/TAR paths are validated against directory traversal, with a 1 GB decompression-bomb cap. → Formats
  • Connection pooling + retries + DLQ: HTTP/2-multiplexed httpx.AsyncClient, Tenacity backoff, and failed URLs surfaced on wave.failed_sources. Opt-in block_internal_redirects=True rejects 3xx Locations that point to RFC1918 / loopback / cloud-metadata IPs.
  • Atomic writes + spreadsheet-injection guard: Parquet / Feather / ORC / JSON / XML / XLSX are built via tempfile + os.replace() (no half-written files); CSV / XLSX cells starting with = / @ / + / - are quoted on export (OWASP).
  • Non-blocking observability: subclass LoggedIncorporator; logs flow through a QueueHandler so disk I/O never blocks the event loop.
  • Cross-format round-tripping: JSON ↔ Parquet ↔ SQLite ↔ Avro ↔ CSV ↔ XML. → Tutorial 3
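Two of the guards listed above, the tempfile + os.replace() write and the formula-prefix check, can be illustrated with the standard library. This is a sketch under stated assumptions: prefixing a single quote is one commonly recommended mitigation and may not be the exact quoting the library applies, and `guard_cell` / `atomic_write_json` are hypothetical names.

```python
import json
import os
import tempfile

RISKY_PREFIXES = ("=", "@", "+", "-")


def guard_cell(value):
    """Neutralise string cells that a spreadsheet would evaluate as a formula."""
    if isinstance(value, str) and value.startswith(RISKY_PREFIXES):
        return "'" + value
    return value


def atomic_write_json(path, payload):
    """Write to a temp file in the target directory, then rename over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh)
        # Readers see either the old file or the complete new one,
        # never a partially written file.
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "launches.json")
atomic_write_json(demo_path, [{"name": guard_cell("=SUM(A1:A9)")}])
print(open(demo_path, encoding="utf-8").read())
```

Creating the temp file in the same directory as the target matters because os.replace() is only atomic when both paths are on the same filesystem.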

📚 Tutorials (in order)

The curriculum has eleven tutorials. Each one introduces one new verb or technique, alternating CoinGecko-heavy steps with non-CoinGecko examples so that per-minute rate-limit windows refresh between CoinGecko calls and each Incorporator pattern appears across several real-world verticals. Runnable code lives under /examples.

  1. 🌱 First Steps + DX Inspector: discovery-first flow. test() profiles a CoinGecko endpoint, then incorp() applies its recommendations.
  2. Data Lake Pivot: SaaS roster → BI-ready columnar data; pivot a /users endpoint into Avro + SQLite.
  3. 📦 Snapshot Warehouse, Universal Formats: fan CoinGecko top-100 snapshots into NDJSON / CSV / SQLite / Parquet, then round-trip every artifact.
  4. 🛡️ XML Post Audit: federal-VIN fraud audit. An XML invoice ledger is enriched via one batched POST.
  5. 🚀 Parent → Child Drilling: CoinGecko /coins/markets → /coins/{id} fan-out, the canonical backtest-data-prep pattern.
  6. 🚀 SpaceX Launches: ops-dashboard feed. Upcoming launches are drilled for rocket + launchpad detail.
  7. 🔄 Stateful Refresh: refresh() three ways against Binance's live ticker.
  8. 🌊 Streaming Daemon, Paginated Bulk Export at O(1) Memory: stream()'s canonical job is a chunking-mode drain of a paginated source. Also covers a single-source stateful_polling=True compatibility shim, with a pointer to Tutorial 10 for the multi-source live-daemon path.
  9. 🏁 NASCAR Fantasy Fjord: fantasy-sports scoring fjord across the Cup, Xfinity, and Truck series; previews Tutorial 10's abstraction.
  10. 🌊 Multi-Source Fjord: fjord() fusing CoinGecko + Binance into a live cross-venue spread metric.
  11. 🧵 Tideweaver, Multi-Exchange Arb Scanner (capstone): declarative windowed orchestration, three exchanges → one best-market record.

📑 Reference

📎 Appendices: optional side-quests


🤝 Philosophy & Contributing

See CONTRIBUTING.md for the dev install, quality bar, and architecture conventions. Security disclosures: SECURITY.md. Release notes: CHANGELOG.md.

