LLM-powered structured data transformation
Project description
Smelt AI
LLM-powered structured data transformation. Feed in rows of data, get back strictly typed Pydantic models — batched, concurrent, and validated.
from smelt import Model, Job
from pydantic import BaseModel
class Classification(BaseModel):
sector: str
sub_sector: str
is_public: bool
model = Model(provider="openai", name="gpt-4.1-mini")
job = Job(
prompt="Classify each company by industry sector and whether it's publicly traded.",
output_model=Classification,
batch_size=20,
concurrency=3,
)
result = job.run(model, data=[
{"name": "Apple", "desc": "Consumer electronics and software"},
{"name": "Stripe", "desc": "Payment processing platform"},
{"name": "Mayo Clinic", "desc": "Nonprofit medical center"},
])
for row in result.data:
print(row) # Classification(sector='Technology', sub_sector='Consumer Electronics', is_public=True)
Install
pip install smelt-ai[openai] # OpenAI models
pip install smelt-ai[anthropic] # Anthropic models
pip install smelt-ai[google] # Google Gemini models
Requires Python 3.10+.
Architecture
Pipeline Overview
list[dict] → Tag rows with row_id → Shuffle? → Split into batches → Concurrent LLM calls → Validate schema + IDs → Reorder by row_id → SmeltResult[T]
How a Job Executes
- Prepare — Tag each row with a
row_id, optionally shuffle, create internal Pydantic model, build system prompt, split into batches - Execute — Send batches concurrently to the LLM (controlled by
concurrencysemaphore) - Validate — For each batch response: validate row IDs, check schema conformance
- Assemble — Sort all rows by
row_id, strip internal fields, aggregate metrics intoSmeltResult[T]
Retry & Backoff
Each batch independently retries on failure. Validation errors (bad schema) and transient API errors (429, 5xx) trigger retries with exponential backoff (1s × 2^attempt + jitter, capped at 60s). Client errors (400, 401, 403) fail immediately without retrying.
Concurrency Model
Smelt uses asyncio.Semaphore for cooperative async concurrency — no threads, no process pools. While one batch awaits an LLM response, others can fire off their requests on the same thread.
concurrency=3, batch_size=5, 15 rows:
Batch 0 (rows 0–4): ████████░░░░
Batch 1 (rows 5–9): ████████████
Batch 2 (rows 10–14): ██████░░░░░░
↑ 3 slots ↑ 1 slot
Row ID Tracking
Smelt injects a row_id field into your model, tells the LLM to echo it back, then validates and strips it. This ensures correct ordering even when batches complete out of order.
Input: Tagged: LLM Output (unordered): Final (reordered):
{name: Apple} → {row_id: 0, name: Apple} {row_id: 1, Fintech} → Classification(Tech)
{name: Stripe} → {row_id: 1, name: Stripe} {row_id: 0, Tech} → Classification(Fintech)
{name: Mayo} → {row_id: 2, name: Mayo} {row_id: 2, Health} → Classification(Health)
Dynamic Model Creation
Under the hood, smelt dynamically extends your Pydantic model to add row_id, then wraps it in a batch container for with_structured_output:
YourModel _SmeltYourModel (extends YourModel) _SmeltBatch
sector: str → row_id: int rows: list[_SmeltYourModel]
sub_sector: str sector: str
is_public: bool sub_sector: str
is_public: bool
Error Handling Modes
stop_on_exhaustion=True (default):
Batch fails → set cancel event → pending batches skip → raise SmeltExhaustionError → e.partial_result has successful batches
stop_on_exhaustion=False:
Batch fails → record BatchError → continue processing → return SmeltResult → result.errors has failures, result.data has successes
API
Model
Wraps a LangChain chat model provider. Uses init_chat_model under the hood, so any LangChain-supported provider works.
model = Model(
provider="openai", # LangChain provider name
name="gpt-4.1-mini", # Model identifier
api_key="sk-...", # Optional — falls back to env var (e.g. OPENAI_API_KEY)
params={"temperature": 0}, # Forwarded to the chat model constructor
)
Job
Defines what transformation to run and how to batch it.
job = Job(
prompt="Your transformation instructions here",
output_model=MyPydanticModel, # Schema for each output row
batch_size=10, # Rows per LLM request (default: 10)
concurrency=3, # Max concurrent requests (default: 3)
max_retries=3, # Retries per failed batch (default: 3)
shuffle=False, # Shuffle rows before batching (default: False)
stop_on_exhaustion=True, # Raise on failure vs collect errors (default: True)
)
Run synchronously or asynchronously:
# Sync — use in scripts
result = job.run(model, data=rows)
# Async — use in notebooks, async apps
result = await job.arun(model, data=rows)
Note:
job.run()cannot be called from within an async event loop (e.g. Jupyter). Useawait job.arun()in those contexts.
Test with a single row before committing to a full run:
# Quick validation — runs only the first row, ignores shuffle/batch_size/concurrency
result = job.test(model, data=rows)
print(result.data[0]) # see one sample output
# Async version
result = await job.atest(model, data=rows)
SmeltResult[T]
result.data # list[T] — transformed rows in original order
result.errors # list[BatchError] — failed batches
result.metrics # SmeltMetrics — tokens, timing, retries
result.success # bool — True if no errors
SmeltMetrics
result.metrics.total_rows # Total input rows
result.metrics.successful_rows # Rows with valid output
result.metrics.failed_rows # Rows in failed batches
result.metrics.total_retries # Cumulative retries across all batches
result.metrics.input_tokens # Total input tokens consumed
result.metrics.output_tokens # Total output tokens consumed
result.metrics.wall_time_seconds # Wall-clock duration
Error Handling
All exceptions inherit from SmeltError.
| Exception | When |
|---|---|
SmeltConfigError |
Invalid config (bad provider, empty prompt, etc.) |
SmeltValidationError |
LLM output fails schema validation |
SmeltAPIError |
Non-retriable API error (401, 403) |
SmeltExhaustionError |
Batch exhausted all retries (stop_on_exhaustion=True) |
SmeltExhaustionError carries a partial_result with any successfully processed batches:
from smelt.errors import SmeltExhaustionError
try:
result = job.run(model, data=rows)
except SmeltExhaustionError as e:
print(f"Partial: {len(e.partial_result.data)} rows succeeded")
print(f"Failed: {len(e.partial_result.errors)} batches")
Set stop_on_exhaustion=False to collect errors without raising:
job = Job(prompt="...", output_model=MyModel, stop_on_exhaustion=False)
result = job.run(model, data=rows)
if not result.success:
for err in result.errors:
print(f"Batch {err.batch_index} failed: {err.message}")
Supported Providers
Any provider supported by LangChain's init_chat_model. Tested with:
| Provider | provider value |
Example models |
|---|---|---|
| OpenAI | "openai" |
gpt-5.2, gpt-4.1-mini, gpt-4.1, gpt-4o, o4-mini |
| Anthropic | "anthropic" |
claude-sonnet-4-6, claude-opus-4-6, claude-haiku-4-5-20251001 |
| Google Gemini | "google_genai" |
gemini-3-flash-preview, gemini-3-pro-preview, gemini-2.5-flash |
Project Structure
src/smelt/
├── __init__.py # Public API exports
├── model.py # Model — LLM provider config
├── job.py # Job — transformation definition + run/arun
├── batch.py # Async batch engine, retry, concurrency
├── prompt.py # System/human message construction
├── validation.py # Dynamic Pydantic model creation, row ID validation
├── types.py # SmeltResult, SmeltMetrics, BatchError
└── errors.py # Exception hierarchy
Development
git clone https://github.com/Cydra-Tech/smelt-ai.git
cd smelt
uv sync --all-extras
# Unit tests (mocked, no API keys needed)
uv run pytest tests/ --ignore=tests/test_live.py
# Live API tests (requires .env with API keys)
uv run pytest tests/test_live.py -v
# Lint
uv run ruff check src/ tests/
Links
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file smelt_ai-0.1.3.tar.gz.
File metadata
- Download URL: smelt_ai-0.1.3.tar.gz
- Upload date:
- Size: 235.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8a8b427b4e051ad74b7e07e5ecda3b92fb3dac8e62d7823022df3be9ec5291bf
|
|
| MD5 |
362f6731358142ed201f56f8c2f8686d
|
|
| BLAKE2b-256 |
dfd3e65cab29c817980c7f5cc123f5c800e48da595f06bf2a9a2c45d43b805c6
|
File details
Details for the file smelt_ai-0.1.3-py3-none-any.whl.
File metadata
- Download URL: smelt_ai-0.1.3-py3-none-any.whl
- Upload date:
- Size: 17.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7040726afc9b611f03913a68d33924c6c389fd9b8b5f5fc49acb51984b793fda
|
|
| MD5 |
f143e25c3b9224a1274738b875b5750b
|
|
| BLAKE2b-256 |
5a519df240294d44a92d24e9f0c2c0086e03169ac610c83581f18dca21d0d7c4
|