
Smelt AI

LLM-powered structured data transformation. Feed in rows of data, get back strictly typed Pydantic models — batched, concurrent, and validated.

from smelt import Model, Job
from pydantic import BaseModel

class Classification(BaseModel):
    sector: str
    sub_sector: str
    is_public: bool

model = Model(provider="openai", name="gpt-4.1-mini")
job = Job(
    prompt="Classify each company by industry sector and whether it's publicly traded.",
    output_model=Classification,
    batch_size=20,
    concurrency=3,
)

result = job.run(model, data=[
    {"name": "Apple", "desc": "Consumer electronics and software"},
    {"name": "Stripe", "desc": "Payment processing platform"},
    {"name": "Mayo Clinic", "desc": "Nonprofit medical center"},
])

for row in result.data:
    print(row)  # Classification(sector='Technology', sub_sector='Consumer Electronics', is_public=True)

Install

pip install smelt-ai[openai]      # OpenAI models
pip install smelt-ai[anthropic]   # Anthropic models
pip install smelt-ai[google]      # Google Gemini models

Requires Python 3.10+.


Architecture

Pipeline Overview

flowchart LR
    A["list[dict]"] --> B["Tag rows\nwith row_id"]
    B --> S{"shuffle?"}
    S -->|Yes| Sh["Shuffle\ntagged rows"]
    S -->|No| C
    Sh --> C["Split into\nbatches"]
    C --> D["Concurrent\nLLM calls"]
    D --> E["Validate\nschema + IDs"]
    E --> F["Reorder by\nrow_id"]
    F --> G["SmeltResult[T]"]

    style A fill:#f9f,stroke:#333
    style G fill:#9f9,stroke:#333
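
In pseudocode, the tag/shuffle/split stages above amount to something like this (a conceptual sketch, not Smelt's actual internals; plan_batches is a hypothetical helper):

import random

def plan_batches(data: list[dict], batch_size: int, shuffle: bool) -> list[list[dict]]:
    # Tag each row with its original position so output can be reordered later.
    tagged = [{"row_id": i, **row} for i, row in enumerate(data)]
    # Optionally shuffle to spread correlated rows across batches.
    if shuffle:
        random.shuffle(tagged)
    # Split into fixed-size batches for concurrent LLM calls.
    return [tagged[i:i + batch_size] for i in range(0, len(tagged), batch_size)]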

How a Job Executes

sequenceDiagram
    participant User
    participant Job
    participant BatchEngine
    participant LLM

    User->>Job: job.run(model, data)
    Job->>BatchEngine: execute_batches()

    Note over BatchEngine: Tag rows with row_id<br/>Shuffle if enabled<br/>Create internal Pydantic model<br/>Build system prompt<br/>Split into batches

    par Batch 0
        BatchEngine->>LLM: [system_msg, human_msg]
        LLM-->>BatchEngine: structured response
    and Batch 1
        BatchEngine->>LLM: [system_msg, human_msg]
        LLM-->>BatchEngine: structured response
    and Batch N
        BatchEngine->>LLM: [system_msg, human_msg]
        LLM-->>BatchEngine: structured response
    end

    Note over BatchEngine: Validate row IDs per batch<br/>Sort by row_id<br/>Strip row_id field<br/>Aggregate metrics

    BatchEngine-->>Job: SmeltResult[T]
    Job-->>User: result

Retry & Backoff Flow

Each batch independently retries on failure. Validation errors (bad schema) and transient API errors (429, 5xx) trigger retries. Client errors (400, 401, 403) fail immediately.

flowchart TD
    Start([Send batch to LLM]) --> Response{Response OK?}

    Response -->|Parsed + valid| Success([Return rows])

    Response -->|Parse/validation error| Retriable1{Retries left?}
    Retriable1 -->|Yes| Backoff1["Backoff: 1s × 2^attempt + jitter"]
    Backoff1 --> Start
    Retriable1 -->|No| Fail([BatchError])

    Response -->|API error| Check{Retriable?}
    Check -->|"429, 5xx, timeout"| Retriable2{Retries left?}
    Retriable2 -->|Yes| Backoff2["Backoff: 1s × 2^attempt + jitter"]
    Backoff2 --> Start
    Retriable2 -->|No| Fail

    Check -->|"400, 401, 403"| Fail

    style Success fill:#9f9,stroke:#333
    style Fail fill:#f99,stroke:#333
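
The backoff step in the diagram corresponds to roughly the following (a sketch; the exact jitter range is an assumption):

import asyncio
import random

async def backoff(attempt: int, base: float = 1.0) -> None:
    # Exponential backoff per the diagram: base * 2^attempt, plus jitter
    # so concurrent batches don't retry in lockstep.
    await asyncio.sleep(base * (2 ** attempt) + random.uniform(0, 1))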

Concurrency Model

Smelt uses asyncio.Semaphore for cooperative async concurrency — no threads, no process pools. While one batch awaits an LLM response, others can fire off their requests on the same thread.

gantt
    title concurrency=3, batch_size=5, 15 rows
    dateFormat X
    axisFormat %s

    section Batch 0
    LLM call (rows 0-4)   :active, b0, 0, 3

    section Batch 1
    LLM call (rows 5-9)   :active, b1, 0, 4

    section Batch 2
    LLM call (rows 10-14) :active, b2, 0, 2

    section Semaphore
    3 slots occupied       :crit, s0, 0, 2
    2 slots occupied       :s1, 2, 3
    1 slot occupied        :s2, 3, 4
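
A minimal sketch of the pattern, assuming a call_llm coroutine that sends one batch (not Smelt's actual engine):

import asyncio

async def run_batches(batches, call_llm, concurrency: int = 3):
    sem = asyncio.Semaphore(concurrency)

    async def run_one(batch):
        async with sem:  # at most `concurrency` requests in flight
            return await call_llm(batch)

    # One event loop, no threads: while a batch awaits I/O, others proceed.
    return await asyncio.gather(*(run_one(b) for b in batches))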

Row ID Tracking

Smelt injects a row_id field into your model, tells the LLM to echo it back, then validates and strips it. This ensures correct ordering even when batches complete out of order.

flowchart LR
    subgraph Input
        direction TB
        R0["row 0: {name: Apple}"]
        R1["row 1: {name: Stripe}"]
        R2["row 2: {name: Mayo}"]
    end

    subgraph Tagged
        direction TB
        T0["{row_id: 0, name: Apple}"]
        T1["{row_id: 1, name: Stripe}"]
        T2["{row_id: 2, name: Mayo}"]
    end

    subgraph "LLM Output (may be unordered)"
        direction TB
        L1["{row_id: 1, sector: Fintech}"]
        L0["{row_id: 0, sector: Tech}"]
        L2["{row_id: 2, sector: Health}"]
    end

    subgraph "Final (reordered)"
        direction TB
        F0["Classification(sector=Tech)"]
        F1["Classification(sector=Fintech)"]
        F2["Classification(sector=Health)"]
    end

    Input --> Tagged --> L1
    L1 ~~~ L0
    L0 ~~~ L2
    L2 --> F0

    style F0 fill:#9f9,stroke:#333
    style F1 fill:#9f9,stroke:#333
    style F2 fill:#9f9,stroke:#333
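
The validate/strip/reorder steps reduce to something like this (a sketch; the error type and message are illustrative):

def validate_and_reorder(outputs: list[dict], expected_ids: set[int]) -> list[dict]:
    # Every requested row_id must come back exactly once.
    got_ids = {row["row_id"] for row in outputs}
    if got_ids != expected_ids:
        raise ValueError(f"missing or unexpected row IDs: {expected_ids ^ got_ids}")
    # Restore original order, then strip the tracking field.
    ordered = sorted(outputs, key=lambda row: row["row_id"])
    return [{k: v for k, v in row.items() if k != "row_id"} for row in ordered]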

Dynamic Model Creation

Under the hood, Smelt dynamically extends your Pydantic model to add row_id, then wraps it in a batch container for with_structured_output.

classDiagram
    class YourModel {
        +str sector
        +str sub_sector
        +bool is_public
    }

    class _SmeltYourModel {
        +int row_id
        +str sector
        +str sub_sector
        +bool is_public
    }

    class _SmeltBatch {
        +list~_SmeltYourModel~ rows
    }

    YourModel <|-- _SmeltYourModel : extends via create_model()
    _SmeltYourModel --* _SmeltBatch : rows

    note for _SmeltYourModel "Injected row_id for tracking.\nStripped before returning to user."
    note for _SmeltBatch "Wrapper required by LangChain's\nwith_structured_output()."
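
With Pydantic's create_model, the two generated classes in the diagram look roughly like this (a sketch mirroring the diagram, not Smelt's exact code):

from pydantic import BaseModel, create_model

def build_batch_model(user_model: type[BaseModel]) -> type[BaseModel]:
    # Subclass the user's model and inject the row_id tracking field.
    row_model = create_model(
        f"_Smelt{user_model.__name__}",
        __base__=user_model,
        row_id=(int, ...),
    )
    # with_structured_output() expects a single top-level object, so wrap
    # the rows in a batch container.
    return create_model("_SmeltBatch", rows=(list[row_model], ...))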

Error Handling Modes

flowchart TD
    subgraph "stop_on_exhaustion = True (default)"
        A1[Batch fails] --> A2[Set cancel event]
        A2 --> A3[Pending batches skip]
        A3 --> A4[Raise SmeltExhaustionError]
        A4 --> A5["e.partial_result has\nsuccessful batches"]
    end

    subgraph "stop_on_exhaustion = False"
        B1[Batch fails] --> B2[Record BatchError]
        B2 --> B3[Continue processing]
        B3 --> B4[Return SmeltResult]
        B4 --> B5["result.errors has failures\nresult.data has successes"]
    end

    style A4 fill:#f99,stroke:#333
    style B4 fill:#ff9,stroke:#333

API

Model

Wraps a LangChain chat model provider. Uses init_chat_model under the hood, so any LangChain-supported provider works.

model = Model(
    provider="openai",          # LangChain provider name
    name="gpt-4.1-mini",       # Model identifier
    api_key="sk-...",           # Optional — falls back to env var (e.g. OPENAI_API_KEY)
    params={"temperature": 0},  # Forwarded to the chat model constructor
)

Job

Defines what transformation to run and how to batch it.

job = Job(
    prompt="Your transformation instructions here",
    output_model=MyPydanticModel,  # Schema for each output row
    batch_size=10,                 # Rows per LLM request (default: 10)
    concurrency=3,                 # Max concurrent requests (default: 3)
    max_retries=3,                 # Retries per failed batch (default: 3)
    shuffle=False,                 # Shuffle rows before batching (default: False)
    stop_on_exhaustion=True,       # Raise on failure vs collect errors (default: True)
)

Run synchronously or asynchronously:

# Sync — use in scripts
result = job.run(model, data=rows)

# Async — use in notebooks, async apps
result = await job.arun(model, data=rows)

Note: job.run() cannot be called from within an async event loop (e.g. Jupyter). Use await job.arun() in those contexts.

Test with a single row before committing to a full run:

# Quick validation — runs only the first row, ignores shuffle/batch_size/concurrency
result = job.test(model, data=rows)
print(result.data[0])  # see one sample output

# Async version
result = await job.atest(model, data=rows)

SmeltResult[T]

result.data       # list[T] — transformed rows in original order
result.errors     # list[BatchError] — failed batches
result.metrics    # SmeltMetrics — tokens, timing, retries
result.success    # bool — True if no errors

SmeltMetrics

result.metrics.total_rows         # Total input rows
result.metrics.successful_rows    # Rows with valid output
result.metrics.failed_rows        # Rows in failed batches
result.metrics.total_retries      # Cumulative retries across all batches
result.metrics.input_tokens       # Total input tokens consumed
result.metrics.output_tokens      # Total output tokens consumed
result.metrics.wall_time_seconds  # Wall-clock duration
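
For example, a quick post-run summary:

result = job.run(model, data=rows)
m = result.metrics
print(f"{m.successful_rows}/{m.total_rows} rows in {m.wall_time_seconds:.1f}s")
print(f"tokens: {m.input_tokens} in / {m.output_tokens} out, retries: {m.total_retries}")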

Error Handling

All exceptions inherit from SmeltError.

Exception             When
SmeltConfigError      Invalid config (bad provider, empty prompt, etc.)
SmeltValidationError  LLM output fails schema validation
SmeltAPIError         Non-retriable API error (401, 403)
SmeltExhaustionError  Batch exhausted all retries (stop_on_exhaustion=True)

SmeltExhaustionError carries a partial_result with any successfully processed batches:

from smelt.errors import SmeltExhaustionError

try:
    result = job.run(model, data=rows)
except SmeltExhaustionError as e:
    print(f"Partial: {len(e.partial_result.data)} rows succeeded")
    print(f"Failed: {len(e.partial_result.errors)} batches")

Set stop_on_exhaustion=False to collect errors without raising:

job = Job(prompt="...", output_model=MyModel, stop_on_exhaustion=False)
result = job.run(model, data=rows)

if not result.success:
    for err in result.errors:
        print(f"Batch {err.batch_index} failed: {err.message}")

Supported Providers

Any provider supported by LangChain's init_chat_model. Tested with:

Provider        provider value    Example models
OpenAI          "openai"          gpt-5.2, gpt-4.1-mini, gpt-4.1, gpt-4o, o4-mini
Anthropic       "anthropic"       claude-sonnet-4-6, claude-opus-4-6, claude-haiku-4-5-20251001
Google Gemini   "google_genai"    gemini-3-flash-preview, gemini-3-pro-preview, gemini-2.5-flash
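
Switching providers is just a different Model config, e.g.:

model = Model(provider="anthropic", name="claude-sonnet-4-6")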

Project Structure

src/smelt/
├── __init__.py        # Public API exports
├── model.py           # Model — LLM provider config
├── job.py             # Job — transformation definition + run/arun
├── batch.py           # Async batch engine, retry, concurrency
├── prompt.py          # System/human message construction
├── validation.py      # Dynamic Pydantic model creation, row ID validation
├── types.py           # SmeltResult, SmeltMetrics, BatchError
└── errors.py          # Exception hierarchy

Development

git clone https://github.com/Cydra-Tech/smelt-ai.git
cd smelt-ai
uv sync --all-extras

# Unit tests (mocked, no API keys needed)
uv run pytest tests/ --ignore=tests/test_live.py

# Live API tests (requires .env with API keys)
uv run pytest tests/test_live.py -v

# Lint
uv run ruff check src/ tests/

License

MIT
