Schema-free ETL mapper with in-process graph orchestration: turn any REST API, CSV, Parquet, or XML into typed Python objects, then coordinate multi-source pipelines in a bounded time window with Tideweaver.

🚀 Incorporator

A schema-free data mapper that turns JSON, XML, or CSV into a unified Python object graph with runtime dot-notation access, plus an in-process orchestrator (Tideweaver) for multi-source pipelines.

PyPI version · Python versions · Downloads · CI · mypy: strict · Code style: black · Linter: ruff · Pydantic v2 · HTTPX · License: MIT · GitHub stars

✨ Highlights

  • Works with unpredictable JSON APIs, and digests XML, CSV, NDJSON, SQLite, and Parquet without a line of schema. You get native Python objects immediately, with no manual model definitions.
  • Handles changing structures at runtime: missing keys and mutating types are absorbed without validation errors.
  • Pydantic + HTTPX under the hood: no data classes, connection poolers, or pagination loops to write.

🛠️ How it Works: Zero-Schema Ingestion

Imagine this telemetry JSON. The nested "st" dictionary changes structure for every subsystem (pos vs. sig vs. bat vs. lvl). A parser bound to one fixed schema would fail validation on it.

[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]

Feed it the unpredictable JSON. Incorporator unifies the changing structures into a single object graph:

import asyncio
from incorporator import Incorporator

class System(Incorporator): pass     # Subclass; everything else hangs off it.

async def main():
    systems = await System.incorp(inc_file="telemetry.json", inc_code="id")
    print(systems.inc_dict["NAV"].st.pos)   # [12, 44]
    print(systems.inc_dict["PWR"].st.bat)   # 92

    thr = systems.inc_dict["THR"]
    if not thr.st.ok:
        print(f"⚠️ THRUST FAILURE! Efficiency dropped to {thr.st.lvl}")

asyncio.run(main())

The format is inferred from the URL or file extension. The syntax never changes for XML, CSV, NDJSON, SQLite, Parquet, Feather, ORC, Avro, or XLSX: the same incorp() / export() surface applies.
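To make the idea of dot-notation access over irregular records concrete, here is a minimal standard-library sketch. It is not how Incorporator is implemented (the library builds on Pydantic); the helper name load_by_code is hypothetical and only mirrors the role of inc_code in the example above.

```python
import json
from types import SimpleNamespace

RAW = """
[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]
"""


def load_by_code(text: str, code: str) -> dict:
    """Parse JSON into attribute-style objects and index them by `code`.

    Hypothetical helper for illustration only; not part of Incorporator.
    """
    # object_hook runs on every decoded JSON object, innermost first,
    # so nested dictionaries become namespaces as well.
    records = json.loads(text, object_hook=lambda d: SimpleNamespace(**d))
    return {getattr(rec, code): rec for rec in records}


systems = load_by_code(RAW, "id")
print(systems["NAV"].st.pos)                     # [12, 44]
print(systems["PWR"].st.bat)                     # 92
print(getattr(systems["COM"].st, "bat", None))   # None: COM has no "bat" key
```

No record shape is declared up front, so each object carries only the keys its payload had; a missing key shows up as a missing attribute rather than as a parse failure.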


📦 Installation

Built on Pydantic V2 metaprogramming, HTTPX, and Tenacity. No system dependencies.

pip install incorporator                  # core
pip install incorporator[speedups]        # orjson + lxml + cramjam
pip install incorporator[parquet]         # pyarrow: Parquet, Feather, ORC
pip install incorporator[avro]            # fastavro
pip install incorporator[orchestrate]     # typer + prefect: CLI + Prefect wrappers
pip install incorporator[all]             # everything except [parquet]

🧰 The Verbs + One Orchestrator

Every method you'll call on an Incorporator subclass, plus the windowed orchestrator.

incorp(): fetch, parse, build the object graph

class Launch(Incorporator): pass

launches = await Launch.incorp(inc_url="https://ll.thespacedevs.com/2.2.0/launch/upcoming/")
print(launches[0].name)

→ Tutorial 1: First Steps

test(): let the framework write your incorp() kwargs

await Launch.test(inc_url="https://api.unknown.com/v1/users")
# Prints payload tree + suggested inc_code, rec_path, conv_dict.

refresh(): re-fetch live data into existing instances

await Launch.refresh(instance=launches)

The seed call's network context (params, headers, rec_path, conv_dict, ...) auto-replays.

export(): serialise to any format

await Launch.export(instance=launches, file_path="launches.parquet")

Supported formats: JSON, NDJSON, CSV, XML, SQLite, Parquet, Feather, ORC, Avro, XLSX. → Formats & compression

stream(): paginated bulk export, O(1) memory

For paginated APIs or local files too big for RAM, stream() fetches one page at a time, exports it, releases the page, and moves on, so peak memory stays at roughly one chunk regardless of dataset size. It yields one Wave per chunk as its built-in observability stream.

from incorporator.io.pagination import PageNumberPaginator

async for wave in Launch.stream(
    incorp_params={
        "inc_url": "https://api.example.com/launches",
        "inc_page": PageNumberPaginator(page_param="page"),
    },
    refresh_params=None,                                 # chunking: opt out of per-chunk refresh
    export_params={"file_path": "launches.ndjson", "if_exists": "append"},
):
    if wave.failed_sources: print(wave)

Use an append-friendly format (.ndjson / .csv / .sqlite / .avro): Parquet, Feather, ORC, and Excel rebuild the whole file every wave and aren't safe for per-tick streams. For live dashboards that keep a registry hot across many sources, reach for fjord() instead.

→ Streaming & pagination · Tutorial 8
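The fetch, export, release cycle described for stream() can be pictured with a plain-Python sketch. This is an illustration of the chunking pattern under stated assumptions, not Incorporator's code: fetch_pages is a hypothetical stand-in for a paginated API, and drain_to_ndjson is a hypothetical helper.

```python
import json
import os
import tempfile
from typing import Dict, Iterator, List


def fetch_pages(total: int, page_size: int) -> Iterator[List[Dict]]:
    """Hypothetical stand-in for a paginated API: yields one page at a time."""
    for start in range(0, total, page_size):
        yield [{"id": i} for i in range(start, min(start + page_size, total))]


def drain_to_ndjson(pages: Iterator[List[Dict]], path: str) -> int:
    """Append each page to `path`; only one page is held in memory at once."""
    chunks = 0
    for page in pages:
        # Append mode is what makes NDJSON safe for per-chunk writes:
        # earlier chunks are never rewritten.
        with open(path, "a", encoding="utf-8") as fh:
            for row in page:
                fh.write(json.dumps(row) + "\n")
        chunks += 1
    return chunks


out_path = os.path.join(tempfile.mkdtemp(), "launches.ndjson")
chunks_written = drain_to_ndjson(fetch_pages(total=10, page_size=4), out_path)
with open(out_path, encoding="utf-8") as fh:
    line_count = sum(1 for _ in fh)
print(chunks_written, line_count)   # 3 10
```

Because the generator hands over one page at a time and each page goes out of scope after it is written, peak memory tracks the page size rather than the dataset size.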

fjord(): a multi-source data pipeline

Fans out across N concurrent sources, fuses them through a user-defined outflow(state), and exports the combined output.

async for wave in Incorporator.fjord(
    stream_params=[
        {"cls": Coin,  "incorp_params": {"inc_url": "..."}, "refresh_interval": 30},
        {"cls": Order, "incorp_params": {"inc_url": "..."}, "refresh_interval": 5},
    ],
    outflow="outflow.py",
    export_params={"file_path": "fusion.parquet"},
):
    if wave.failed_sources: print(wave)

→ Tutorial 10: Multi-Source Fjord

Tideweaver: orchestrate multiple feeds on independent intervals

When you need several sources at different cadences inside a single time window, with dependency edges gating downstream work until upstreams produce fresh data, build a Watershed and hand it to Tideweaver:

from incorporator import Tideweaver, Watershed, Stream, Fjord

watershed = Watershed.diamond(
    window=(start, end),
    head=Stream(name="binance", cls=BinanceBook, interval=15, incorp_params={...}),
    middle=[Stream(name="coinbase", cls=CoinbaseTicker, interval=30, incorp_params={...}),
            Stream(name="kraken",   cls=KrakenTicker,   interval=30, incorp_params={...})],
    tail=Fjord(name="best_market", cls=BestMarket, interval=30,
               export_params={"file_path": "arb_signals.ndjson"}),
    outflow="arb_outflow.py",
)
async for tide in Tideweaver(watershed).run():
    print(tide.tide_number, tide.fired, tide.skipped)

There are four shape helpers (parallel, chain, fanout, diamond), plus custom for explicit edges. A declarative watershed.json config and the incorporator tideweaver run / validate CLI commands mirror the stream / fjord workflow.
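As a rough mental model of "independent intervals plus freshness-gated edges", the sketch below simulates a diamond graph tick by tick. It is a toy scheduler written for this explanation, not Tideweaver's algorithm: the intervals are hypothetical (kraken is deliberately slowed to 45 s so that a skip occurs), and the rule "fire only if every upstream has fired since my last fire" is an assumption about what gating on fresh upstream data means.

```python
from functools import reduce
from math import gcd

# Hypothetical cadences in seconds; not the values from the example above.
INTERVALS = {"binance": 15, "coinbase": 30, "kraken": 45, "best_market": 30}

# Diamond edges: each node maps to the upstream nodes it waits on.
UPSTREAMS = {
    "binance": [],
    "coinbase": ["binance"],
    "kraken": ["binance"],
    "best_market": ["coinbase", "kraken"],
}
ORDER = ["binance", "coinbase", "kraken", "best_market"]  # topological order


def simulate(window_start: int, window_end: int):
    """Return (time, fired, skipped) for every tick inside the window."""
    step = reduce(gcd, INTERVALS.values())
    last_fired = {name: None for name in ORDER}
    tides = []
    for t in range(window_start, window_end, step):
        fired, skipped = [], []
        for name in ORDER:
            if (t - window_start) % INTERVALS[name] != 0:
                continue  # this node is not due on this tick
            prev = last_fired[name]
            # Fresh means: every upstream fired at least once, and more
            # recently than this node's own previous fire.
            fresh = all(
                last_fired[up] is not None
                and (prev is None or last_fired[up] > prev)
                for up in UPSTREAMS[name]
            )
            if fresh:
                last_fired[name] = t
                fired.append(name)
            else:
                skipped.append(name)
        tides.append((t, fired, skipped))
    return tides


tides = simulate(0, 90)
for tide in tides:
    print(tide)
```

In this run best_market is due at t=30 but is skipped because kraken has produced nothing new since t=0; it fires again at t=60 once both middle nodes are fresh.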

Probe → plan → run, no disk round-trip. When you have N unknown endpoints, architect() profiles each one and emits a runnable plan you can tune in-memory before handing it to Tideweaver:

plan = await Coin.architect(
    sources={"binance": book_url, "coinbase": cb_url, "kraken": kr_url},
    output="plan",
)
plan.currents[0].interval_hint = 10                  # tune
watershed = plan.to_watershed(window=(start, end))   # materialise
async for tide in Tideweaver(watershed).run():
    ...

architect(output=...) also emits "report" (pretty-printed), "python" (paste-ready module), or "json" (paste-ready watershed.json).

Per-edge flow control. Each edge composes six orthogonal primitives: Gate (HardLock / SoftPass / Weir), SurgeBarrier, Penstock (Sustained / Burst / Window / Backpressure / Signal), Reservoir, Spillway (DropOldest / RaiseOverflow / ExportToArchive), and a declarative FlowObserver (Null / Logging / Signal) for telemetry. The shape constructors accept a top-level flow= or gate_mode= shorthand; explicit Edge(...) carries its own:

from incorporator.observability.tideweaver import (
    FlowControl, Weir, BurstPenstock, Reservoir, LoggingObserver,
)

edge_flow = FlowControl(
    gate=Weir(),                                          # fire on freshness
    penstock=BurstPenstock(rate_per_sec=2.0, burst=5),    # token bucket
    reservoir=Reservoir(depth=3),                         # buffer 3 waves
    observer=LoggingObserver(fire_level="info"),          # declarative telemetry
)

The same shapes deserialize from watershed.json via Pydantic discriminated unions ({"gate": {"type": "weir"}, ...}); see Appendix: NASCAR Tideweaver for the JSON form on a working diamond. For non-verb tick logic (cron-style cleanups, custom side-effects), subclass CustomCurrent and override async tick(scheduler).

→ Tutorial 11: Tideweaver · Canal toolkit primitives in API Atlas
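The BurstPenstock comment above mentions a token bucket. For readers unfamiliar with the technique, here is a generic token-bucket sketch using the same two numbers (2 tokens per second, burst of 5). The TokenBucket and FakeClock classes are hypothetical and written for this note; they make no claim about how BurstPenstock is implemented internally.

```python
import time


class TokenBucket:
    """Generic token bucket: refills at `rate_per_sec`, holds at most `burst`."""

    def __init__(self, rate_per_sec: float, burst: int, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = float(burst)
        self.clock = clock
        self.tokens = float(burst)      # start full so an initial burst passes
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at the bucket capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class FakeClock:
    """Manually advanced clock so the demo is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
bucket = TokenBucket(rate_per_sec=2.0, burst=5, clock=clock)

burst_results = [bucket.try_acquire() for _ in range(6)]
clock.now += 1.0                        # one second later: 2 tokens refilled
refill_results = [bucket.try_acquire() for _ in range(3)]

print(burst_results)    # [True, True, True, True, True, False]
print(refill_results)   # [True, True, False]
```

The burst parameter bounds how many events can pass back to back, while the rate bounds the long-run average; that pairing is why a token bucket suits edges that see occasional spikes.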

When to reach for which long-running verb

Verb | Sources | Shape | Reach for it when…
stream() | one | paginated chunks, O(1) memory | bulk drain a paginated API or massive local file into a warehouse / archive
fjord() | many | stateful in-memory registry, live refresh | keep a hot multi-source object graph synchronised and snapshot it on a cadence
Tideweaver | many | windowed graph of streams + fjords with dependency edges | run several feeds at independent intervals inside a single time window, with downstream work gated on fresh upstream data

display(): REPL debug print, e.g. launches[0].display()


🚀 From Code to Production: CLI & Docker

The CLI runs the same engines from declarative config. No Python required.

Command | What it does
incorporator init --type {stream,fjord,tideweaver} | Scaffold a starter pipeline.json or watershed.json (+ outflow.py for fjord).
incorporator validate <config>.json | Structural check before you ship, with no network calls. Auto-detects type.
incorporator stream pipeline.json | Run a single-source stream pipeline.
incorporator fjord pipeline.json | Run a multi-source fjord pipeline.
incorporator tideweaver run watershed.json | Run a windowed orchestration graph.

incorporator init --type stream --output-dir .
incorporator validate pipeline.json && incorporator stream pipeline.json   # ...or: docker compose up -d

Secrets stay out of config: use ${API_KEY} for env vars and ${file:/run/secrets/api_key} for Docker / Kubernetes Secrets mounts. Set INCORPORATOR_SECRETS_ROOT=/run/secrets to sandbox ${file:...} against directory traversal.

→ CLI reference · Deployment & secrets
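The placeholder scheme above (${NAME} for environment variables, ${file:PATH} for mounted secrets, plus a sandbox root) can be sketched in a few lines. This is an illustrative reading of that behaviour, not Incorporator's resolver: resolve_secrets, its secrets_root parameter, and the DEMO_API_KEY variable are all hypothetical names.

```python
import os
import re
import tempfile
from pathlib import Path
from typing import Optional

_PLACEHOLDER = re.compile(r"\$\{(file:)?([^}]+)\}")


def resolve_secrets(text: str, secrets_root: Optional[str] = None) -> str:
    """Expand ${NAME} from the environment and ${file:PATH} from disk.

    Hypothetical helper; when `secrets_root` is given, file paths that
    resolve outside that directory are rejected.
    """

    def replace(match: "re.Match") -> str:
        is_file, value = match.group(1), match.group(2)
        if not is_file:
            return os.environ[value]            # KeyError if the variable is unset
        path = Path(value).resolve()            # collapses ".." before the check
        if secrets_root is not None:
            root = Path(secrets_root).resolve()
            if root != path and root not in path.parents:
                raise PermissionError(f"{value} is outside {secrets_root}")
        return path.read_text(encoding="utf-8").strip()

    return _PLACEHOLDER.sub(replace, text)


# Demo with a throwaway secrets directory.
root = tempfile.mkdtemp()
Path(root, "api_key").write_text("s3cr3t\n", encoding="utf-8")
os.environ["DEMO_API_KEY"] = "abc123"

config = '{"token": "${DEMO_API_KEY}", "key": "${file:%s/api_key}"}' % root
resolved = resolve_secrets(config, secrets_root=root)
print(resolved == '{"token": "abc123", "key": "s3cr3t"}')   # True
```

Resolving the path before comparing it to the root is the step that defeats traversal: a value such as ${file:/run/secrets/../etc/passwd} normalises to a location outside the sandbox and is refused.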


🛠 Resilience & Batteries Included

  • GIL-free hyperthreading via the [speedups] extra. → Installation

  • Invisible decompression for .gz, .bz2, .lzma, .zip, .tar. ZIP/TAR paths are validated against directory traversal, with a 1 GB decompression-bomb cap. → Formats

  • Connection pooling + retries + structured rejects: HTTP/2-multiplexed httpx.AsyncClient, Tenacity backoff, and IncorporatorList.rejects: List[RejectEntry] carrying source / error_kind / message / retry_after / wave_index for every failed source. The legacy flat failed_sources: List[str] is preserved as a derived view. Opt-in block_internal_redirects=True rejects 3xx Locations to RFC1918 / loopback / cloud-metadata IPs.

  • Friendly rate limiting: the framework ships with no implicit per-host throttling. Opt in once at startup with register_host_penstock (one source of truth across every incorp() call) or pass requests_per_second=X per call. The same Penstock primitive serves both the HTTP layer and Tideweaver edges:

    from incorporator import register_host_penstock
    from incorporator.io.penstock import SustainedPenstock
    
    register_host_penstock("api.coingecko.com", SustainedPenstock(rate_per_sec=0.2))   # ~12 r/min
    register_host_penstock("pokeapi.co",        SustainedPenstock(rate_per_sec=1.5))   # ~90 r/min
    register_host_penstock("vpic.nhtsa.dot.gov", SustainedPenstock(rate_per_sec=1.5))
    

    architect() surfaces 429 / Retry-After hints during probing and recommends a Penstock on the matching edge. See register_host_penstock in the API Atlas.

  • Lambda-free conv_dict: inc, calc, calc_all, pluck, link_to, link_to_list, split_and_get all short-circuit silently on garbage input (None, "", "N/A", "null", "unknown", "nan", "undefined") before the user callable runs. Defensive null guards inside lambdas are no longer needed; use stdlib callables directly:

    conv_dict = {
        "id":     inc(int),                                              # type coerce
        "title":  calc(str.lower, "title", default="", target_type=str), # transform
        "status": calc("Alive".__eq__, "status", default=False),         # enum-to-bool
    }
    

    See incorporator.io.SourceRef for the opt-in typed source value (URL / file / parent / payload / kwargs) when you need explicit source dispatch.

  • Atomic writes + spreadsheet-injection guard: Parquet / Feather / ORC / JSON / XML / XLSX are built via tempfile + os.replace() (no half-written files); CSV / XLSX cells starting with = / @ / + / - are quoted on export (OWASP).

  • Non-blocking observability: subclass LoggedIncorporator; logs flow through a QueueHandler so disk I/O never blocks the event loop.

  • Cross-format round-tripping: JSON ↔ Parquet ↔ SQLite ↔ Avro ↔ CSV ↔ XML. → Tutorial 3
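Two of the bullets above, atomic writes via tempfile + os.replace() and the spreadsheet-injection guard, are general techniques worth seeing in isolation. The sketch below uses only the standard library; atomic_write_json and guard_cell are hypothetical names, and prefixing an apostrophe is one commonly recommended way to neutralise a formula cell, assumed here rather than taken from Incorporator's source.

```python
import json
import os
import tempfile

FORMULA_PREFIXES = ("=", "@", "+", "-")


def guard_cell(value):
    """Neutralise a string cell that a spreadsheet would evaluate as a formula."""
    if isinstance(value, str) and value.startswith(FORMULA_PREFIXES):
        return "'" + value      # assumption: apostrophe prefix forces plain text
    return value


def atomic_write_json(path: str, payload) -> None:
    """Write JSON so readers see either the old file or the complete new one."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the target directory so os.replace() stays
    # on one filesystem, where the rename is atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the partial temp file; the original target is untouched.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


target_dir = tempfile.mkdtemp()
target = os.path.join(target_dir, "launches.json")
rows = [{"name": guard_cell("=SUM(A1:A9)")}, {"name": guard_cell("Falcon 9")}]
atomic_write_json(target, rows)

with open(target, encoding="utf-8") as fh:
    written = json.load(fh)
print(written)
```

A crash before os.replace() leaves at most a stray .tmp file, never a truncated launches.json. Note that this simple guard also prefixes legitimate strings such as "-5", a trade-off any prefix-based rule has to accept or refine.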


📚 Tutorials (in order)

The eleven-tutorial curriculum. Each slot introduces one new verb or technique, alternating CoinGecko-heavy steps with non-CG domain examples so per-minute rate-limit windows refresh between CG calls and each Incorporator pattern lands across multiple real-world verticals. Runnable code under /examples.

  1. 🌱 First Steps + DX Inspector - discovery-first flow: test() profiles a CoinGecko endpoint, then incorp() applies its recommendations.
  2. Data Lake Pivot - SaaS roster → BI-ready columnar; pivot a /users endpoint into Avro + SQLite.
  3. 📦 Snapshot Warehouse: Universal Formats - fan CoinGecko top-100 snapshots into NDJSON / CSV / SQLite / Parquet, then round-trip every artifact.
  4. 🛡️ XML Post Audit - federal-VIN fraud audit: XML invoice ledger enriched via one batched POST.
  5. 🚀 Parent → Child Drilling - CoinGecko /coins/markets → /coins/{id} fan-out, the canonical backtest-data-prep pattern.
  6. 🚀 SpaceX Launches - ops-dashboard feed: upcoming launches drilled for rocket + launchpad detail.
  7. 🔄 Stateful Refresh - refresh() three ways against Binance's live ticker.
  8. 🌊 Streaming Daemon: Paginated Bulk Export at O(1) Memory - stream()'s canonical job: chunking-mode drain of a paginated source. Plus a single-source stateful_polling=True compatibility shim with a pointer to T10 for the multi-source live-daemon path.
  9. 🏁 NASCAR Fantasy Fjord - fantasy-sports scoring fjord across Cup, Xfinity, Truck series; previews T10's abstraction.
  10. 🌊 Multi-Source Fjord - fjord() fusing CoinGecko + Binance into a live cross-venue spread metric.
  11. 🧵 Tideweaver: Multi-Exchange Arb Scanner (capstone) - declarative windowed orchestration: three exchanges → one best-market record.

📑 Reference

📎 Appendices: optional side-quests


🤝 Philosophy & Contributing

See CONTRIBUTING.md for the dev install, quality bar, and architecture conventions. Security disclosures: SECURITY.md. Release notes: CHANGELOG.md.

