Concurrent and batch LLM calls on a pandas DataFrame with Pydantic-typed outputs.
Project description
GenAI Information Extraction for DataFrames
A general-purpose library for prompt-based information extraction over DataFrames, with concurrent and batch execution against OpenAI, Anthropic, and models on OpenRouter.
If you find this library useful in your research, please cite:
Li, Mai, Shen, Yang, Zhang (2026), "Dissecting Corporate Culture Using Generative AI," Review of Financial Studies 39(1):253–296, doi.org/10.1093/rfs/hhaf081.
Install
pip install lmsyz_genai_ie_rfs
Set at least one provider key (or drop them in a .env file next to your notebook):
export OPENAI_API_KEY=sk-...
export ANTHROPIC_API_KEY=sk-ant-...
Quickstart: entity + relation extraction
Three short news sentences in:
| id | text |
|---|---|
| 1 | Apple CEO Tim Cook announced the iPhone 17 at WWDC in June 2025. |
| 2 | Tesla acquired SolarCity in 2016 for $2.6 billion to enter the solar market. |
| 3 | Pfizer's decision to spin off its consumer health unit was driven by activist pressure from Trian Partners. |
Structured DataFrame out:
| input_id | entities | causal_triples | sentiment |
|---|---|---|---|
| 1 | [{Tim Cook, PERSON}, {Apple, ORG}, {iPhone 17, PRODUCT}, {WWDC, EVENT}, {June 2025, DATE}] |
[] |
neutral |
| 2 | [{Tesla, ORG}, {SolarCity, ORG}, {2016, DATE}, {$2.6 billion, MONEY}] |
[[acquisition, enabled, market entry]] |
positive |
| 3 | [{Pfizer, ORG}, {Trian Partners, ORG}] |
[[activist pressure, drove, spin-off]] |
neutral |
One function call:
import pandas as pd
from lmsyz_genai_ie_rfs import extract_df
df = pd.DataFrame({
"id": [1, 2, 3],
"text": [
"Apple CEO Tim Cook announced the iPhone 17 at WWDC in June 2025.",
"Tesla’s $2.6 billion acquisition of SolarCity in 2016 enabled its entry into the solar market.",
"Pfizer's decision to spin off its consumer health unit was driven by activist pressure from Trian Partners.",
],
})
prompt = """
You are an information-extraction assistant. For each input row, analyze the text and extract structured information.
Step-by-step instructions:
1. input_id: Copy the input_id from the row verbatim.
2. entities: List every named entity mentioned in the text. For each entity give:
- name: the surface form as it appears in the text.
- type: one of "PERSON", "ORG", "PRODUCT", "DATE", "MONEY".
3. causal_triples: If the text explicitly states a cause and effect, list each as a
three-element array ["cause", "relation", "effect"]. If there is no explicit
causation, return an empty list []. All elements should be concisely summarized, in three words or less.
4. sentiment: One of "positive", "neutral", or "negative".
Return a JSON object with this EXACT structure:
{
"all_results": [
{
"input_id": "1",
"entities": [
{"name": "Apple", "type": "ORG"},
{"name": "Tim Cook", "type": "PERSON"}
],
"causal_triples": [[cause_1, relation_1, effect_1], [cause_2, relation_2, effect_2], ...],
"sentiment": "positive/neutral/negative"
}
]
}
Do not include any fields besides input_id, entities, causal_triples, and sentiment.
"""
out = extract_df(
df, prompt=prompt,
chunk_size=5,
max_workers=20,
backend="openai",
model="gpt-4.1-mini",
cache_path="demo.sqlite",
id_col="id", text_col="text",
)
print(out)
entities = out[['input_id', 'entities']].explode("entities")
print(entities)
Save:
out.to_csv("extraction.csv", index=False)
Speed up: chunk_size and max_workers
max_workers sets the size of the threadpool that issues API calls in parallel. chunk_size sets how many DataFrame rows are packed into each call (one shared system prompt, chunk_size user inputs). Combining them can speed up execution by 100× compared to a naive for loop with one row per call.
out = extract_df(
df, prompt=...,
chunk_size=5, # rows packed into one API call (amortizes the system prompt)
max_workers=20, # API calls in flight at once (a thread pool)
backend="openai", model="gpt-4.1-mini",
cache_path="big.sqlite",
)
At ~1 second per call, 100,000 rows:
| Approach | API calls | Wall-clock |
|---|---|---|
One row per call, serial (a plain for loop) |
100,000 | ~28 hours |
One row per call, max_workers=20 |
100,000 | ~83 minutes |
chunk_size=5, max_workers=20 |
20,000 | ~17 minutes |
Tuning: start chunk_size=5, max_workers=20. Raise max_workers until the log shows rate-limit retries (OpenAI tier-3 handles 60–100; Anthropic tier-2 handles 20–50).
For ~50% lower per-token cost in exchange for asynchronous execution, see the batch path below.
The results cache DB (cache_path)
API calls cost money and time. The cache exists so that an interrupted, edited, or replayed run does not re-send rows that already returned.
cache_path is required. It is the path to a SQLite file. Each completed row is written to it as soon as it returns from the model. The file is not auto-deleted.
On rerun with the same cache_path, rows already in the cache are skipped; only missing rows go to the model.
The schema is three columns:
results(row_id TEXT PRIMARY KEY, json_result TEXT, prompt_hash TEXT)
Open in any SQLite browser or sqlite3 my_experiment.sqlite to inspect. Force a full re-run: fresh=True.
What if I change the prompt?
Each cached row is stamped with sha256(prompt)[:16]. On rerun, rows whose stamp matches the current prompt are skipped; rows stamped with a different prompt are re-executed and the old row is overwritten (INSERT OR REPLACE on row_id).
Three escape hatches:
- Compare prompts side by side: use a different
cache_pathfor each prompt version. - Fixed a typo and want to keep the cached rows: pass
ignore_prompt_hash=True. - Wipe and redo:
fresh=True, or delete the.sqlitefile.
Optional: a JSON schema file
By default, output structure is enforced only by the prompt; the model returns a JSON object whose shape it infers from the instructions.
Pass schema= to enforce structure at the API layer. OpenAI consumes it as response_format={"type": "json_schema", ...}; Anthropic consumes it as the input_schema of a forced tool_use.
out = extract_df(
df, prompt=my_prompt, schema="my_schema.json",
backend="openai", model="gpt-4.1-mini",
cache_path="strict.sqlite",
)
The file contains a standard OpenAI response_format object. Here is one matching the Quickstart task above:
{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"strict": true,
"schema": {
"type": "object",
"additionalProperties": false,
"required": ["all_results"],
"properties": {
"all_results": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"required": ["input_id", "entities", "causal_triples", "sentiment"],
"properties": {
"input_id": {"type": "string"},
"entities": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"required": ["name", "type"],
"properties": {
"name": {"type": "string"},
"type": {"type": "string", "enum": ["PERSON", "ORG", "PRODUCT", "DATE", "MONEY", "EVENT"]}
}
}
},
"causal_triples": {
"type": "array",
"items": {
"type": "array",
"minItems": 3, "maxItems": 3,
"items": {"type": "string"}
}
},
"sentiment": {"type": "string", "enum": ["positive", "neutral", "negative"]}
}
}
}
}
}
}
}
The same file works on Anthropic: the library extracts the inner schema object and passes it as the input_schema of a tool_use.
schema= accepts a file path, a dict with the contents above, or None (the default).
Provider support
| Capability | OpenAI | Anthropic | Gemini (via OpenAI compat) |
|---|---|---|---|
| Concurrent, with schema | yes: structured outputs | yes: tool_use + prompt caching |
yes: structured outputs |
| Concurrent, no schema | yes: json_object |
yes: plain text (parsed tolerantly) | yes: json_object |
| Batch API (~50% cheaper) | yes: OpenAIBatchExtractor |
yes: AnthropicBatchExtractor |
partial: native SDK needed for upload |
Gemini and OpenRouter speak the OpenAI chat-completions API, so you reach them by setting base_url=:
# Gemini
out = extract_df(
df, prompt=my_prompt,
backend="openai", model="gemini-2.5-flash",
base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
api_key=os.environ["GEMINI_API_KEY"],
cache_path="gemini.sqlite",
)
# OpenRouter (Llama, Mistral, DeepSeek, Qwen, ...)
out = extract_df(
df, prompt=my_prompt,
backend="openai", model="meta-llama/llama-3.3-70b-instruct",
base_url="https://openrouter.ai/api/v1",
api_key=os.environ["OPENROUTER_API_KEY"],
cache_path="llama.sqlite",
)
The batch path
OpenAI and Anthropic both expose a Batch API at approximately 50% of the per-token cost. Execution is asynchronous with a stated 24-hour SLA (typical completion is faster). Lifecycle: build request files, submit, poll status, retrieve.
OpenAI batch
from lmsyz_genai_ie_rfs import OpenAIBatchExtractor
ext = OpenAIBatchExtractor(batch_root_dir="my_job/")
ext.create_batch_jsonl( # 1. write JSONL input files
dataframe=df, id_col="id", text_col="text",
prompt=prompt, job_id="my_job",
model_name="gpt-4.1-mini",
schema_dict=None, # or a strict json_schema dict
)
ext.submit_batches("my_job") # 2. upload + submit
ext.check_batch_status("my_job", # 3. poll
continuous=True, interval=60)
out = ext.retrieve_results_as_dataframe("my_job") # 4. results as DataFrame
Anthropic batch
Same lifecycle, different wire format (a JSON request body, not a JSONL file upload):
from lmsyz_genai_ie_rfs import AnthropicBatchExtractor
ext = AnthropicBatchExtractor(batch_root_dir="my_job/")
ext.create_batch_requests(..., schema_dict=my_schema)
ext.submit_batch("my_job")
ext.check_batch_status("my_job", continuous=True)
out = ext.retrieve_results_as_dataframe("my_job")
All intermediate files (JSONL input, submission manifest, raw results) are written under batch_root_dir/<job_id>/ and can be inspected directly.
All knobs
extract_df(
df,
prompt=..., # required
cache_path="path.sqlite", # required: results DB on disk
backend="openai", # or "anthropic"
model="gpt-4.1-mini", # required
schema=None, # optional: path to JSON schema file, dict, or None
id_col="id",
text_col="text",
chunk_size=5, # rows per API call; 5-20 is typical
max_workers=20, # parallel threads; tune to your rate limit
fresh=False, # True to ignore prior DB contents
ignore_prompt_hash=False, # True to reuse rows produced by a different prompt
api_key=None, # overrides .env / environment
base_url=None, # for OpenRouter / Gemini compat
client=None, # BYO preconfigured SDK client
)
Failed API calls are retried automatically (tenacity, exponential backoff, up to 5 attempts) on RateLimitError and APIError. If a chunk still fails after retries, the failure is logged and the chunk's rows are omitted from the returned DataFrame.
FAQ
Why does my DataFrame have fewer rows than I expect? A chunk failed after retries. Check the log for the stack trace.
How do I restart a batch job when I lost the job_id? Look under batch_root_dir/. The directories ARE the job_ids.
Is there retry logic? Yes: tenacity, 5 attempts with exponential backoff 2-30s for RateLimitError and APIError.
What about nested lists and dicts in the output? Flatten with df.explode("entities") (each list item becomes its own row), followed by pd.json_normalize(df["entities"]) (each dict's keys become columns). CSV stringifies nested fields; save as JSONL (df.to_json(..., orient="records", lines=True)) for clean round-tripping.
License
MIT.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file lmsyz_genai_ie_rfs-0.1.0a3.tar.gz.
File metadata
- Download URL: lmsyz_genai_ie_rfs-0.1.0a3.tar.gz
- Upload date:
- Size: 38.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4254f234974d318acace6b732c1a4db0a9dd860c40c4966c5f4bafbe3ec17a87
|
|
| MD5 |
10d71c0ec1ea86ccc867b02575a9ac9f
|
|
| BLAKE2b-256 |
088156beaefe850fba1ba9052c17885e9690d227b10740d8afb1b5ea6c1f5ab6
|
File details
Details for the file lmsyz_genai_ie_rfs-0.1.0a3-py3-none-any.whl.
File metadata
- Download URL: lmsyz_genai_ie_rfs-0.1.0a3-py3-none-any.whl
- Upload date:
- Size: 27.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
30827538fb25412159636d17937915b792c3564d3f619e2b2bed91c2047763bc
|
|
| MD5 |
1e0e00cb111e4e5cb79dae1f4b20fa89
|
|
| BLAKE2b-256 |
9d0e90d5d7bb87bd3d31ad8ad7837b1e153ef5d563cdf4128573cd4ca879427d
|