Schema-free ETL mapper with in-process graph orchestration: turn any REST API, CSV, Parquet, or XML into typed Python objects, then coordinate multi-source pipelines in a bounded time window with Tideweaver.
Incorporator
Schema-free ingestion for APIs you don't control, and an orchestrator that tells you how to tune it.
Both halves share the same primitives (Penstock throttling at the HTTP and edge layers, Wave / Tide / RejectEntry outcome records, FlowControl), so the parser and the orchestrator compose, but you can adopt either without the other. Local paginators (SQLitePaginator, CSVPaginator, AvroPaginator) accept the same penstock= kwarg as the HTTP layer: one rate-limit primitive across both.
Highlights
- Works with unpredictable JSON APIs: digests XML, CSV, NDJSON, SQLite, Parquet, Avro without a line of schema; missing keys and mutating types are absorbed without validation errors.
- The pipeline tells you what to tune: after a Tideweaver run, architect.tune() consumes the accumulated rejects, tides, and waves and emits a TuningReport of severity-sorted hints.
- Disk-backed observability for orchestration: LoggedTideweaver routes every Tide and RejectEntry through a QueueHandler pipeline, replayable with get_tides() / get_rejects().
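The QueueHandler routing named in the last highlight is a standard-library pattern. The sketch below is not Incorporator's implementation; it only illustrates, with stdlib logging, how a producer enqueues records while a listener thread does the writing. The logger name `tide_demo` and the `ListHandler` sink are hypothetical stand-ins.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical sink: keeps formatted records in memory instead of a file."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


log_queue = queue.Queue()  # hand-off point between producer and writer
sink = ListHandler()
listener = logging.handlers.QueueListener(log_queue, sink)

logger = logging.getLogger("tide_demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep demo records out of the root logger
logger.addHandler(logging.handlers.QueueHandler(log_queue))

listener.start()  # background thread drains the queue
logger.info("tide %d fired=%s", 1, ["binance"])
logger.info("reject source=%s kind=%s", "kraken", "PenstockLimited")
listener.stop()  # processes what is queued, then joins the thread

print(sink.lines)
```

The calling code only pays for a queue put; the slow part (here a list append, in practice disk I/O) happens on the listener thread.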
How it Works: Zero-Schema Ingestion
Imagine this telemetry JSON. The nested "st" dictionary changes structure for every subsystem (pos vs sig vs bat). Standard parsers would crash.
[
{"id":"NAV", "st":{"pos":[12,44], "ok":1}},
{"id":"COM", "st":{"sig":78, "ok":1}},
{"id":"PWR", "st":{"bat":92, "ok":1}},
{"id":"THR", "st":{"lvl":63, "ok":0}}
]
Feed it the unpredictable JSON. Incorporator unifies the changing structures into a single object graph:
import asyncio

from incorporator import Incorporator


class System(Incorporator):
    pass  # Subclass; everything else hangs off it.


async def main():
    systems = await System.incorp(inc_file="telemetry.json", inc_code="id")
    print(systems.inc_dict["NAV"].st.pos)  # [12, 44]
    print(systems.inc_dict["PWR"].st.bat)  # 92
    thr = systems.inc_dict["THR"]
    if not thr.st.ok:
        print(f"⚠️ THRUST FAILURE! Efficiency dropped to {thr.st.lvl}")


asyncio.run(main())
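To make the idea of attribute access over heterogeneous nested JSON concrete, here is a minimal stdlib-only sketch. It is not how Incorporator works internally (the library is built on Pydantic); `records` and `by_id` are hypothetical names used only in this illustration.

```python
import json
from types import SimpleNamespace

raw = """
[
  {"id":"NAV", "st":{"pos":[12,44], "ok":1}},
  {"id":"COM", "st":{"sig":78, "ok":1}},
  {"id":"PWR", "st":{"bat":92, "ok":1}},
  {"id":"THR", "st":{"lvl":63, "ok":0}}
]
"""

# object_hook runs on every JSON object, innermost first, so nested dicts
# become attribute-style namespaces whichever keys they happen to carry.
records = json.loads(raw, object_hook=lambda d: SimpleNamespace(**d))

# Index by the "id" field, similar in spirit to inc_code="id".
by_id = {rec.id: rec for rec in records}

print(by_id["NAV"].st.pos)
print(by_id["PWR"].st.bat)
# A key that this subsystem never sent is simply absent, not an error.
print(getattr(by_id["COM"].st, "bat", None))
```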
The format is inferred from the URL or file extension. The syntax never changes for XML, CSV, NDJSON, SQLite, Parquet, Feather, ORC, Avro, or XLSX: same incorp() / export() surface.
Installation
Built on Pydantic V2 metaprogramming, HTTPX, and Tenacity. No system dependencies. Requires Python 3.10+. CI runs against 3.10 / 3.11 / 3.13 on Ubuntu and Windows.
pip install incorporator # core
pip install incorporator[speedups] # orjson + lxml + cramjam
pip install incorporator[parquet] # pyarrow: Parquet, Feather, ORC
pip install incorporator[avro] # fastavro
pip install incorporator[xlsx] # openpyxl: XLSX read/write
pip install incorporator[orchestrate] # typer + prefect: CLI + Prefect wrappers
pip install incorporator[all] # everything except [parquet]
The Verbs + One Orchestrator
Every method you'll call on an Incorporator subclass, plus the windowed orchestrator.
incorp(): fetch, parse, build the object graph
class Launch(Incorporator): pass
launches = await Launch.incorp(inc_url="https://ll.thespacedevs.com/2.2.0/launch/upcoming/")
print(launches[0].name)
→ Tutorial 1: First Steps
test(): let the framework write your incorp() kwargs
await Launch.test(inc_url="https://api.unknown.com/v1/users")
# Prints payload tree + suggested inc_code, rec_path, conv_dict.
refresh(): re-fetch live data into existing instances
await Launch.refresh(instance=launches)
The seed call's network context (params, headers, rec_path, conv_dict, ...) auto-replays.
export(): serialise to any format
await Launch.export(instance=launches, file_path="launches.parquet")
Supported formats: JSON, NDJSON, CSV, XML, SQLite, Parquet, Feather, ORC, Avro, XLSX. → Formats & compression
stream(): paginated bulk export, O(1) memory
For paginated APIs or local files too big for RAM, stream() fetches one page at a time, exports it, releases the page, and moves on, so peak memory stays at roughly one chunk regardless of dataset size. A Wave per chunk is the built-in observability stream.
from incorporator.io.pagination import PageNumberPaginator

async for wave in Launch.stream(
    incorp_params={
        "inc_url": "https://api.example.com/launches",
        "inc_page": PageNumberPaginator(page_param="page"),
    },
    refresh_params=None,  # chunking: opt out of per-chunk refresh
    export_params={"file_path": "launches.ndjson", "if_exists": "append"},
):
    if wave.failed_sources:
        print(wave)
Use an append-friendly format (.ndjson / .csv / .sqlite / .avro); Parquet, Feather, ORC, and Excel rebuild the whole file every wave. For multi-source live registries reach for fjord(). Pass adapt_chunk_size=True to resize paginator.chunk_size via AIMD (additive increase, multiplicative decrease), bounded by chunk_size_min / chunk_size_max and a target latency window of target_min_sec / target_max_sec.
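The AIMD resizing described above can be sketched in a few lines. The parameter names mirror the kwargs mentioned in the text, but the update rule itself is an assumption: the additive `step`, the multiplicative `factor`, and all default values are invented for illustration and may not match the library.

```python
def next_chunk_size(
    chunk_size: int,
    latency_sec: float,
    *,
    chunk_size_min: int = 50,
    chunk_size_max: int = 1000,
    target_min_sec: float = 0.5,
    target_max_sec: float = 2.0,
    step: int = 50,       # assumed additive increase
    factor: float = 0.5,  # assumed multiplicative decrease
) -> int:
    """Return the chunk size to request next, given the last page's latency."""
    if latency_sec > target_max_sec:
        chunk_size = int(chunk_size * factor)  # too slow: back off sharply
    elif latency_sec < target_min_sec:
        chunk_size = chunk_size + step         # fast: probe upward gently
    # Inside the target window the size is left alone; always clamp to bounds.
    return max(chunk_size_min, min(chunk_size_max, chunk_size))


size = 200
history = []
for latency in [0.2, 0.3, 1.0, 3.5, 0.1, 4.0, 4.0, 4.0]:
    size = next_chunk_size(size, latency)
    history.append(size)

print(history)
```

The asymmetry is the point of AIMD: growth is slow and linear, while a single slow page halves the request size, so the paginator settles near the largest chunk the endpoint serves within the latency window.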
→ Streaming & pagination · Tutorial 8
fjord(): a multi-source data pipeline
Fans out across N concurrent sources, fuses them through a user-defined outflow(state), exports the combined output.
async for wave in Incorporator.fjord(
    stream_params=[
        {"cls": Coin, "incorp_params": {"inc_url": "..."}, "refresh_interval": 30},
        {"cls": Order, "incorp_params": {"inc_url": "..."}, "refresh_interval": 5},
    ],
    outflow="outflow.py",
    export_params={"file_path": "fusion.parquet"},
):
    if wave.failed_sources:
        print(wave)
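The fan-out-then-fuse shape can be illustrated with plain asyncio. This is a sketch under stated assumptions, not the fjord() implementation: the two fake fetchers, their prices, and the body of `outflow` are invented, and the real outflow(state) contract is defined by the library.

```python
import asyncio


async def fetch_coins():
    await asyncio.sleep(0)  # stands in for an HTTP call
    return {"BTC": 100.0, "ETH": 10.0}


async def fetch_orders():
    await asyncio.sleep(0)
    return {"BTC": 101.5, "ETH": 9.5}


def outflow(state):
    """Fuse per-source snapshots into one record per symbol (hypothetical logic)."""
    coins, orders = state["coins"], state["orders"]
    return [
        {"symbol": sym, "spread": orders[sym] - coins[sym]}
        for sym in sorted(coins.keys() & orders.keys())
    ]


async def one_wave():
    # Fan out: both sources are awaited concurrently.
    coins, orders = await asyncio.gather(fetch_coins(), fetch_orders())
    # Fuse: the combined state passes through one user-defined function.
    return outflow({"coins": coins, "orders": orders})


fused = asyncio.run(one_wave())
print(fused)
```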
→ Tutorial 10: Multi-Source Fjord
Tideweaver: orchestrate multiple feeds on independent intervals
When you need several sources at different cadences inside a single time window, with dependency edges gating downstream work until upstreams produce fresh data, build a Watershed and hand it to Tideweaver:
from incorporator import Tideweaver, Watershed, Stream, Fjord

watershed = Watershed.diamond(
    window=(start, end),
    head=Stream(name="binance", cls=BinanceBook, interval=15, incorp_params={...}),
    middle=[
        Stream(name="coinbase", cls=CoinbaseTicker, interval=30, incorp_params={...}),
        Stream(name="kraken", cls=KrakenTicker, interval=30, incorp_params={...}),
    ],
    tail=Fjord(
        name="best_market", cls=BestMarket, interval=30,
        export_params={"file_path": "arb_signals.ndjson"},
    ),
    outflow="arb_outflow.py",
)

async for tide in Tideweaver(watershed).run():
    print(tide.tide_number, tide.fired, tide.skipped)
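To show what "downstream work gated on fresh upstream data" can mean, here is a toy discrete-time simulation. It models one plausible freshness rule (a current fires only if every upstream has fired since the current last fired); the actual gate semantics of Tideweaver are not specified in this README, and `simulate`, the two-node graph, and the intervals are chosen purely so the gate visibly blocks.

```python
def simulate(window_sec, intervals, edges):
    """Toy scheduler: returns ({name: fire times}, {name: skipped times}).

    intervals: {name: seconds}, listed upstream-first.
    edges: {downstream: [upstream names]}.
    """
    last_fired = {name: None for name in intervals}
    fired = {name: [] for name in intervals}
    skipped = {name: [] for name in intervals}
    for t in range(window_sec):
        for name, every in intervals.items():
            if t % every != 0:
                continue  # this current is not due at second t
            mine = last_fired[name]
            # Fresh: each upstream has produced something newer than our last run.
            fresh = all(
                last_fired[up] is not None and (mine is None or last_fired[up] > mine)
                for up in edges.get(name, [])
            )
            if fresh:
                last_fired[name] = t
                fired[name].append(t)
            else:
                skipped[name].append(t)
    return fired, skipped


fired, skipped = simulate(
    window_sec=60,
    intervals={"binance": 20, "best_market": 10},
    edges={"best_market": ["binance"]},
)
print(fired)
print(skipped)
```

Here the downstream node is due every 10 seconds but only runs when the 20-second upstream has delivered something new, so half of its slots are skipped rather than recomputed on stale data.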
Four shape helpers (parallel, chain, fanout, diamond) plus custom with explicit edges. Declarative watershed.json config + incorporator tideweaver run / validate CLI mirror the stream / fjord workflow. After the run, Tideweaver.summary(tides=tides) returns a TuningReport (see Resilience).
Probe โ plan โ run, no disk round-trip. When you have N unknown endpoints, architect() profiles each one and emits a runnable plan you can tune in-memory before handing it to Tideweaver:
plan = await Coin.architect(
    sources={"binance": book_url, "coinbase": cb_url, "kraken": kr_url},
    output="plan",
)
plan.currents[0].interval_hint = 10  # tune
watershed = plan.to_watershed(window=(start, end))  # materialise
async for tide in Tideweaver(watershed).run():
    ...
architect(output=...) also emits "report" (pretty-printed), "python" (paste-ready module), or "json" (paste-ready watershed.json). After a run, architect.tune() consumes the accumulated rejects + tides + waves and emits a TuningReport of structured recommendations; see Resilience below.
Per-edge flow control. Each edge composes six orthogonal primitives: Gate (HardLock / SoftPass / Weir), SurgeBarrier, Penstock (Sustained / Burst / Window / Backpressure / Signal), Reservoir, Spillway (DropOldest / RaiseOverflow / ExportToArchive), and a declarative FlowObserver (Null / Logging / Signal) for telemetry. The shape constructors accept a top-level flow= or gate_mode= shorthand; explicit Edge(...) carries its own:
from incorporator.observability.tideweaver import (
    FlowControl, Weir, BurstPenstock, Reservoir, LoggingObserver,
)

edge_flow = FlowControl(
    gate=Weir(),  # fire on freshness
    penstock=BurstPenstock(rate_per_sec=2.0, burst=5),  # token bucket
    reservoir=Reservoir(depth=3),  # buffer 3 waves
    observer=LoggingObserver(fire_level="info"),  # declarative telemetry
)
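The comment above calls BurstPenstock a token bucket. For readers unfamiliar with the technique, this is a minimal generic token bucket with the same two knobs. It is a sketch, not the library's class: `TokenBucket` and `try_acquire` are hypothetical names, and time is passed in explicitly so the behaviour is deterministic.

```python
class TokenBucket:
    """Generic token bucket: refills at rate_per_sec, holds at most `burst` tokens."""

    def __init__(self, rate_per_sec, burst, now=0.0):
        self.rate = rate_per_sec
        self.capacity = burst
        self.tokens = float(burst)  # start full, so an initial burst is allowed
        self.updated = now

    def try_acquire(self, now):
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


bucket = TokenBucket(rate_per_sec=2.0, burst=5)
at_start = [bucket.try_acquire(now=0.0) for _ in range(6)]
after_half_second = bucket.try_acquire(now=0.5)  # 0.5 s at 2/s refills one token
immediately_again = bucket.try_acquire(now=0.5)

print(at_start, after_half_second, immediately_again)
```

With rate_per_sec=2.0 and burst=5, five calls pass back-to-back, the sixth is refused, and the sustained rate afterwards is two per second.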
The same shapes deserialize from watershed.json via Pydantic discriminated unions ({"gate": {"type": "weir"}, ...}); see Appendix: NASCAR Tideweaver for the JSON form on a working diamond. For non-verb tick logic (cron-style cleanups, custom side-effects), subclass CustomCurrent and override async tick(scheduler).
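The "type" tag in that JSON is what selects the concrete class. The sketch below shows the dispatch idea with plain dataclasses rather than Pydantic, so it runs on the standard library alone. Only the "weir" tag appears in this README; the other tags, the `REGISTRY` mapping, and `build` are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HardLock:
    pass


@dataclass(frozen=True)
class Weir:
    pass


@dataclass(frozen=True)
class BurstPenstock:
    rate_per_sec: float
    burst: int


# Tag -> class. "hard_lock" and "burst" are invented tags for this example.
REGISTRY = {"hard_lock": HardLock, "weir": Weir, "burst": BurstPenstock}


def build(spec):
    """Instantiate the class named by spec["type"] with the remaining fields."""
    fields = dict(spec)
    kind = fields.pop("type")
    try:
        cls = REGISTRY[kind]
    except KeyError:
        raise ValueError(f"unknown type tag: {kind!r}") from None
    return cls(**fields)


config = {
    "gate": {"type": "weir"},
    "penstock": {"type": "burst", "rate_per_sec": 2.0, "burst": 5},
}
flow = {slot: build(spec) for slot, spec in config.items()}
print(flow)
```

Pydantic's discriminated unions do the same selection declaratively and add field validation on top.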
Stream(parent_current=...) declares a parent-child row fan without a CustomCurrent wrapper: one child row per parent snapshot entry at tick time:
child = Stream(
    name="coin_detail",
    cls=CoinDetail,
    interval=60,
    parent_current="markets",  # row-fans from markets._tideweaver_snapshot
    incorp_params={"inc_code": "id"},  # inc_parent injected per row at tick time
)
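Conceptually, a row fan turns one parent snapshot into one child call per row. The following sketch shows only that shape; `fan_rows` is a hypothetical helper, the snapshot rows are invented, and what the library actually passes as inc_parent is an assumption here (the whole parent row).

```python
parent_snapshot = [{"id": "bitcoin"}, {"id": "ethereum"}, {"id": "solana"}]


def fan_rows(snapshot, base_params):
    """Yield one kwargs dict per parent row (hypothetical helper)."""
    for row in snapshot:
        params = dict(base_params)  # copy so the shared template stays untouched
        params["inc_parent"] = row  # assumption: the parent row is injected as-is
        yield params


child_calls = list(fan_rows(parent_snapshot, {"inc_code": "id"}))
print(child_calls)
```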
→ Tutorial 11: Tideweaver · Canal toolkit primitives in API Atlas
When to reach for which long-running verb
| Verb | Sources | Shape | Reach for it when… |
|---|---|---|---|
| stream() | one | paginated chunks, O(1) memory | bulk drain a paginated API or massive local file into a warehouse / archive |
| fjord() | many | stateful in-memory registry, live refresh | keep a hot multi-source object graph synchronised and snapshot it on a cadence |
| Tideweaver | many | windowed graph of streams + fjords with dependency edges | run several feeds at independent intervals inside a single time window, with downstream work gated on fresh upstream data |
display(): REPL debug print, e.g. launches[0].display()
Typed directive wrappers (optional)
excl_lst, name_chg, code_attr, name_attr accept typed frozen wrappers alongside bare shapes. Old call sites keep working; mixed sequences are accepted.
from incorporator.schema.directives import Ex, Nm

await Users.incorp(
    inc_url="https://api.example.com/users",
    excl_lst=["legacy_flag", Ex("profile.internal.ssn")],  # nested drop
    name_chg=[("external_id", "id"), Nm("user_name", "name")],
    code_attr="id",
)
Ex("a.b.c") drops nested leaves (previously only top-level keys could
be dropped). PK binding now runs after rename, so inc_code resolves
correctly whether name_chg removes the source field or creates it.
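The ordering described above (drop, then rename, then bind the key) can be shown on plain dictionaries. This is an illustrative sketch, not the library's code path: `drop_path` and `apply_directives` are hypothetical helpers and the sample record is invented.

```python
import copy


def drop_path(record, dotted):
    """Remove a nested leaf such as 'profile.internal.ssn'; missing paths are ignored."""
    *parents, leaf = dotted.split(".")
    node = record
    for key in parents:
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            return
    if isinstance(node, dict):
        node.pop(leaf, None)


def apply_directives(record, excl, renames, code_attr):
    out = copy.deepcopy(record)  # never mutate the caller's payload
    for path in excl:
        drop_path(out, path)
    for old, new in renames:
        if old in out:
            out[new] = out.pop(old)
    # The key is looked up after renaming, so a field created by a rename works.
    return out[code_attr], out


record = {
    "external_id": 7,
    "user_name": "ada",
    "legacy_flag": True,
    "profile": {"internal": {"ssn": "redacted-example", "tier": "gold"}},
}
key, cleaned = apply_directives(
    record,
    excl=["legacy_flag", "profile.internal.ssn"],
    renames=[("external_id", "id"), ("user_name", "name")],
    code_attr="id",
)
print(key, cleaned)
```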
From Code to Production: CLI & Docker
The CLI runs the same engines from declarative config. No Python required.
| Command | What it does |
|---|---|
| incorporator init --type {stream,fjord,tideweaver} | Scaffold a starter pipeline.json or watershed.json (+ outflow.py for fjord). |
| incorporator validate <config>.json | Structural check before you ship; no network calls. Auto-detects type. |
| incorporator stream pipeline.json | Run a single-source stream pipeline. |
| incorporator fjord pipeline.json | Run a multi-source fjord pipeline. |
| incorporator tideweaver run watershed.json | Run a windowed orchestration graph. |
| incorporator deps [--missing] [--category CAT] [--json] | List installed optional extras and what each one unlocks; --json for CI. |
incorporator init --type stream --output-dir .
incorporator validate pipeline.json && incorporator stream pipeline.json # ...or: docker compose up -d
Secrets stay out of config: ${API_KEY} for env vars, ${file:/run/secrets/api_key} for Docker / Kubernetes Secrets mounts. Set INCORPORATOR_SECRETS_ROOT=/run/secrets to sandbox ${file:...} against directory traversal.
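A placeholder resolver with a sandbox root might look like the sketch below. It shows the general technique (resolve the path, then require it to sit under the root) and is not Incorporator's resolver: `resolve_secrets`, its signature, and the error type are assumptions made for this example.

```python
import re
import tempfile
from pathlib import Path

_PLACEHOLDER = re.compile(r"\$\{(file:)?([^}]+)\}")


def resolve_secrets(text, env, secrets_root=None):
    """Expand ${NAME} from env and ${file:PATH} from disk (hypothetical helper)."""

    def _sub(match):
        is_file, name = match.group(1), match.group(2)
        if not is_file:
            return env[name]  # KeyError if the variable is unset
        path = Path(name).resolve()  # collapses ".." before the check
        if secrets_root is not None:
            root = Path(secrets_root).resolve()
            if root not in path.parents:
                raise PermissionError(f"{path} is outside {root}")
        return path.read_text().strip()

    return _PLACEHOLDER.sub(_sub, text)


with tempfile.TemporaryDirectory() as root:
    secret_file = Path(root) / "api_key"
    secret_file.write_text("s3cr3t\n")
    config = '{"token": "${API_KEY}", "key": "${file:%s}"}' % secret_file
    resolved = resolve_secrets(config, {"API_KEY": "abc123"}, secrets_root=root)
    try:
        resolve_secrets("${file:%s/../outside}" % root, {}, secrets_root=root)
        escaped = True
    except PermissionError:
        escaped = False

print(resolved, escaped)
```

Resolving the path before comparing it to the root is what defeats `..` segments and symlinks that point outside the mount.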
→ CLI reference · Deployment & secrets
Resilience & Batteries Included
- GIL-free hyperthreading via the [speedups] extra. → Installation
- Invisible decompression for .gz, .bz2, .lzma, .zip, .tar: ZIP/TAR paths are validated against directory traversal and a 1 GB bomb cap. → Formats
- Connection pooling + retries + structured rejects: HTTP/2-multiplexed httpx.AsyncClient, Tenacity backoff, and IncorporatorList.rejects: list[RejectEntry] carrying source / error_kind / message / retry_after / wave_index for every failed source. The legacy flat failed_sources: list[str] is preserved as a derived view. Opt-in block_internal_redirects=True rejects 3xx Locations to RFC1918 / loopback / cloud-metadata IPs.
- Friendly rate limiting: the framework ships with no implicit per-host throttling. Opt in once at startup with register_host_penstock (one source of truth across every incorp() call) or pass requests_per_second=X per call. The same Penstock primitive serves both the HTTP layer and Tideweaver edges:

      from incorporator import register_host_penstock
      from incorporator.io.penstock import SustainedPenstock

      register_host_penstock("api.coingecko.com", SustainedPenstock(rate_per_sec=0.2))  # ~12 r/min
      register_host_penstock("pokeapi.co", SustainedPenstock(rate_per_sec=1.5))  # ~90 r/min

  architect() surfaces 429 / Retry-After hints during probing and recommends a Penstock on the matching edge. See register_host_penstock in the API Atlas.
- Lambda-free conv_dict: inc, calc, calc_all, pluck, link_to, link_to_list, split_and_get all short-circuit silently on garbage input (None, "", "N/A", "null", "unknown", "nan", "undefined") before the user callable runs. Defensive null guards inside lambdas are no longer needed; use stdlib callables directly:

      conv_dict = {
          "id": inc(int),  # type coerce
          "title": calc(str.lower, "title", default="", target_type=str),  # transform
          "status": calc("Alive".__eq__, "status", default=False),  # enum-to-bool
      }

  See incorporator.io.SourceRef for the opt-in typed source value (URL / file / parent / payload / kwargs) when you need explicit source dispatch.
- Atomic writes + spreadsheet-injection guard: Parquet / Feather / ORC / JSON / XML / XLSX build via tempfile + os.replace() (no half-written files); CSV / XLSX cells starting with = / @ / + / - are quoted on export (OWASP).
- Non-blocking observability: subclass LoggedIncorporator; logs flow through a QueueHandler so disk I/O never blocks the event loop. For orchestration runs, LoggedTideweaver (from incorporator.observability.tideweaver) is the parallel drop-in for Tideweaver: it routes every yielded Tide and every accumulated RejectEntry to disk via the same QueueHandler pipeline; replay with get_tides() / get_rejects().
- The pipeline tells you what to tune. architect.tune() reads accumulated rejects, tides, and pass interval and returns a TuningReport of severity-sorted hints: per-edge Penstock rates, backlog backoff, surge thresholds. tw.summary(tides=tides) returns the same report from an existing instance.

      from incorporator.observability.tideweaver import LoggedTideweaver, tune

      tw = LoggedTideweaver(watershed, enable_logging=True, logger_name="PriceSession")
      tides = [tide async for tide in tw.run()]
      report = tune(rejects=tw.rejects, tides=tides, pass_interval=tw.pass_interval)
      print(report.render())  # hint blocks, sorted by severity

- Keyed reject audit + backlog short-circuit. Tideweaver.rejects returns a list[RejectEntry] whose error_kind is one of "PenstockLimited", "SurgeHalted", "SkipAhead", "GateBlocked". Every canal-layer skip that never reached a tick body lands here, with from_name / to_name / cooldown_sec populated for keyed analysis. Set backlog_backoff_factor=2.0 on the Tideweaver constructor to extend the next-pass wait when the scheduler is consistently saturated; the default of 1.0 disables the backoff.
- Optional-dependency introspection: list_deps() / install_hint() / Category / DepInfo Python API; CLI surface via incorporator deps (see CLI table). → docs/cli_and_configuration.md
- Cross-format round-tripping: JSON ↔ Parquet ↔ SQLite ↔ Avro ↔ CSV ↔ XML. → Tutorial 3
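Two of the guarantees in the list above, atomic writes and the spreadsheet-injection guard, are standard techniques that can be sketched with the standard library. The code below is an illustration, not Incorporator's exporter: `atomic_write_json` and `guard_cell` are hypothetical helpers, and prefixing a single quote is one mitigation OWASP describes; the library's exact quoting may differ.

```python
import json
import os
import tempfile
from pathlib import Path

_RISKY_PREFIXES = ("=", "@", "+", "-")


def guard_cell(value):
    """Prefix formula-like strings so a spreadsheet treats them as text."""
    if isinstance(value, str) and value.startswith(_RISKY_PREFIXES):
        return "'" + value
    return value


def atomic_write_json(path, payload):
    """Write to a temp file in the target directory, then rename over the target."""
    path = Path(path)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
        os.replace(tmp_name, path)  # readers see the old file or the new one
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)  # do not leave a partial temp file behind
        raise


guarded = [guard_cell(cell) for cell in ["=SUM(A1:A9)", "@cmd", "Falcon 9", 42]]

with tempfile.TemporaryDirectory() as workdir:
    target = Path(workdir) / "launches.json"
    payload = [{"name": "Falcon 9"}, {"name": "Electron"}]
    atomic_write_json(target, payload)
    written = json.loads(target.read_text(encoding="utf-8"))
    leftovers = [p.name for p in Path(workdir).iterdir() if p.suffix == ".tmp"]

print(guarded, written, leftovers)
```

Creating the temp file in the same directory as the target matters: os.replace is only atomic when source and destination are on the same filesystem.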
Tutorials (in order)
The eleven-tutorial curriculum. Each slot introduces one new verb or technique, alternating CoinGecko-heavy steps with non-CG domain examples so per-minute rate-limit windows refresh between CG calls and each Incorporator pattern lands across multiple real-world verticals. Runnable code under /examples.
- First Steps + DX Inspector: discovery-first flow; test() profiles a CoinGecko endpoint, then incorp() applies its recommendations.
- Data Lake Pivot: SaaS roster → BI-ready columnar; pivot a /users endpoint into Avro + SQLite.
- Snapshot Warehouse, Universal Formats: fan CoinGecko top-100 snapshots into NDJSON / CSV / SQLite / Parquet, then round-trip every artifact.
- XML Post Audit: federal-VIN fraud audit; XML invoice ledger enriched via one batched POST.
- Parent → Child Drilling: CoinGecko /coins/markets → /coins/{id} fan-out, the canonical backtest-data-prep pattern.
- SpaceX Launches: ops-dashboard feed; upcoming launches drilled for rocket + launchpad detail.
- Stateful Refresh: refresh() three ways against Binance's live ticker.
- Streaming Daemon, Paginated Bulk Export at O(1) Memory: stream()'s canonical job, a chunking-mode drain of a paginated source. Plus a single-source stateful_polling=True compatibility shim with a pointer to T10 for the multi-source live-daemon path.
- NASCAR Fantasy Fjord: fantasy-sports scoring fjord across Cup, Xfinity, Truck series; previews T10's abstraction.
- Multi-Source Fjord: fjord() fusing CoinGecko + Binance into a live cross-venue spread metric.
- Tideweaver, Multi-Exchange Arb Scanner (capstone): declarative windowed orchestration; three exchanges → one best-market record.
Reference
- Library Reference: every public class, rendered from source docstrings.
- API Atlas: paste-ready map of every public callable: signature, pseudocode, "when to reach for it", common kwargs, tutorial cross-links.
- Production Debugging with get_error(): LoggedIncorporator + structured error logs + retry loop via RejectEntry.
- Formats & Compression + Streaming & Pagination: every format kwarg, compression rules, and the paginator family for endpoints / files too big for RAM.
- CLI & Configuration: running pipelines from pipeline.json / watershed.json.
- Performance: measured throughput per format, memory profile, tuning knobs. Per-chunk validation uses TypeAdapter(list[Cls]).validate_python(rows) and is 1.3-2.0× faster than per-row model_validate (tests/benchmarks/test_validate_batch_vs_per_row.py). Trade-off: within a single incorp() call, peak memory is O(N); stream() still keeps RSS flat by releasing each chunk. Outcome records (Wave, Tide, RejectEntry, and the slotted dataclass CurrentOutcome for per-current outcomes inside Tide) carry structured fields (HTTP retry counts, schema-cache hits, source URLs, per-edge identity, status codes, cooldown hints) for keyed audit.
Appendices: optional side-quests
- Pokémon ETL: paginated HATEOAS drill + array reductions with calc / sum_attributes. Mirrors T5.
- Crypto Graph Mapping (static): link_to-based in-memory join; T10's fjord pattern as a one-shot.
- NASCAR Tideweaver: T11's diamond shape against race telemetry (laps + pits + flags → driver state).
- Tideweaver Deep Dives: Parquet at window close and Tideweaver vs. Prefect; columnar artifacts plus the in-process-vs-cloud orchestration decision.
Philosophy & Contributing
See CONTRIBUTING.md for the dev install, quality bar, and architecture conventions. Security disclosures: SECURITY.md. Release notes: CHANGELOG.md.