Full pipeline orchestrator (Phases 2–5): segmentation → classification → mapping → hydration
data-hydrator
Full async pipeline that turns raw scraped HTML into Astro Content Collection files. It orchestrates four phases in sequence — segmentation, classification, mapping, and content extraction — and writes structured JSON or Markdown files ready to drop into an Astro project.
Pipeline overview
```text
scrape_output/{slug}.json        ← raw HTML + URL from your scraper
        │
        ▼  Phase 2: page-segmenter
        │  Splits raw HTML into a recursive SegmentRecord tree
        │  Cached to segmented/{slug}.json
        │
        ▼  Phase 3: segment-classifier
        │  Labels each segment with a ComponentType (product_card, article, …)
        │  Cached to classified/{slug}.json
        │
        ▼  Phase 4: component-mapper
        │  Maps each classified segment to a MappedComponent with prop schema
        │  Cached to mapped/{slug}.json
        │
        ▼  Phase 5: data-hydrator (this package)
           Three extraction tracks:
             Track A (known types):   CSS selectors, zero LLM
             Track B (content types): CSS selectors + markdownify, zero LLM
             Track C (unknown):       one LLM call per segment, batched
           Writes:
             staging/src/content/{collection}/{slug}.json|.md
             staging/redirects.json + staging/redirects.csv
             staging/hydration_manifest.json
```
Phases 2–4 cache their output to disk. Re-running the pipeline after adding new pages only processes the new pages through those phases; everything already cached is skipped.
Installation
```bash
pip install data-hydrator
```
Or from source (with sibling packages):
```bash
git clone https://github.com/innerkorehq/data-hydrator
cd data-hydrator
pip install -e .
```
Requires Python 3.12+.
Quick start
1. Put your scraped pages in scrape_output/
Each file is {page_slug}.json:
```json
{
  "url": "https://shop.example.com/products/nike-air-max-90",
  "html": "<!DOCTYPE html>...",
  "scraped_at": "2024-01-15T10:30:00Z",
  "title": "Nike Air Max 90"
}
```
2. Set your API key
```bash
export ANTHROPIC_API_KEY=sk-ant-...
```
3. Run
```python
import asyncio

from data_hydrator import HydrationPipeline, HydrationSettings


async def main() -> None:
    pipeline = HydrationPipeline(HydrationSettings(site_id="example.com"))
    await pipeline.initialize()
    try:
        result = await pipeline.run()
    finally:
        # Release resources even if the run raises.
        await pipeline.shutdown()
    print(f"Written : {result.manifest.total_written}")
    print(f"Manifest: {result.manifest_path}")


asyncio.run(main())
```
Output lands in ./staging/src/content/ ready for Astro.
Configuration
All settings live in HydrationSettings. Every nested model can also be set through environment variables with the HYDRATOR_ prefix (see Environment variables below).
```python
from data_hydrator import HydrationSettings
from data_hydrator.config import (
    PipelineDirsConfig,
    ResumeConfig,
    SchemaGenConfig,
    URLGenConfig,
    R2Config,
)

settings = HydrationSettings(
    site_id="example.com",
    dirs=PipelineDirsConfig(
        scrape_dir="./scrape_output",   # Phase 2 input
        segmented_dir="./segmented",    # Phase 2 output / Phase 3 input
        classified_dir="./classified",  # Phase 3 output / Phase 4 input
        mapped_dir="./mapped",          # Phase 4 output / Phase 5 input
        staging_dir="./staging",        # Phase 5 output
    ),
    resume=ResumeConfig(
        skip_segmentation_if_cached=True,
        skip_classification_if_cached=True,
        skip_mapping_if_cached=True,
    ),
    schema_gen=SchemaGenConfig(
        model="anthropic/claude-sonnet-4-5",
        unknown_model="anthropic/claude-haiku-4-5",
        representatives_per_type=3,  # HTML examples sent to LLM per type
        unknown_batch_size=20,
    ),
    url_gen=URLGenConfig(
        type_base_paths={
            "collection.product_card": "/products",
            "collection.blog_card": "/blog",
            "content.article": "/articles",
        },
        category_min_occurrence=2,  # min pages sharing a URL prefix to form a category
        max_slug_length=80,
    ),
    r2=R2Config(
        bucket_name="my-bucket",
        endpoint_url="https://abc.r2.cloudflarestorage.com",
        access_key_id="...",
        secret_access_key="...",
        public_base_url="https://cdn.example.com",
    ),
)
```
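The category_min_occurrence setting implies that categories come from URL prefixes shared by several pages; in the redirect sample further down, https://shop.example.com/footwear/running/nike-air-max-90 ends up under /products/footwear/. The sketch below shows one plausible heuristic, counting the first path segment of each nested source URL and keeping prefixes seen on at least min_occurrence pages. The function name and the first-segment rule are assumptions; category_inferrer.py may work differently.

```python
from collections import Counter
from urllib.parse import urlparse


def infer_categories(urls: list[str], min_occurrence: int = 2) -> set[str]:
    """Hypothetical sketch: URL prefixes shared by >= min_occurrence pages."""
    counts: Counter[str] = Counter()
    for url in urls:
        segments = [s for s in urlparse(url).path.split("/") if s]
        # Only pages nested under a prefix (two or more segments) can vote.
        if len(segments) >= 2:
            counts[segments[0]] += 1
    return {prefix for prefix, n in counts.items() if n >= min_occurrence}
```

With the default of 2, a prefix that appears on a single page (or a top-level page such as /about) never becomes a category.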
LiteLLM config
All AI calls go through a LiteLLM Router configured via litellm_config.yaml in your working directory:
```yaml
model_list:
  - model_name: schema-gen            # alias used in HydrationSettings
    litellm_params:
      model: anthropic/claude-sonnet-4-5
      api_key: os.environ/ANTHROPIC_API_KEY
  - model_name: unknown-extraction
    litellm_params:
      model: anthropic/claude-haiku-4-5
      api_key: os.environ/ANTHROPIC_API_KEY

litellm_settings:
  drop_params: true
  num_retries: 3
  request_timeout: 60
```
The model_name aliases map to schema_gen.model / schema_gen.unknown_model in HydrationSettings. Swap litellm_params.model to change provider without touching application code. A copy of the default config ships with the package at litellm_config.yaml.
Point to a custom file:
```python
settings = HydrationSettings(
    schema_gen=SchemaGenConfig(litellm_config_path="/path/to/my_llm_config.yaml")
)
```
Or via env var:
```bash
HYDRATOR_SCHEMA_GEN__LITELLM_CONFIG_PATH=/path/to/my_llm_config.yaml
```
If the file is absent, the client falls back to direct litellm.acompletion() calls using the inline timeout_seconds / max_retries settings.
Environment variables
Every field, including nested ones, can be set through HYDRATOR_-prefixed environment variables, with a double underscore (__) separating nesting levels:
```bash
HYDRATOR_SITE_ID=example.com
HYDRATOR_DIRS__SCRAPE_DIR=./scrape_output
HYDRATOR_R2__BUCKET_NAME=my-bucket
HYDRATOR_R2__ACCESS_KEY_ID=...
HYDRATOR_SCHEMA_GEN__MODEL=schema-gen
```
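The double-underscore convention matches the env_nested_delimiter feature of pydantic-settings, which the package lists as a dependency. The function below only illustrates how a variable name maps onto a nested field path; it is a hypothetical sketch, not the library's parser, and it performs no type conversion or validation.

```python
def env_to_nested(environ: dict[str, str], prefix: str = "HYDRATOR_") -> dict:
    """Illustrative only: turn HYDRATOR_A__B=v into {"a": {"b": "v"}}."""
    nested: dict = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue  # ignore unrelated variables such as PATH
        # Strip the prefix, split on "__", and lower-case to match field names.
        path = key[len(prefix):].lower().split("__")
        node = nested
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return nested
```

Calling `env_to_nested(dict(os.environ))` shows which nested fields your current shell would set.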
Extraction tracks
Track A — Known types (zero LLM after first run)
For any ComponentType that is not UNKNOWN and not a content type.
- On first run: one LLM batch call generates CSS selectors for all known types at once.
- Schemas are cached to .cache/extraction_schemas/{site_id}/{type}.json.
- All subsequent extractions use only BeautifulSoup, with no LLM.
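The cache-then-generate flow for Track A can be sketched as follows. It keeps one JSON file per site and type under the documented extraction_schemas/{site_id}/{type}.json layout (pass Path(".cache") as cache_root) and makes a single batch call covering every type that has no cached schema yet. The generate_batch callable stands in for the LLM batch call; the function name and the schema contents are hypothetical.

```python
import json
from collections.abc import Callable
from pathlib import Path

SchemaBatchFn = Callable[[list[str]], dict[str, dict]]


def load_or_generate_schemas(
    cache_root: Path, site_id: str, types: list[str], generate_batch: SchemaBatchFn
) -> dict[str, dict]:
    """Sketch: read cached schemas; one batch call covers every missing type."""
    site_dir = cache_root / "extraction_schemas" / site_id
    schemas: dict[str, dict] = {}
    missing: list[str] = []
    for t in types:
        path = site_dir / f"{t}.json"
        if path.exists():
            schemas[t] = json.loads(path.read_text(encoding="utf-8"))
        else:
            missing.append(t)
    if missing:
        # A single call for all uncached types (stand-in for the LLM batch call).
        generated = generate_batch(missing)
        site_dir.mkdir(parents=True, exist_ok=True)
        for t in missing:
            (site_dir / f"{t}.json").write_text(json.dumps(generated[t]), encoding="utf-8")
            schemas[t] = generated[t]
    return schemas
```

On a second run every type is found on disk, so generate_batch is never called, which is what makes Track A LLM-free after the first run.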
Track B — Content types (zero LLM after first run)
For types matching content.*, collection.blog*, collection.news*.
Same schema generation as Track A, but extraction strips noise elements and converts the content container to Markdown via markdownify. The Markdown is stored as body_markdown and becomes the body of a .md file with YAML frontmatter.
Track C — Unknown types (one LLM call per segment)
For ComponentType.UNKNOWN segments. The LLM infers the component type, generates a schema, and extracts the data in one call. The inferred schema is written back to the schema cache — future segments of the same inferred type use Track A automatically.
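The routing rules for the three tracks can be summarized as a small dispatch function. This sketch matches the documented Track B patterns (content.*, collection.blog*, collection.news*) with fnmatch; representing component types as plain strings and using "unknown" as the value of ComponentType.UNKNOWN are assumptions made to keep it self-contained.

```python
from fnmatch import fnmatchcase

# Track B patterns from the description above, matched case-sensitively.
CONTENT_PATTERNS = ("content.*", "collection.blog*", "collection.news*")


def choose_track(component_type: str) -> str:
    """Sketch: return "A", "B" or "C" for a component type string."""
    if component_type == "unknown":  # assumed value of ComponentType.UNKNOWN
        return "C"
    if any(fnmatchcase(component_type, p) for p in CONTENT_PATTERNS):
        return "B"
    return "A"
```

Ordering matters: the UNKNOWN check runs first, then the content patterns, and everything else falls through to Track A.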
Resume behaviour
```text
First run (nothing cached):
  Phase 2: segments 100 pages   → writes segmented/*.json
  Phase 3: classifies all       → writes classified/*.json
  Phase 4: maps all             → writes mapped/*.json
  Phase 5: hydrates all         → writes staging/

Second run (all cached):
  Phases 2-4: 100 pages from cache (no LLM, no segmenter called)
  Phase 5:    always re-runs; staging uses backup-on-overwrite

Add 20 new pages to scrape_output/ (with 80 pages already cached):
  Phase 2: 80 cached + 20 newly segmented
  Phase 3: 80 cached + 20 newly classified (classifier batch = 20 new only)
  Phase 4: 80 cached + 20 newly mapped
  Phase 5: all 100 hydrated
```
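The incremental behaviour comes down to comparing the slugs in a phase's input directory with those already in its cache directory. A minimal sketch, assuming each cache file shares the {slug}.json name of its input page (as the pipeline overview shows); the function name is hypothetical.

```python
from pathlib import Path


def split_cached(input_dir: Path, cache_dir: Path) -> tuple[list[str], list[str]]:
    """Sketch: return (cached, new) page slugs for one phase."""
    slugs = sorted(p.stem for p in input_dir.glob("*.json"))
    cached_slugs = (
        {p.stem for p in cache_dir.glob("*.json")} if cache_dir.exists() else set()
    )
    cached = [s for s in slugs if s in cached_slugs]
    new = [s for s in slugs if s not in cached_slugs]
    return cached, new
```

Only the `new` list needs to go through the segmenter, classifier, or mapper; Phase 5 still receives every page.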
Output structure
```text
staging/
├── src/
│   ├── content/
│   │   ├── products/
│   │   │   ├── nike-air-max-90.json
│   │   │   └── adidas-ultraboost.json
│   │   └── blog/
│   │       └── 10-tips-for-better-sleep.md
│   └── pages/
│       └── _data/
├── redirects.json            ← full redirect map with metadata
├── redirects.csv             ← Cloudflare Bulk Redirect Rules format
└── hydration_manifest.json   ← run stats, per-file details, warnings
```
Collection entry — JSON format
```json
{
  "draft": false,
  "title": "Nike Air Max 90",
  "generatedUrl": "/products/footwear/nike-air-max-90",
  "price": "$120.00",
  "image": "https://cdn.example.com/images/products/abc123.jpg"
}
```
Collection entry — Markdown format
```markdown
---
draft: false
title: 10 Tips for Better Sleep
generatedUrl: /blog/10-tips-for-better-sleep
publishedAt: '2024-01-10'
author: Dr. Jane Smith
---
Getting a good night's sleep is one of the most important things...
```
Redirects CSV (Cloudflare-ready)
```csv
source_url,target_url,status_code,preserve_query_string
https://shop.example.com/footwear/running/nike-air-max-90,/products/footwear/nike-air-max-90,301,false
```
Upload redirects.csv directly to Cloudflare → Traffic → Bulk Redirects.
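Producing a file in this shape needs only the standard csv module. The sketch below writes rows in the same column order as the sample above; the function name and the (source, target) tuple input are assumptions, and the 301 / preserve_query_string=false defaults simply mirror the sample row.

```python
import csv
import io

HEADER = ["source_url", "target_url", "status_code", "preserve_query_string"]


def redirects_to_csv(pairs: list[tuple[str, str]], status_code: int = 301) -> str:
    """Sketch: render (source_url, target_url) pairs as redirect CSV text."""
    buf = io.StringIO()
    # lineterminator="\n" avoids the csv module's default "\r\n" line endings.
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(HEADER)
    for source, target in pairs:
        writer.writerow([source, target, status_code, "false"])
    return buf.getvalue()
```

Using csv.writer rather than string joins means any URL containing a comma or quote is escaped correctly.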
Image processing
When R2 is configured, every image_url field is:
- Downloaded via aiohttp (bounded by max_concurrent_downloads)
- Uploaded to R2 (bounded by max_concurrent_uploads)
- Rewritten in the output file to public_base_url/{key_prefix}/{collection}/{hash}{ext}
A SHA-256-keyed URL cache at .cache/image_url_cache.json deduplicates across runs — images already uploaded are never re-downloaded.
If R2 is not configured (bucket_name is empty), images are skipped and original URLs are kept.
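The URL rewrite and cross-run dedup can be sketched without any network code. This version assumes that both the {hash} component and the cache key are the SHA-256 of the source URL (the docs only say the cache is SHA-256-keyed) and stores the cache as a flat JSON object; the function names are hypothetical, and the actual download and R2 upload are left out.

```python
import hashlib
import json
import posixpath
from pathlib import Path
from urllib.parse import urlparse


def r2_public_url(src_url: str, public_base_url: str, key_prefix: str, collection: str) -> str:
    """Sketch: public_base_url/{key_prefix}/{collection}/{hash}{ext}."""
    digest = hashlib.sha256(src_url.encode("utf-8")).hexdigest()
    ext = posixpath.splitext(urlparse(src_url).path)[1].lower()  # "" if no extension
    return f"{public_base_url.rstrip('/')}/{key_prefix}/{collection}/{digest}{ext}"


def cached_rewrite(cache_file: Path, src_url: str, **kwargs: str) -> tuple[str, bool]:
    """Return (rewritten_url, cache_hit); a miss is where download + upload would run."""
    cache = json.loads(cache_file.read_text()) if cache_file.exists() else {}
    key = hashlib.sha256(src_url.encode("utf-8")).hexdigest()
    if key in cache:
        return cache[key], True
    # Placeholder: the real pipeline would download and upload to R2 here.
    cache[key] = r2_public_url(src_url, **kwargs)
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(json.dumps(cache, indent=2))
    return cache[key], False
```

Because the key depends only on the source URL, a second run can resolve the rewritten URL from the cache file without touching the network.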
Hydration manifest
staging/hydration_manifest.json records everything about the run:
```json
{
  "run_id": "b3f2a1...",
  "site_id": "example.com",
  "generated_at": "2024-01-15T12:00:00Z",
  "total_segments": 247,
  "total_written": 231,
  "total_draft": 12,
  "total_failed": 4,
  "track_breakdown": {"known": 180, "content": 55, "unknown": 12},
  "collections": {"products": 120, "blog": 55, "misc": 56},
  "schemas_generated": 6,
  "schema_cache_hits": 0,
  "llm_calls_schema_gen": 1,
  "llm_calls_unknown": 12,
  "image_cache_hit_rate": 0.0,
  "files": [...]
}
```
Inspect it without re-running:
```bash
python examples/inspect_manifest.py staging/hydration_manifest.json
```
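If you only need a few headline numbers, the manifest is plain JSON and can be read directly. A minimal sketch using field names from the sample manifest above; it is a hypothetical helper, not the bundled examples/inspect_manifest.py.

```python
import json
from pathlib import Path


def summarize_manifest(path: Path) -> dict[str, float]:
    """Sketch: headline stats from staging/hydration_manifest.json."""
    m = json.loads(path.read_text(encoding="utf-8"))
    total = m["total_segments"]
    return {
        "written_pct": round(100 * m["total_written"] / total, 1) if total else 0.0,
        "draft": m["total_draft"],
        "failed": m["total_failed"],
        "llm_calls": m["llm_calls_schema_gen"] + m["llm_calls_unknown"],
    }
```

For the sample manifest this gives 93.5% written (231 of 247 segments), 12 drafts, 4 failures, and 13 LLM calls in total.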
Examples
| File | What it shows |
|---|---|
| examples/basic_run.py | Minimal end-to-end run with default settings |
| examples/with_r2.py | R2 image upload enabled |
| examples/resume_run.py | Resume / incremental-update behaviour |
| examples/custom_url_paths.py | Custom collection names and URL structures |
| examples/inspect_manifest.py | Print stats from a saved manifest (no pipeline needed) |
Sample scraped pages are in examples/scrape_output/.
LLM call budget
| Scenario | Schema gen | Unknown extraction |
|---|---|---|
| First run, all known types | 1 batch call | 0 |
| First run, with unknown segments | 1 batch call | 1 call per unknown segment (batched) |
| Subsequent runs, all cached | 0 | 0 (inferred types now Track A) |
| Subsequent runs, new page types | 1 batch call (new types only) | — |
Because of the schema cache, a site with 500 pages built from the same 4 known component types (and no UNKNOWN segments) needs exactly 1 schema-generation call on the first run and 0 on every run after.
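The Phase 5 budget in the table reduces to a simple count: at most one schema batch call per run, plus one call per segment routed to Track C. The function below is an illustrative sketch of that arithmetic, not part of the package; its parameter names are hypothetical.

```python
def phase5_llm_calls(uncached_known_types: int, track_c_segments: int) -> int:
    """Sketch: one schema batch call if any type lacks a schema, plus one call per Track C segment."""
    schema_calls = 1 if uncached_known_types > 0 else 0
    return schema_calls + track_c_segments
```

With the sample manifest's figures (one schema-gen batch, 12 unknown segments) this gives 13 calls; the 500-page, 4-type example gives 1 on the first run and 0 afterwards.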
Project structure
```text
data_hydrator/
├── config.py                 # All settings (Pydantic + env vars)
├── models.py                 # Domain models; re-exports upstream types
├── pipeline.py               # HydrationPipeline orchestrator
├── phases/
│   ├── phase2_segmentation.py
│   ├── phase3_classification.py
│   ├── phase4_mapping.py
│   └── phase5_hydration.py
├── io/
│   ├── page_loader.py        # Load raw scrape JSON
│   ├── phase_cache.py        # Per-phase disk cache (phases 2–4)
│   └── staging_writer.py     # Write + backup Astro files
├── schema/
│   ├── representative_picker.py
│   ├── schema_generator.py   # One LLM batch call → ExtractionSchema per type
│   └── schema_cache.py       # Persist schemas per site + type
├── extraction/
│   ├── field_extractor.py    # CSS selectors → ExtractedField (Track A)
│   ├── content_extractor.py  # CSS + markdownify → Markdown (Track B)
│   └── unknown_extractor.py  # LLM schema + extraction (Track C)
├── urls/
│   ├── category_inferrer.py  # Detect categories from URL patterns
│   ├── url_generator.py      # Generate /type/category/slug URLs
│   └── redirect_builder.py   # JSON + Cloudflare CSV
├── media/
│   ├── image_processor.py    # Download + upload + URL rewrite
│   ├── r2_client.py          # aiobotocore async R2
│   └── url_cache.py          # SHA-256 dedup cache
├── collection/
│   ├── entry_builder.py      # Build CollectionEntry
│   ├── serializer.py         # JSON / YAML frontmatter output
│   └── slug_generator.py     # title → slug + dedup
└── manifest/
    └── manifest_builder.py   # Assemble HydrationManifest
```
Dependencies
| Package | Purpose |
|---|---|
| page-segmenter | Phase 2: HTML → SegmentRecord tree |
| segment-classifier | Phase 3: SegmentRecord → ComponentType |
| component-mapper | Phase 4: ClassifiedSegment → MappedComponent + prop schema |
| litellm | LLM calls (schema generation + unknown extraction) |
| beautifulsoup4 + lxml | CSS selector extraction |
| markdownify | HTML → Markdown (Track B) |
| aiofiles | Async file I/O |
| aiohttp | Async image download |
| aiobotocore | Async R2/S3 upload |
| pydantic + pydantic-settings | Models + env-var config |
| pyyaml | YAML frontmatter serialization |
Download files
File details
Details for the file data_hydrator-0.2.1.tar.gz.
File metadata
- Download URL: data_hydrator-0.2.1.tar.gz
- Size: 207.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.5

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 20d143f5660f55e710112dd797ac717ce80629edad215d27f63e09b2751fda2a |
| MD5 | 59f61ce7d468390e895936a24352cce5 |
| BLAKE2b-256 | 60e33d1fffde891464280902e9c3bc0894d7236519153b57e57bde87a45a951a |
File details
Details for the file data_hydrator-0.2.1-py3-none-any.whl.
File metadata
- Download URL: data_hydrator-0.2.1-py3-none-any.whl
- Size: 57.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.5

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | f57df3ff5c32d416f612b29cbfc7ca8f37fc799cca4d50b28c23a18ac6f913db |
| MD5 | 9fd6c7888d88bf34f3483ed34c6a1e16 |
| BLAKE2b-256 | 852b69c107d2be0a4383e50351869b2f574fb0e9fbac21acc6264f5e8f5d7e4b |