
Full pipeline orchestrator (Phases 2–5): segmentation → classification → mapping → hydration


data-hydrator

Full async pipeline that turns raw scraped HTML into Astro Content Collection files. It orchestrates four phases in sequence — segmentation, classification, mapping, and content extraction — and writes structured JSON or Markdown files ready to drop into an Astro project.


Pipeline overview

scrape_output/{slug}.json          ← raw HTML + URL from your scraper
        │
        ▼  Phase 2 — page-segmenter
        │  Splits raw HTML into a recursive SegmentRecord tree
        │  Cached to segmented/{slug}.json
        │
        ▼  Phase 3 — segment-classifier
        │  Labels each segment with a ComponentType (product_card, article, …)
        │  Cached to classified/{slug}.json
        │
        ▼  Phase 4 — component-mapper
        │  Maps each classified segment to a MappedComponent with prop schema
        │  Cached to mapped/{slug}.json
        │
        ▼  Phase 5 — data-hydrator (this package)
           Three extraction tracks:
             Track A (known types)   — CSS selectors, zero LLM
             Track B (content types) — CSS selectors + markdownify, zero LLM
             Track C (unknown)       — one LLM call per segment, batched
           Writes:
             staging/src/content/{collection}/{slug}.json|.md
             staging/redirects.json + staging/redirects.csv
             staging/hydration_manifest.json

Phases 2–4 cache their output to disk. Re-running the pipeline after adding new pages only processes the new pages through those phases; everything already cached is skipped.
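The skip-if-cached rule can be pictured as a load-or-compute step per phase. Below is a minimal sketch, assuming each phase caches one {slug}.json per page in its own directory, as the overview shows. run_cached_phase and the process callback are hypothetical names, not the package's phase_cache module.

```python
import json
from pathlib import Path
from typing import Callable


def run_cached_phase(
    slugs: list[str],
    cache_dir: Path,
    process: Callable[[list[str]], dict[str, dict]],
    skip_if_cached: bool = True,
) -> dict[str, dict]:
    """Return phase output for every slug, calling `process` only for uncached slugs."""
    cache_dir.mkdir(parents=True, exist_ok=True)
    results: dict[str, dict] = {}
    pending: list[str] = []
    for slug in slugs:
        path = cache_dir / f"{slug}.json"
        if skip_if_cached and path.exists():
            results[slug] = json.loads(path.read_text(encoding="utf-8"))
        else:
            pending.append(slug)
    if pending:
        # Process every uncached page in one batch, then persist each result
        fresh = process(pending)
        for slug, payload in fresh.items():
            (cache_dir / f"{slug}.json").write_text(json.dumps(payload), encoding="utf-8")
        results.update(fresh)
    return results
```

On a second call with one extra slug, only that slug reaches process; the rest are read back from disk.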


Installation

pip install data-hydrator

Or from source (with sibling packages):

git clone https://github.com/innerkorehq/data-hydrator
cd data-hydrator
pip install -e .

Requires Python 3.12+.


Quick start

1. Put your scraped pages in scrape_output/

Each file is named {page_slug}.json and has this shape:

{
  "url": "https://shop.example.com/products/nike-air-max-90",
  "html": "<!DOCTYPE html>...",
  "scraped_at": "2024-01-15T10:30:00Z",
  "title": "Nike Air Max 90"
}
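If your scraper does not already emit this shape, a small writer like the one below can produce it. This is a hypothetical helper (write_scrape_file is not part of data-hydrator), and it assumes the page slug is the last path segment of the URL; substitute whatever stable, filesystem-safe slug your scraper already uses.

```python
import json
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse


def write_scrape_file(out_dir: Path, url: str, html: str, title: str) -> Path:
    """Write one page as scrape_output/{page_slug}.json (hypothetical helper)."""
    # Assumption: slug = last URL path segment; the site root becomes "index"
    slug = urlparse(url).path.rstrip("/").rsplit("/", 1)[-1] or "index"
    record = {
        "url": url,
        "html": html,
        "scraped_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "title": title,
    }
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / f"{slug}.json"
    path.write_text(json.dumps(record, ensure_ascii=False, indent=2), encoding="utf-8")
    return path
```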

2. Set your API key

export ANTHROPIC_API_KEY=sk-ant-...

3. Run

import asyncio
from data_hydrator import HydrationPipeline, HydrationSettings

async def main():
    pipeline = HydrationPipeline(HydrationSettings(site_id="example.com"))
    await pipeline.initialize()
    try:
        result = await pipeline.run()
    finally:
        await pipeline.shutdown()  # runs even if a phase raises

    print(f"Written : {result.manifest.total_written}")
    print(f"Manifest: {result.manifest_path}")

asyncio.run(main())

Output lands in ./staging/src/content/ ready for Astro.


Configuration

All settings live in HydrationSettings, and every field, including those in nested models, can also be supplied through a HYDRATOR_-prefixed environment variable (see Environment variables below).

from data_hydrator import HydrationSettings
from data_hydrator.config import (
    PipelineDirsConfig,
    ResumeConfig,
    SchemaGenConfig,
    URLGenConfig,
    R2Config,
)

settings = HydrationSettings(
    site_id="example.com",

    dirs=PipelineDirsConfig(
        scrape_dir="./scrape_output",   # Phase 2 input
        segmented_dir="./segmented",    # Phase 2 output / Phase 3 input
        classified_dir="./classified",  # Phase 3 output / Phase 4 input
        mapped_dir="./mapped",          # Phase 4 output / Phase 5 input
        staging_dir="./staging",        # Phase 5 output
    ),

    resume=ResumeConfig(
        skip_segmentation_if_cached=True,
        skip_classification_if_cached=True,
        skip_mapping_if_cached=True,
    ),

    schema_gen=SchemaGenConfig(
        model="anthropic/claude-sonnet-4-5",
        unknown_model="anthropic/claude-haiku-4-5",
        representatives_per_type=3,     # HTML examples sent to LLM per type
        unknown_batch_size=20,
    ),

    url_gen=URLGenConfig(
        type_base_paths={
            "collection.product_card": "/products",
            "collection.blog_card":    "/blog",
            "content.article":         "/articles",
        },
        category_min_occurrence=2,      # min pages sharing a URL prefix to form a category
        max_slug_length=80,
    ),

    r2=R2Config(
        bucket_name="my-bucket",
        endpoint_url="https://abc.r2.cloudflarestorage.com",
        access_key_id="...",
        secret_access_key="...",
        public_base_url="https://cdn.example.com",
    ),
)
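The url_gen settings above control how generatedUrl values such as /products/footwear/nike-air-max-90 are built. The sketch below is one plausible reading, not the package's url_generator. It assumes a category is the first path segment of the source URL, promoted once category_min_occurrence pages share it, and that slug collisions get -2, -3 suffixes. All function names are hypothetical.

```python
import re
import unicodedata
from collections import Counter
from urllib.parse import urlparse


def _path_parts(url: str) -> list[str]:
    return [p for p in urlparse(url).path.split("/") if p]


def slugify(title: str, max_length: int = 80) -> str:
    # Drop accents, lowercase, and collapse runs of other characters into "-"
    ascii_title = unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("ascii")
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_title.lower()).strip("-")
    return slug[:max_length].rstrip("-") or "untitled"


def infer_categories(source_urls: list[str], min_occurrence: int = 2) -> set[str]:
    # A first path segment counts as a category once min_occurrence pages share it;
    # single-segment URLs have no prefix and are ignored
    counts = Counter(parts[0] for parts in map(_path_parts, source_urls) if len(parts) > 1)
    return {segment for segment, n in counts.items() if n >= min_occurrence}


def generate_url(
    base_path: str,
    source_url: str,
    title: str,
    categories: set[str],
    used: set[str],
    max_slug_length: int = 80,
) -> str:
    parts = _path_parts(source_url)
    has_category = len(parts) > 1 and parts[0] in categories
    prefix = f"{base_path}/{parts[0]}" if has_category else base_path
    slug = slugify(title, max_slug_length)
    candidate, n = f"{prefix}/{slug}", 2
    while candidate in used:  # dedup collisions with -2, -3, ...
        candidate, n = f"{prefix}/{slug}-{n}", n + 1
    used.add(candidate)
    return candidate
```

With the source URL https://shop.example.com/footwear/running/nike-air-max-90 and at least one other /footwear/ page, this yields /products/footwear/nike-air-max-90, matching the redirect example further down.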

Environment variables

Every field can be set through an environment variable with the HYDRATOR_ prefix. Use a double underscore (__) to step into a nested model:

HYDRATOR_SITE_ID=example.com
HYDRATOR_DIRS__SCRAPE_DIR=./scrape_output
HYDRATOR_R2__BUCKET_NAME=my-bucket
HYDRATOR_R2__ACCESS_KEY_ID=...
HYDRATOR_SCHEMA_GEN__MODEL=anthropic/claude-opus-4-7
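The naming rule is mechanical: strip the prefix, lowercase, and split on __. pydantic-settings (listed under Dependencies) handles this through its env_prefix and env_nested_delimiter options, presumably configured that way here. The stdlib-only function below just illustrates the mapping and, unlike pydantic, leaves every value as a string. nested_from_env is a hypothetical name.

```python
from collections.abc import Mapping
from typing import Any


def nested_from_env(
    env: Mapping[str, str],
    prefix: str = "HYDRATOR_",
    delimiter: str = "__",
) -> dict[str, Any]:
    """Turn HYDRATOR_A__B=value into {"a": {"b": "value"}}; values stay strings."""
    settings: dict[str, Any] = {}
    for key, value in env.items():
        if not key.startswith(prefix):
            continue  # not a hydrator variable
        path = key[len(prefix):].lower().split(delimiter)
        node = settings
        for part in path[:-1]:
            node = node.setdefault(part, {})  # descend, creating nested dicts
        node[path[-1]] = value
    return settings
```

Note that single underscores survive: HYDRATOR_SCHEMA_GEN__MODEL becomes {"schema_gen": {"model": ...}}.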

Extraction tracks

Track A — Known types (zero LLM after first run)

For any ComponentType that is not UNKNOWN and not a content type.

  • On first run: one LLM batch call generates CSS selectors for all known types at once.
  • Schemas are cached to .cache/extraction_schemas/{site_id}/{type}.json.
  • All subsequent extractions use only BeautifulSoup — no LLM.
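The first-run versus later-run behaviour comes down to a per-site, per-type cache consulted before any LLM call. A minimal sketch, assuming one JSON file per type under .cache/extraction_schemas/{site_id}/ and a schema shaped as a field-to-selector dict (both assumptions). generate_batch stands in for the single LLM batch call and is hypothetical.

```python
import json
from pathlib import Path
from typing import Callable

Schema = dict[str, str]  # field name -> CSS selector (assumed shape)


def load_or_generate_schemas(
    cache_root: Path,
    site_id: str,
    component_types: list[str],
    generate_batch: Callable[[list[str]], dict[str, Schema]],
) -> dict[str, Schema]:
    """Return a schema per type, calling generate_batch at most once, for misses only."""
    site_dir = cache_root / site_id
    schemas: dict[str, Schema] = {}
    missing: list[str] = []
    for ctype in component_types:
        path = site_dir / f"{ctype}.json"
        if path.exists():
            schemas[ctype] = json.loads(path.read_text(encoding="utf-8"))
        else:
            missing.append(ctype)
    if missing:
        site_dir.mkdir(parents=True, exist_ok=True)
        # One batch call covers every uncached type; results are persisted per type
        for ctype, schema in generate_batch(missing).items():
            (site_dir / f"{ctype}.json").write_text(json.dumps(schema), encoding="utf-8")
            schemas[ctype] = schema
    return schemas
```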

Track B — Content types (zero LLM after first run)

For types matching content.*, collection.blog*, collection.news*.

Same schema generation as Track A, but extraction first strips noise elements, then converts the content container to Markdown with markdownify. The Markdown is stored as body_markdown and written as the body of a .md file with YAML frontmatter.

Track C — Unknown types (one LLM call per segment)

For ComponentType.UNKNOWN segments. The LLM infers the component type, generates a schema, and extracts the data in one call. The inferred schema is written back to the schema cache — future segments of the same inferred type use Track A automatically.
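Choosing a track for a segment is a dispatch on its ComponentType. The sketch below assumes types are dotted strings like collection.product_card and that UNKNOWN serializes as "unknown" (both assumptions). It matches the Track B rules with shell-style patterns via fnmatch and labels tracks with the keys used in the manifest's track_breakdown. route and Track are hypothetical names.

```python
from enum import Enum
from fnmatch import fnmatchcase

CONTENT_PATTERNS = ("content.*", "collection.blog*", "collection.news*")


class Track(Enum):
    KNOWN = "known"      # Track A
    CONTENT = "content"  # Track B
    UNKNOWN = "unknown"  # Track C


def route(component_type: str, unknown_label: str = "unknown") -> Track:
    """Pick an extraction track from a ComponentType value (string form assumed)."""
    if component_type == unknown_label:
        return Track.UNKNOWN
    if any(fnmatchcase(component_type, pattern) for pattern in CONTENT_PATTERNS):
        return Track.CONTENT
    return Track.KNOWN
```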


Resume behaviour

First run (nothing cached):
  Phase 2: segments 100 pages → writes segmented/*.json
  Phase 3: classifies all     → writes classified/*.json
  Phase 4: maps all           → writes mapped/*.json
  Phase 5: hydrates all       → writes staging/

Second run (all cached):
  Phases 2-4: 100 pages from cache (no LLM, no segmenter called)
  Phase 5:    always re-runs; staging uses backup-on-overwrite

Add 20 new pages to scrape_output/ (120 pages total):
  Phase 2: 100 cached + 20 newly segmented
  Phase 3: 100 cached + 20 newly classified (classifier batch = 20 new only)
  Phase 4: 100 cached + 20 newly mapped
  Phase 5: all 120 hydrated

Output structure

staging/
├── src/
│   ├── content/
│   │   ├── products/
│   │   │   ├── nike-air-max-90.json
│   │   │   └── adidas-ultraboost.json
│   │   └── blog/
│   │       └── 10-tips-for-better-sleep.md
│   └── pages/
│       └── _data/
├── redirects.json          ← full redirect map with metadata
├── redirects.csv           ← Cloudflare Bulk Redirect Rules format
└── hydration_manifest.json ← run stats, per-file details, warnings

Collection entry — JSON format

{
  "draft": false,
  "title": "Nike Air Max 90",
  "generatedUrl": "/products/footwear/nike-air-max-90",
  "price": "$120.00",
  "image": "https://cdn.example.com/images/products/abc123.jpg"
}

Collection entry — Markdown format

---
draft: false
title: 10 Tips for Better Sleep
generatedUrl: /blog/10-tips-for-better-sleep
publishedAt: '2024-01-10'
author: Dr. Jane Smith
---

Getting a good night's sleep is one of the most important things...

Redirects CSV (Cloudflare-ready)

source_url,target_url,status_code,preserve_query_string
https://shop.example.com/footwear/running/nike-air-max-90,/products/footwear/nike-air-max-90,301,false

Upload redirects.csv directly to Cloudflare → Traffic → Bulk Redirects.
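The CSV is a direct rendering of the redirect map. A minimal sketch with the csv module, assuming the map is a dict from old absolute URL to new path and that every redirect is a 301 without query-string preservation, as in the example row. redirects_to_csv is hypothetical.

```python
import csv
import io

FIELDS = ["source_url", "target_url", "status_code", "preserve_query_string"]


def redirects_to_csv(redirects: dict[str, str], status_code: int = 301) -> str:
    """Render {old absolute URL: new path} as the four-column CSV shown above."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(FIELDS)
    for source, target in sorted(redirects.items()):  # stable order across runs
        writer.writerow([source, target, status_code, "false"])
    return buf.getvalue()
```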


Image processing

When R2 is configured, every image_url field is:

  1. Downloaded via aiohttp (bounded by max_concurrent_downloads)
  2. Uploaded to R2 (bounded by max_concurrent_uploads)
  3. URL rewritten to public_base_url/{key_prefix}/{collection}/{hash}{ext} in the output file

A SHA-256-keyed URL cache at .cache/image_url_cache.json deduplicates across runs — images already uploaded are never re-downloaded.

If R2 is not configured (bucket_name is empty), images are skipped and original URLs are kept.
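The three steps map onto two asyncio semaphores plus a lookup cache. In this sketch, download and upload are injected callables standing in for aiohttp and aiobotocore, the cache is an in-memory dict standing in for .cache/image_url_cache.json, and {hash} is assumed to be a shortened SHA-256 of the source URL. None of these names come from the package.

```python
import asyncio
import hashlib
from pathlib import PurePosixPath
from typing import Awaitable, Callable
from urllib.parse import urlparse

Download = Callable[[str], Awaitable[bytes]]
Upload = Callable[[str, bytes], Awaitable[None]]


def object_key(source_url: str, key_prefix: str, collection: str) -> str:
    # Assumption: {hash} is the SHA-256 of the source URL, shortened to 16 hex chars
    digest = hashlib.sha256(source_url.encode("utf-8")).hexdigest()[:16]
    ext = PurePosixPath(urlparse(source_url).path).suffix.lower() or ".bin"
    return f"{key_prefix}/{collection}/{digest}{ext}"


async def rewrite_images(
    urls: list[str],
    *,
    public_base_url: str,
    key_prefix: str,
    collection: str,
    cache: dict[str, str],
    download: Download,
    upload: Upload,
    max_concurrent_downloads: int = 8,
    max_concurrent_uploads: int = 4,
) -> list[str]:
    """Return the rewritten public URL for each input URL, in order."""
    download_slots = asyncio.Semaphore(max_concurrent_downloads)
    upload_slots = asyncio.Semaphore(max_concurrent_uploads)

    async def process(url: str) -> str:
        cache_key = hashlib.sha256(url.encode("utf-8")).hexdigest()
        if cache_key in cache:  # uploaded on an earlier run: no download, no upload
            return cache[cache_key]
        async with download_slots:
            data = await download(url)
        key = object_key(url, key_prefix, collection)
        async with upload_slots:
            await upload(key, data)
        public_url = f"{public_base_url.rstrip('/')}/{key}"
        cache[cache_key] = public_url
        return public_url

    return await asyncio.gather(*(process(u) for u in urls))
```

Within a single call, two identical URLs can both miss the cache and be fetched twice; de-duplicating the input list first avoids that.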


Hydration manifest

staging/hydration_manifest.json records everything about the run:

{
  "run_id": "b3f2a1...",
  "site_id": "example.com",
  "generated_at": "2024-01-15T12:00:00Z",
  "total_segments": 247,
  "total_written": 231,
  "total_draft": 12,
  "total_failed": 4,
  "track_breakdown": {"known": 180, "content": 55, "unknown": 12},
  "collections": {"products": 120, "blog": 55, "misc": 56},
  "schemas_generated": 6,
  "schema_cache_hits": 0,
  "llm_calls_schema_gen": 1,
  "llm_calls_unknown": 12,
  "image_cache_hit_rate": 0.0,
  "files": [...]
}

Inspect it without re-running:

python examples/inspect_manifest.py staging/hydration_manifest.json
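For quick checks in your own tooling, the manifest is plain JSON. Below is a hypothetical helper (not the bundled examples/inspect_manifest.py) that reads the top-level keys shown in the example manifest and assumes they are present.

```python
import json
from pathlib import Path
from typing import Any


def summarize_manifest(path: Path) -> dict[str, Any]:
    """Pull headline numbers from a saved hydration_manifest.json."""
    m = json.loads(path.read_text(encoding="utf-8"))
    total = m["total_segments"]
    return {
        "site_id": m["site_id"],
        "written_pct": round(100 * m["total_written"] / total, 1) if total else 0.0,
        "failed": m["total_failed"],
        "llm_calls": m["llm_calls_schema_gen"] + m["llm_calls_unknown"],
        "largest_collection": max(m["collections"], key=m["collections"].get, default=None),
    }
```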

Examples

File                          What it shows
examples/basic_run.py         Minimal end-to-end run with default settings
examples/with_r2.py           R2 image upload enabled
examples/resume_run.py        Resume / incremental-update behaviour
examples/custom_url_paths.py  Custom collection names and URL structures
examples/inspect_manifest.py  Print stats from a saved manifest (no pipeline needed)

Sample scraped pages are in examples/scrape_output/.


LLM call budget

Scenario                          Schema gen                     Unknown extraction
First run, all known types        1 batch call                   0
First run, with unknown segments  1 batch call                   1 call per unknown segment (batched)
Subsequent runs, all cached       0                              0 (inferred types now Track A)
Subsequent runs, new page types   1 batch call (new types only)

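The schema cache means a site with 500 pages built from the same 4 known component types (and no UNKNOWN segments) makes exactly 1 schema-generation LLM call in Phase 5 on the first run and none on later runs. These counts cover Phase 5 only; classification and mapping of uncached pages in Phases 3–4 are not included.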
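The budget table reduces to simple arithmetic for Phase 5. A minimal sketch, assuming no retries and counting only unknown segments whose type has not already been inferred on an earlier run; phase5_llm_calls is a hypothetical helper.

```python
def phase5_llm_calls(
    schema_types: set[str],
    cached_schema_types: set[str],
    new_unknown_segments: int,
) -> dict[str, int]:
    """Estimate Phase 5 LLM calls (schema_types = types handled by Tracks A and B)."""
    uncached = schema_types - cached_schema_types
    return {
        "schema_gen": 1 if uncached else 0,  # one batch call covers every uncached type
        "unknown": new_unknown_segments,     # one call per not-yet-inferred UNKNOWN segment
    }
```

For the 500-page, 4-type site, the first run gives {"schema_gen": 1, "unknown": 0} and every later run gives zeros, regardless of page count.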


Project structure

data_hydrator/
├── config.py                  # All settings (Pydantic + env vars)
├── models.py                  # Domain models; re-exports upstream types
├── pipeline.py                # HydrationPipeline orchestrator
├── phases/
│   ├── phase2_segmentation.py
│   ├── phase3_classification.py
│   ├── phase4_mapping.py
│   └── phase5_hydration.py
├── io/
│   ├── page_loader.py         # Load raw scrape JSON
│   ├── phase_cache.py         # Per-phase disk cache (phases 2–4)
│   └── staging_writer.py      # Write + backup Astro files
├── schema/
│   ├── representative_picker.py
│   ├── schema_generator.py    # One LLM batch call → ExtractionSchema per type
│   └── schema_cache.py        # Persist schemas per site + type
├── extraction/
│   ├── field_extractor.py     # CSS selectors → ExtractedField (Track A)
│   ├── content_extractor.py   # CSS + markdownify → Markdown (Track B)
│   └── unknown_extractor.py   # LLM schema + extraction (Track C)
├── urls/
│   ├── category_inferrer.py   # Detect categories from URL patterns
│   ├── url_generator.py       # Generate /type/category/slug URLs
│   └── redirect_builder.py    # JSON + Cloudflare CSV
├── media/
│   ├── image_processor.py     # Download + upload + URL rewrite
│   ├── r2_client.py           # aiobotocore async R2
│   └── url_cache.py           # SHA-256 dedup cache
├── collection/
│   ├── entry_builder.py       # Build CollectionEntry
│   ├── serializer.py          # JSON / YAML frontmatter output
│   └── slug_generator.py      # title → slug + dedup
└── manifest/
    └── manifest_builder.py    # Assemble HydrationManifest

Dependencies

Package                       Purpose
page-segmenter                Phase 2: HTML → SegmentRecord tree
segment-classifier            Phase 3: SegmentRecord → ComponentType
component-mapper              Phase 4: ClassifiedSegment → MappedComponent + prop schema
litellm                       LLM calls (schema generation + unknown extraction)
beautifulsoup4 + lxml         CSS selector extraction
markdownify                   HTML → Markdown (Track B)
aiofiles                      Async file I/O
aiohttp                       Async image download
aiobotocore                   Async R2/S3 upload
pydantic + pydantic-settings  Models + env-var config
pyyaml                        YAML frontmatter serialization



Download files


Source Distribution

data_hydrator-0.2.0.tar.gz (206.1 kB)

Uploaded Source

Built Distribution


data_hydrator-0.2.0-py3-none-any.whl (55.8 kB)

Uploaded Python 3

File details

Details for the file data_hydrator-0.2.0.tar.gz.

File metadata

  • Download URL: data_hydrator-0.2.0.tar.gz
  • Upload date:
  • Size: 206.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.5

File hashes

Hashes for data_hydrator-0.2.0.tar.gz
Algorithm Hash digest
SHA256 720221b15e2bd8b04feab5d2a1572336743523a4047d42e851383eaad528d171
MD5 a455de743e7d8aeae27be40b968381ae
BLAKE2b-256 d6b285d17447b9ab840c3b0f1eb17b9c90888ad1a294f50270629583cc47f2c3


File details

Details for the file data_hydrator-0.2.0-py3-none-any.whl.

File hashes

Hashes for data_hydrator-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b17bd817b15a2cda60f087405a5e7d6355c6b443c75fb530eac471e87c617137
MD5 c354715c439b78987d44e48273aef247
BLAKE2b-256 4baccfc01e6f17b04e1a527d82359b82fc4c7e73170896b9d7bd159b8ea4299f

