
Schema-free ETL mapper with in-process graph orchestration: turn any REST API, CSV, Parquet, or XML into typed Python objects, then coordinate multi-source pipelines in a bounded time window with Tideweaver.



🚀 Incorporator

Schema-free ingestion for APIs you don't control, and an orchestrator that tells you how to tune it.

Both halves share the same primitives (Penstock throttling at the HTTP and edge layers; Wave / Tide / RejectEntry outcome records; FlowControl), so the parser and the orchestrator compose, but you can adopt either without the other. Local paginators (SQLitePaginator, CSVPaginator, AvroPaginator) accept the same penstock= kwarg as the HTTP layer, giving one rate-limit primitive across both.

PyPI version · Python versions · Downloads · CI · mypy: strict · Code style: black · Linter: ruff · Pydantic v2 · HTTPX · License: MIT · GitHub stars

✨ Highlights

  • Works with unpredictable JSON APIs: digests XML, CSV, NDJSON, SQLite, Parquet, and Avro without a line of schema; missing keys and mutating types are absorbed without validation errors.
  • The pipeline tells you what to tune: after a Tideweaver run, architect.tune() consumes the accumulated rejects, tides, and waves and emits a TuningReport of severity-sorted hints.
  • Disk-backed observability for orchestration: LoggedTideweaver routes every Tide and RejectEntry through a QueueHandler pipeline, replayable with get_tides() / get_rejects().

๐Ÿ› ๏ธ How it Works: Zero-Schema Ingestion

Consider this telemetry JSON. The nested "st" dictionary changes shape for every subsystem (pos vs sig vs bat vs lvl), so a parser bound to one fixed schema would fail on it.

[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]

Feed it the unpredictable JSON. Incorporator unifies the changing structures into a single object graph:

import asyncio
from incorporator import Incorporator

class System(Incorporator): pass     # Subclass; everything else hangs off it.

async def main():
    systems = await System.incorp(inc_file="telemetry.json", inc_code="id")
    print(systems.inc_dict["NAV"].st.pos)   # [12, 44]
    print(systems.inc_dict["PWR"].st.bat)   # 92

    thr = systems.inc_dict["THR"]
    if not thr.st.ok:
        print(f"⚠️ THRUST FAILURE! Efficiency dropped to {thr.st.lvl}")

asyncio.run(main())

The format is inferred from the URL or file extension. The syntax never changes for XML, CSV, NDJSON, SQLite, Parquet, Feather, ORC, Avro, or XLSX: the same incorp() / export() surface applies to all of them.


📦 Installation

Built on Pydantic V2 metaprogramming, HTTPX, and Tenacity. No system dependencies. Requires Python 3.10+. CI runs against 3.10 / 3.11 / 3.13 on Ubuntu and Windows.

pip install incorporator                    # core
pip install "incorporator[speedups]"        # orjson + lxml + cramjam
pip install "incorporator[parquet]"         # pyarrow: Parquet, Feather, ORC
pip install "incorporator[avro]"            # fastavro
pip install "incorporator[xlsx]"            # openpyxl: XLSX read/write
pip install "incorporator[orchestrate]"     # typer + prefect: CLI + Prefect wrappers
pip install "incorporator[all]"             # everything except [parquet]

🧰 The Verbs + One Orchestrator

Every method you'll call on an Incorporator subclass, plus the windowed orchestrator.

incorp(): fetch, parse, build the object graph

class Launch(Incorporator): pass

launches = await Launch.incorp(inc_url="https://ll.thespacedevs.com/2.2.0/launch/upcoming/")
print(launches[0].name)

→ Tutorial 1: First Steps

test(): let the framework write your incorp() kwargs

await Launch.test(inc_url="https://api.unknown.com/v1/users")
# Prints payload tree + suggested inc_code, rec_path, conv_dict.

refresh(): re-fetch live data into existing instances

await Launch.refresh(instance=launches)

The seed call's network context (params, headers, rec_path, conv_dict, ...) auto-replays.

export(): serialise to any format

await Launch.export(instance=launches, file_path="launches.parquet")

Supported formats: JSON, NDJSON, CSV, XML, SQLite, Parquet, Feather, ORC, Avro, XLSX. → Formats & compression

stream(): paginated bulk export, O(1) memory

For paginated APIs or local files too big for RAM, stream() fetches one page at a time, exports it, releases the page, and moves on, so peak memory stays at roughly one chunk regardless of dataset size. One Wave per chunk is the built-in observability stream.

from incorporator.io.pagination import PageNumberPaginator

async for wave in Launch.stream(
    incorp_params={
        "inc_url": "https://api.example.com/launches",
        "inc_page": PageNumberPaginator(page_param="page"),
    },
    refresh_params=None,                                 # chunking: opt out of per-chunk refresh
    export_params={"file_path": "launches.ndjson", "if_exists": "append"},
):
    if wave.failed_sources: print(wave)

Use an append-friendly format (.ndjson / .csv / .sqlite / .avro); Parquet, Feather, ORC, and Excel rebuild the whole file on every wave. For multi-source live registries, reach for fjord(). Pass adapt_chunk_size=True to resize paginator.chunk_size via AIMD (additive increase, multiplicative decrease), bounded by chunk_size_min / chunk_size_max and steered toward a target latency window of target_min_sec / target_max_sec.
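To make the AIMD behaviour concrete, here is a minimal stdlib sketch of one controller step. It assumes the controller sees one fetch latency per chunk; the class name AimdChunkSizer, the +50 additive step, and the 0.5 multiplicative factor are illustrative assumptions, not Incorporator's actual values or implementation. Only the bound and window names mirror the kwargs above.

```python
from dataclasses import dataclass


@dataclass
class AimdChunkSizer:
    """Hypothetical AIMD chunk-size controller (illustration only)."""

    chunk_size: int
    chunk_size_min: int
    chunk_size_max: int
    target_min_sec: float
    target_max_sec: float
    increase_step: int = 50       # additive increase per fast chunk (assumed)
    decrease_factor: float = 0.5  # multiplicative decrease per slow chunk (assumed)

    def observe(self, latency_sec: float) -> int:
        """Update chunk_size from one chunk's latency and return the new size."""
        if latency_sec > self.target_max_sec:
            # Slower than the window: back off multiplicatively.
            proposed = int(self.chunk_size * self.decrease_factor)
        elif latency_sec < self.target_min_sec:
            # Faster than the window: probe upward additively.
            proposed = self.chunk_size + self.increase_step
        else:
            # Inside the window: hold steady.
            proposed = self.chunk_size
        # Clamp to [chunk_size_min, chunk_size_max].
        self.chunk_size = max(self.chunk_size_min, min(self.chunk_size_max, proposed))
        return self.chunk_size


sizer = AimdChunkSizer(chunk_size=200, chunk_size_min=50, chunk_size_max=1000,
                       target_min_sec=0.5, target_max_sec=2.0)
print([sizer.observe(t) for t in (0.2, 0.3, 3.1, 1.0)])  # [250, 300, 150, 150]
```

The asymmetry is the point of AIMD: growth is cautious and linear, while a single slow chunk cuts the size sharply, so the controller converges below the latency ceiling rather than oscillating around it.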

→ Streaming & pagination · Tutorial 8

fjord(): a multi-source data pipeline

Fans out across N concurrent sources, fuses them through a user-defined outflow(state), and exports the combined output.

async for wave in Incorporator.fjord(
    stream_params=[
        {"cls": Coin,  "incorp_params": {"inc_url": "..."}, "refresh_interval": 30},
        {"cls": Order, "incorp_params": {"inc_url": "..."}, "refresh_interval": 5},
    ],
    outflow="outflow.py",
    export_params={"file_path": "fusion.parquet"},
):
    if wave.failed_sources: print(wave)

→ Tutorial 10: Multi-Source Fjord

Tideweaver: orchestrate multiple feeds on independent intervals

When you need several sources at different cadences inside a single time window, with dependency edges gating downstream work until upstreams produce fresh data, build a Watershed and hand it to Tideweaver:

from incorporator import Tideweaver, Watershed, Stream, Fjord

watershed = Watershed.diamond(
    window=(start, end),
    head=Stream(name="binance", cls=BinanceBook, interval=15, incorp_params={...}),
    middle=[Stream(name="coinbase", cls=CoinbaseTicker, interval=30, incorp_params={...}),
            Stream(name="kraken",   cls=KrakenTicker,   interval=30, incorp_params={...})],
    tail=Fjord(name="best_market", cls=BestMarket, interval=30,
               export_params={"file_path": "arb_signals.ndjson"}),
    outflow="outflow.py",
)
async for tide in Tideweaver(watershed).run():
    print(tide.tide_number, tide.fired, tide.skipped)

Four shape helpers (parallel, chain, fanout, diamond) cover the common topologies, plus custom with explicit edges. A declarative watershed.json config and the incorporator tideweaver run / validate CLI mirror the stream / fjord workflow. After the run, Tideweaver.summary(tides=tides) returns a TuningReport (see Resilience).

Probe → plan → run, no disk round-trip. When you have N unknown endpoints, architect() profiles each one and emits a runnable plan you can tune in memory before handing it to Tideweaver:

plan = await Coin.architect(
    sources={"binance": book_url, "coinbase": cb_url, "kraken": kr_url},
    output="plan",
)
plan.currents[0].interval_hint = 10                  # tune
watershed = plan.to_watershed(window=(start, end))   # materialise
async for tide in Tideweaver(watershed).run():
    ...

architect(output=...) also emits "report" (pretty-printed), "python" (a paste-ready module), or "json" (a paste-ready watershed.json). After a run, architect.tune() consumes the accumulated rejects, tides, and waves and emits a TuningReport of structured recommendations; see Resilience below.

Per-edge flow control. Each edge composes six orthogonal primitives: Gate (HardLock / SoftPass / Weir), SurgeBarrier, Penstock (Sustained / Burst / Window / Backpressure / Signal), Reservoir, Spillway (DropOldest / RaiseOverflow / ExportToArchive), and a declarative FlowObserver (Null / Logging / Signal) for telemetry. The shape constructors accept a top-level flow= or gate_mode= shorthand; an explicit Edge(...) carries its own:

from incorporator.tideweaver import (
    FlowControl, Weir, BurstPenstock, Reservoir, LoggingObserver,
)

edge_flow = FlowControl(
    gate=Weir(),                                          # fire on freshness
    penstock=BurstPenstock(rate_per_sec=2.0, burst=5),    # token bucket
    reservoir=Reservoir(depth=3),                         # buffer 3 waves
    observer=LoggingObserver(fire_level="info"),          # declarative telemetry
)
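The comment on BurstPenstock above describes it as a token bucket. As a rough mental model of what rate_per_sec and burst mean, the following hypothetical TokenBucket class (not the library's BurstPenstock) refills continuously at rate_per_sec up to a capacity of burst and spends one token per fire. The injectable clock is an assumption added so the behaviour can be checked deterministically.

```python
import time
from collections.abc import Callable


class TokenBucket:
    """Hypothetical token bucket mirroring rate_per_sec / burst (not BurstPenstock itself)."""

    def __init__(self, rate_per_sec: float, burst: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate_per_sec = rate_per_sec
        self.capacity = float(burst)
        self.tokens = float(burst)   # start full so an initial burst can fire at once
        self.clock = clock
        self.last = clock()

    def _refill(self) -> None:
        now = self.clock()
        # Accrue tokens for the elapsed time, never beyond the burst capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate_per_sec)
        self.last = now

    def try_acquire(self) -> bool:
        """Spend one token if available; False means this fire should be throttled."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Deterministic demo with a fake clock instead of time.monotonic.
now = [0.0]
bucket = TokenBucket(rate_per_sec=2.0, burst=5, clock=lambda: now[0])
print([bucket.try_acquire() for _ in range(6)])  # [True, True, True, True, True, False]
now[0] = 0.5                                     # 0.5 s at 2 tokens/s refills one token
print(bucket.try_acquire())                      # True
```

With burst=5 an idle edge can fire five times back to back, after which it is held to the sustained rate of two fires per second.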

The same shapes deserialize from watershed.json via Pydantic discriminated unions ({"gate": {"type": "weir"}, ...}); see the NASCAR Tideweaver appendix for the JSON form on a working diamond. For non-verb tick logic (cron-style cleanups, custom side effects), subclass CustomCurrent and override async tick(scheduler).

Stream(parent_current=...) declares a parent-child row fan-out without a CustomCurrent wrapper: one child row per parent snapshot entry at tick time.

child = Stream(
    name="coin_detail",
    cls=CoinDetail,
    interval=60,
    parent_current="markets",          # row-fans from markets._tideweaver_snapshot
    incorp_params={"inc_code": "id"},  # inc_parent injected per row at tick time
)

→ Tutorial 11: Tideweaver · Canal toolkit primitives in API Atlas

When to reach for which long-running verb

| Verb | Sources | Shape | Reach for it when… |
|---|---|---|---|
| stream() | one | paginated chunks, O(1) memory | you need to bulk-drain a paginated API or massive local file into a warehouse / archive |
| fjord() | many | stateful in-memory registry, live refresh | you need to keep a hot multi-source object graph synchronised and snapshot it on a cadence |
| Tideweaver | many | windowed graph of streams + fjords with dependency edges | you need to run several feeds at independent intervals inside a single time window, with downstream work gated on fresh upstream data |

display(): REPL debug print, e.g. launches[0].display()

Typed directive wrappers (optional)

excl_lst, name_chg, code_attr, name_attr accept typed frozen wrappers alongside bare shapes. Old call sites keep working; mixed sequences are accepted.

from incorporator.schema.directives import Ex, Nm
await Users.incorp(
    inc_url="https://api.example.com/users",
    excl_lst=["legacy_flag", Ex("profile.internal.ssn")],   # nested drop
    name_chg=[("external_id", "id"), Nm("user_name", "name")],
    code_attr="id",
)

Ex("a.b.c") drops nested leaves (previously only top-level keys could be dropped). PK binding now runs after rename, so inc_code resolves correctly whether name_chg removes the source field or creates it.
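A rough sketch of what a dotted-path drop such as Ex("profile.internal.ssn") does to a single record, using plain dicts. The helper drop_path is hypothetical and only illustrates the idea; it is not how incorporator.schema.directives applies Ex internally, and silently ignoring absent paths is an assumption.

```python
from typing import Any


def drop_path(record: dict[str, Any], dotted: str) -> None:
    """Remove the leaf at a dotted path in place; absent paths are ignored."""
    *parents, leaf = dotted.split(".")
    node: Any = record
    for key in parents:
        if not isinstance(node, dict) or key not in node:
            return  # path not present in this record: nothing to drop
        node = node[key]
    if isinstance(node, dict):
        node.pop(leaf, None)


row = {"id": 1, "legacy_flag": True, "profile": {"internal": {"ssn": "x", "tier": 2}}}
for path in ("legacy_flag", "profile.internal.ssn", "profile.missing.key"):
    drop_path(row, path)
print(row)  # {'id': 1, 'profile': {'internal': {'tier': 2}}}
```

A bare string like "legacy_flag" is just a one-segment path, which is why top-level and nested drops can share one code path.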


🚀 From Code to Production: CLI & Docker

The CLI runs the same engines from declarative config. No Python required.

| Command | What it does |
|---|---|
| incorporator init --type {stream,fjord,tideweaver} | Scaffold a starter pipeline.json or watershed.json (+ outflow.py for fjord). |
| incorporator validate <config>.json | Structural check before you ship; no network calls. Auto-detects the config type. |
| incorporator stream pipeline.json | Run a single-source stream pipeline. |
| incorporator fjord pipeline.json | Run a multi-source fjord pipeline. |
| incorporator tideweaver run watershed.json | Run a windowed orchestration graph. |
| incorporator deps [--missing] [--category CAT] [--json] | List installed optional extras and what each one unlocks; --json for CI. |
incorporator init --type stream --output-dir .
incorporator validate pipeline.json && incorporator stream pipeline.json   # ...or: docker compose up -d

Secrets stay out of config: use ${API_KEY} for environment variables and ${file:/run/secrets/api_key} for Docker / Kubernetes Secrets mounts. Set INCORPORATOR_SECRETS_ROOT=/run/secrets to sandbox ${file:...} lookups against directory traversal.

→ CLI reference · Deployment & secrets


🛠 Resilience & Batteries Included

  • Faster native-code parsing and compression via the [speedups] extra (orjson, lxml, cramjam). → Installation

  • Transparent decompression for .gz, .bz2, .lzma, .zip, and .tar: ZIP/TAR member paths are validated against directory traversal, and a 1 GB cap guards against decompression bombs. → Formats

  • Connection pooling, phase-aware retries, and structured rejects. Requests share an HTTP/2-multiplexed httpx.AsyncClient. Retries are classified by phase: connect-phase errors are capped at about 3 attempts; server-responded 5xx/429 get up to 8 attempts and honor Retry-After; non-idempotent POST is not retried; HTTP 408 and 425 are retryable. Every failed source lands in IncorporatorList.rejects: list[RejectEntry], carrying source / error_kind / is_url_traffic_error / message / retry_after / wave_index. RejectEntry.__str__ includes the HTTP reason phrase ([HTTP 429 Too Many Requests]; [HTTP 522] for non-standard codes). The legacy flat failed_sources: list[str] is preserved as a derived view. Opt-in block_internal_redirects=True rejects 3xx Location targets that point to RFC 1918, loopback, or cloud-metadata IPs.

  • Friendly rate limiting: the framework ships with no implicit per-host throttling. Opt in once at startup with register_host_penstock (one source of truth across every incorp() call), or pass requests_per_second=X per call. The same Penstock primitive serves both the HTTP layer and Tideweaver edges:

    from incorporator import register_host_penstock
    from incorporator.io.penstock import SustainedPenstock
    register_host_penstock("api.coingecko.com", SustainedPenstock(rate_per_sec=0.2))   # ~12 r/min
    register_host_penstock("pokeapi.co",        SustainedPenstock(rate_per_sec=1.5))   # ~90 r/min
    

    architect() surfaces 429 / Retry-After hints during probing and recommends a Penstock on the matching edge. See register_host_penstock in the API Atlas.

  • Lambda-free conv_dict: inc, calc, calc_all, pluck, link_to, link_to_list, and split_and_get all short-circuit silently on garbage input (None, "", "N/A", "null", "unknown", "nan", "undefined") before the user callable runs. Defensive null guards inside lambdas are no longer needed; use stdlib callables directly:

    conv_dict = {
        "id":     inc(int),                                              # type coerce
        "title":  calc(str.lower, "title", default="", target_type=str), # transform
        "status": calc("Alive".__eq__, "status", default=False),         # enum-to-bool
    }
    

    See incorporator.io.SourceRef for the opt-in typed source value (URL / file / parent / payload / kwargs) when you need explicit source dispatch.

  • Atomic writes + spreadsheet-injection guard: Parquet / Feather / ORC / JSON / XML / XLSX files are built via tempfile + os.replace() (no half-written files); CSV / XLSX cells starting with = / @ / + / - are quoted on export, following OWASP's CSV-injection guidance.

  • Non-blocking observability with a routing split: subclass LoggedIncorporator, and logs flow through a QueueHandler so disk I/O never blocks the event loop. URL/internet-traffic errors (HTTP 4xx/5xx, network failures) route to _api.log; parse and codebase errors route to _error.log; _debug.log is the superset of both. The file location tells you whether the fault is the API's or your code's. get_rejects() unions both files, so every reject is covered regardless of routing. For orchestration runs, LoggedTideweaver (from incorporator.tideweaver) is the drop-in counterpart to Tideweaver: it routes every yielded Tide and every accumulated RejectEntry to disk via the same QueueHandler pipeline; replay them with get_tides() / get_rejects() / get_scheduler_events().

  • The pipeline tells you what to tune. architect.tune() reads the accumulated rejects, the tides, and the pass interval and returns a TuningReport of severity-sorted hints: per-edge Penstock rates, byte-rate-aware Penstock recommendations, evidence-based timeout hints via tune(timeout=...), and surge thresholds. tw.summary(tides=tides) returns the same report from an existing instance.

    from incorporator.tideweaver import LoggedTideweaver, tune
    tw = LoggedTideweaver(watershed, enable_logging=True, logger_name="PriceSession")
    tides = [tide async for tide in tw.run()]
    report = tune(rejects=tw.rejects, tides=tides, pass_interval=tw.pass_interval)
    print(report.render())   # hint blocks, sorted by severity
    
  • Keyed reject audit + backlog short-circuit. Tideweaver.rejects returns a list[RejectEntry] whose error_kind is one of "PenstockLimited", "SurgeHalted", "SkipAhead", or "GateBlocked". Every canal-layer skip that never reached a tick body lands here, with from_name / to_name / cooldown_sec populated for keyed analysis. Set backlog_backoff_factor=2.0 on the Tideweaver constructor to extend the next-pass wait when the scheduler is consistently saturated; the default of 1.0 disables this backoff.

  • Optional-dependency introspection: the list_deps() / install_hint() / Category / DepInfo Python API, with a CLI surface via incorporator deps (see the CLI table). → docs/cli_and_configuration.md

  • Cross-format round-tripping: JSON ↔ Parquet ↔ SQLite ↔ Avro ↔ CSV ↔ XML. → Tutorial 3


📚 Tutorials (in order)

The eleven-tutorial curriculum. Each slot introduces one new verb or technique, alternating CoinGecko-heavy steps with non-CG domain examples so per-minute rate-limit windows refresh between CG calls and each Incorporator pattern lands across multiple real-world verticals. Runnable code under /examples.

  1. 🌱 First Steps + DX Inspector: a discovery-first flow in which test() profiles a CoinGecko endpoint, then incorp() applies its recommendations.
  2. Data Lake Pivot: SaaS roster → BI-ready columnar; pivot a /users endpoint into Avro + SQLite.
  3. 📦 Snapshot Warehouse (Universal Formats): fan CoinGecko top-100 snapshots into NDJSON / CSV / SQLite / Parquet, then round-trip every artifact.
  4. 🛡️ XML Post Audit: federal-VIN fraud audit; an XML invoice ledger enriched via one batched POST.
  5. 🚀 Parent → Child Drilling: CoinGecko /coins/markets → /coins/{id} fan-out, the canonical backtest-data-prep pattern.
  6. 🚀 SpaceX Launches: ops-dashboard feed; upcoming launches drilled for rocket + launchpad detail.
  7. 🔄 Stateful Refresh: refresh() three ways against Binance's live ticker.
  8. 🌊 Streaming Daemon (Paginated Bulk Export at O(1) Memory): stream()'s canonical job, a chunking-mode drain of a paginated source. Also covers a single-source stateful_polling=True compatibility shim, with a pointer to T10 for the multi-source live-daemon path.
  9. 🏁 NASCAR Fantasy Fjord: fantasy-sports scoring fjord across the Cup, Xfinity, and Truck series; previews T10's abstraction.
  10. 🌊 Multi-Source Fjord: fjord() fusing CoinGecko + Binance into a live cross-venue spread metric.
  11. 🧵 Tideweaver: Multi-Exchange Arb Scanner (capstone). Declarative windowed orchestration: three exchanges → one best-market record.

📑 Reference

  • 📖 Library Reference: every public class, rendered from source docstrings.
  • 📑 API Atlas: a paste-ready map of every public callable, with signature, pseudocode, "when to reach for it", common kwargs, and tutorial cross-links.
  • 🩺 Production Debugging: LoggedIncorporator, the api/error routing split, the reader API (get_rejects, get_api, get_error, get_current, get_scheduler_events), and the retry loop via RejectEntry.
  • 📦 Formats & Compression + 🌊 Streaming & Pagination: every format kwarg, the compression rules, and the paginator family for endpoints / files too big for RAM.
  • 🐳 CLI & Configuration: running pipelines from pipeline.json / watershed.json.
  • ⚡ Performance: measured throughput per format, memory profile, and tuning knobs. Per-chunk validation uses TypeAdapter(list[Cls]).validate_python(rows), which is 1.3-2.0× faster than per-row model_validate (tests/benchmarks/test_validate_batch_vs_per_row.py). The trade-off: within a single incorp() call, peak memory is O(N); stream() still keeps RSS flat by releasing each chunk. Outcome records (Wave, Tide, RejectEntry, and the slotted dataclass CurrentOutcome for per-current outcomes inside a Tide) carry structured fields (HTTP retry counts, schema-cache hits, source URLs, per-edge identity, status codes, cooldown hints) for keyed audit.

📎 Appendices: optional side-quests


๐Ÿค Philosophy & Contributing

See CONTRIBUTING.md for the dev install, quality bar, and architecture conventions. Security disclosures: SECURITY.md. Release notes: CHANGELOG.md.


Have a suggestion or hitting a snag? Report an issue or browse open issues on GitHub.



Download files

Download the file for your platform.

Source Distribution

incorporator-1.3.5.tar.gz (609.6 kB)


Built Distribution


incorporator-1.3.5-py3-none-any.whl (361.0 kB)


File details

Details for the file incorporator-1.3.5.tar.gz.

File metadata

  • Download URL: incorporator-1.3.5.tar.gz
  • Size: 609.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for incorporator-1.3.5.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9a88832c50cd6f399001de1525fb8378a4f1142a4a5c079b41e71cba290f53cc |
| MD5 | 96b9137396b1e3c8ffd59c8c0e74053f |
| BLAKE2b-256 | a6f117a07c902dc48f5ed8b34046572787e7fa03de7110505d1a1b3dbc16ed71 |


File details

Details for the file incorporator-1.3.5-py3-none-any.whl.

File metadata

  • Download URL: incorporator-1.3.5-py3-none-any.whl
  • Size: 361.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.3

File hashes

Hashes for incorporator-1.3.5-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 76240e58c62ffac18f2b403811c9842a9497486d241edf17756a4e80d2ef312d |
| MD5 | d3b603df5ec33fab6072d6f9b8cc3d04 |
| BLAKE2b-256 | 9e18c03c39e4d8993aad63209aa74139129b528ddd805169f8d6bbb668740a8b |

