Concurrent and batch LLM extraction over a pandas DataFrame, no schema boilerplate.
GenAI Information Extraction for DataFrames
A general-purpose library for prompt-based information extraction over DataFrames, with concurrent and batch execution against OpenAI, Anthropic, and models on OpenRouter.
Cite as: Li, Mai, Shen, Yang, Zhang (2026), RFS. Full citation and BibTeX at the bottom of this page.
Install
pip install -U lmsyz_genai_ie_rfs
Set at least one provider key (or drop them in a .env file next to your notebook):
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
# os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
# os.environ["OPENROUTER_API_KEY"] = "sk-or-..."
Quickstart: entity + relation extraction
Three short news sentences in:
| id | text |
|---|---|
| 1 | Apple CEO Tim Cook announced the iPhone 17 at WWDC in June 2025. |
| 2 | Tesla's $2.6 billion acquisition of SolarCity in 2016 enabled its entry into the solar market. |
| 3 | Pfizer's decision to spin off its consumer health unit was driven by activist pressure from Trian Partners. |
Structured DataFrame out:
| input_id | entities | causal_triples | sentiment |
|---|---|---|---|
| 1 | [{Tim Cook, PERSON}, {Apple, ORG}, {iPhone 17, PRODUCT}, {WWDC, EVENT}, {June 2025, DATE}] | [] | neutral |
| 2 | [{Tesla, ORG}, {SolarCity, ORG}, {2016, DATE}, {$2.6 billion, MONEY}] | [[acquisition, enabled, market entry]] | positive |
| 3 | [{Pfizer, ORG}, {Trian Partners, ORG}] | [[activist pressure, drove, spin-off]] | neutral |
One function call:
import os
import pandas as pd
from lmsyz_genai_ie_rfs import extract_df
df = pd.DataFrame({
"id": [1, 2, 3],
"text": [
"Apple CEO Tim Cook announced the iPhone 17 at WWDC in June 2025.",
"Tesla's $2.6 billion acquisition of SolarCity in 2016 enabled its entry into the solar market.",
"Pfizer's decision to spin off its consumer health unit was driven by activist pressure from Trian Partners.",
],
})
prompt = """
You are an information-extraction assistant. For each input row, analyze the text and extract structured information.
Step-by-step instructions:
1. input_id: Copy the input_id from the row verbatim.
2. entities: List every named entity mentioned in the text. For each entity give:
- name: the surface form as it appears in the text.
- type: one of "PERSON", "ORG", "PRODUCT", "EVENT", "DATE", "MONEY".
3. causal_triples: If the text explicitly states a cause and effect, list each as a
three-element array ["cause", "relation", "effect"]. If there is no explicit
causation, return an empty list []. All elements should be concisely summarized, in three words or less.
4. sentiment: One of "positive", "neutral", or "negative".
Return a JSON object with this EXACT structure:
{
"all_results": [
{
"input_id": "1",
"entities": [
{"name": "Apple", "type": "ORG"},
{"name": "Tim Cook", "type": "PERSON"}
],
"causal_triples": [[cause_1, relation_1, effect_1], [cause_2, relation_2, effect_2], ...],
"sentiment": "positive/neutral/negative"
}
]
}
Do not include any fields besides input_id, entities, causal_triples, and sentiment.
"""
out = extract_df(
df, prompt=prompt,
chunk_size=5,
max_workers=20,
backend="openai",
model="gpt-4.1-mini",
cache_path="demo.sqlite",
id_col="id", text_col="text",
api_key=os.environ["OPENAI_API_KEY"],
)
print(out)
entities = out[['input_id', 'entities']].explode("entities")
print(entities)
Save:
out.to_csv("extraction.csv", index=False)
Tip: No prompt yet? draft_prompt(goal="...") returns a starter you can edit. See draft_prompt below.
Speed up: chunk_size and max_workers
max_workers sets the size of the thread pool that issues API calls in parallel. chunk_size sets how many DataFrame rows are packed into each call (one shared system prompt, chunk_size user inputs). Combined, they can cut wall-clock time by roughly 100× compared with a naive for loop that sends one row per call.
out = extract_df(
df, prompt=...,
chunk_size=5, # rows packed into one API call (amortizes the system prompt)
max_workers=20, # API calls in flight at once (a thread pool)
backend="openai", model="gpt-4.1-mini",
cache_path="big.sqlite",
)
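The way the two parameters interact can be sketched with the standard library alone. This illustrates the general chunk-then-fan-out pattern and is not the library's actual internals; `make_chunks`, `call_model`, and `run` are hypothetical names, and `call_model` stands in for one API request.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def make_chunks(rows, chunk_size):
    """Split a list of rows into consecutive chunks of at most chunk_size."""
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


def call_model(chunk):
    """Hypothetical stand-in for one API call that handles a whole chunk."""
    time.sleep(0.01)  # pretend network latency
    return [{"input_id": row_id, "n_chars": len(text)} for row_id, text in chunk]


def run(rows, chunk_size=5, max_workers=20):
    chunks = make_chunks(rows, chunk_size)
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves chunk order; each chunk corresponds to one call.
        for chunk_result in pool.map(call_model, chunks):
            results.extend(chunk_result)
    return chunks, results


rows = [(i, f"text {i}") for i in range(23)]
chunks, results = run(rows)
print(len(chunks), len(results))
```

With 23 rows and chunk_size=5, the sketch issues 5 calls instead of 23, and up to max_workers of them are in flight at once.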
At ~1 second per call, 100,000 rows:
| Approach | API calls | Wall-clock |
|---|---|---|
| One row per call, serial (a plain for loop) | 100,000 | ~28 hours |
| One row per call, max_workers=20 | 100,000 | ~83 minutes |
| chunk_size=5, max_workers=20 | 20,000 | ~17 minutes |
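The figures in the table follow from simple arithmetic. A minimal sketch, assuming every call takes about 1 second regardless of chunk size and ignoring retries and rate limits (`wall_clock_seconds` is a hypothetical helper, not part of the library):

```python
import math


def wall_clock_seconds(n_rows, chunk_size, max_workers, seconds_per_call=1.0):
    """Idealized estimate: calls run in waves of max_workers at a time."""
    n_calls = math.ceil(n_rows / chunk_size)
    waves = math.ceil(n_calls / max_workers)
    return n_calls, waves * seconds_per_call


for label, chunk_size, workers in [
    ("serial, 1 row per call", 1, 1),
    ("1 row per call, 20 workers", 1, 20),
    ("5 rows per call, 20 workers", 5, 20),
]:
    calls, secs = wall_clock_seconds(100_000, chunk_size, workers)
    print(f"{label}: {calls:,} calls, about {secs / 60:.0f} min")
```

Real runs will differ: larger chunks produce longer responses, and rate limits cap the useful value of max_workers.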
Tuning: start with chunk_size=5, max_workers=20. Raise max_workers until the log shows rate-limit retries (OpenAI tier-3 handles 60–100; Anthropic tier-2 handles 20–50).
For ~50% lower per-token cost in exchange for asynchronous execution, see the batch API below.
The results cache DB (cache_path)
API calls cost money and time. The cache exists so that an interrupted, edited, or replayed run does not re-send rows that already returned.
cache_path is required. It is the path to a SQLite file. Each completed row is written to it as soon as it returns from the model. The file is not auto-deleted.
On rerun with the same cache_path, rows already in the cache are skipped; only missing rows go to the model.
The schema is three columns:
results(row_id TEXT PRIMARY KEY, json_result TEXT, prompt_hash TEXT)
To inspect the cache, open the file in any SQLite browser or run sqlite3 my_experiment.sqlite. To force a full re-run, pass fresh=True.
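The cache can also be read from Python with the standard sqlite3 module. The sketch below builds an in-memory table with the three-column schema above so that it runs on its own; for a real run you would connect to your own cache_path instead. It assumes json_result holds a JSON-encoded string, as the column name suggests, and the inserted row is made up for illustration.

```python
import json
import sqlite3

# In-memory stand-in for a real cache file.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE results (row_id TEXT PRIMARY KEY, json_result TEXT, prompt_hash TEXT)"
)
conn.execute(
    "INSERT INTO results VALUES (?, ?, ?)",
    ("1", json.dumps({"input_id": "1", "sentiment": "neutral"}), "0123456789abcdef"),
)

# How many cached rows exist per prompt version?
counts = conn.execute(
    "SELECT prompt_hash, COUNT(*) FROM results GROUP BY prompt_hash"
).fetchall()
print(counts)

# Decode the stored JSON (assumed format) back into Python dicts.
decoded = {
    row_id: json.loads(blob)
    for row_id, blob in conn.execute("SELECT row_id, json_result FROM results")
}
print(decoded["1"]["sentiment"])
conn.close()
```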
What if I change the prompt?
Each cached row is stamped with sha256(prompt)[:16]. On rerun, rows whose stamp matches the current prompt are skipped; rows stamped with a different prompt are re-executed and the old row is overwritten (INSERT OR REPLACE on row_id).
Three escape hatches:
- Compare prompts side by side: use a different cache_path for each prompt version.
- Fixed a typo and want to keep the cached rows: pass ignore_prompt_hash=True.
- Wipe and redo: fresh=True, or delete the .sqlite file.
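The skip-or-rerun rule described above can be sketched in a few lines. This is an illustration of the stated behavior, not the library's code: `prompt_stamp` and `rows_to_send` are hypothetical names, and hashing the UTF-8 bytes and truncating the hex digest is an assumption about how sha256(prompt)[:16] is computed.

```python
import hashlib


def prompt_stamp(prompt: str) -> str:
    """First 16 hex characters of the SHA-256 digest (encoding is assumed)."""
    return hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:16]


def rows_to_send(row_ids, cache, prompt, ignore_prompt_hash=False, fresh=False):
    """cache maps row_id -> stamp of the prompt that produced the cached row."""
    if fresh:
        return list(row_ids)  # ignore prior cache contents entirely
    stamp = prompt_stamp(prompt)
    return [
        rid for rid in row_ids
        if rid not in cache or (not ignore_prompt_hash and cache[rid] != stamp)
    ]


old, new = "Extract entities.", "Extract entities and sentiment."
cache = {"1": prompt_stamp(old), "2": prompt_stamp(old)}
ids = ["1", "2", "3"]

same_prompt = rows_to_send(ids, cache, old)
changed_prompt = rows_to_send(ids, cache, new)
keep_cached = rows_to_send(ids, cache, new, ignore_prompt_hash=True)
print(same_prompt, changed_prompt, keep_cached)
```

With an unchanged prompt only the missing row is sent; with a changed prompt every row is sent again unless ignore_prompt_hash=True.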
Batch API: ~50% cheaper
OpenAI and Anthropic both expose a Batch API at approximately 50% of the per-token cost. Execution is asynchronous with a stated 24-hour SLA (typical completion is faster). Lifecycle: build request files, submit, poll status, retrieve.
OpenAI batch
from lmsyz_genai_ie_rfs import OpenAIBatchExtractor
ext = OpenAIBatchExtractor(batch_root_dir="my_job/")
ext.create_batch_jsonl( # 1. write JSONL input files
dataframe=df, id_col="id", text_col="text",
prompt=prompt, job_id="my_job",
model_name="gpt-4.1-mini",
schema_dict=None, # or a strict json_schema dict
)
ext.submit_batches("my_job") # 2. upload + submit
ext.check_batch_status("my_job", # 3. poll
continuous=True, interval=60)
out = ext.retrieve_results_as_dataframe("my_job") # 4. results as DataFrame
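For orientation, each line of an OpenAI Batch API input file is one JSON request with custom_id, method, url, and body fields. The sketch below builds such lines by hand; how the library names custom_id values or packs several rows into one request is not documented here, so `build_request_line` and the `row-N` identifiers are hypothetical.

```python
import json


def build_request_line(custom_id, system_prompt, user_text, model="gpt-4.1-mini"):
    """Serialize one chat-completions request as a single JSONL line."""
    request = {
        "custom_id": custom_id,  # hypothetical naming; echoed back in the results
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_text},
            ],
            "response_format": {"type": "json_object"},
        },
    }
    return json.dumps(request)


texts = [
    "Apple CEO Tim Cook announced the iPhone 17 at WWDC in June 2025.",
    "Pfizer's decision to spin off its consumer health unit was driven by activist pressure from Trian Partners.",
]
lines = [
    build_request_line(f"row-{i}", "Extract entities and return JSON.", text)
    for i, text in enumerate(texts, start=1)
]
print("\n".join(lines))
```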
Anthropic batch
Same lifecycle, different wire format (a JSON request body, not a JSONL file upload):
from lmsyz_genai_ie_rfs import AnthropicBatchExtractor
ext = AnthropicBatchExtractor(batch_root_dir="my_job/")
ext.create_batch_requests(..., schema_dict=my_schema)
ext.submit_batch("my_job")
ext.check_batch_status("my_job", continuous=True)
out = ext.retrieve_results_as_dataframe("my_job")
if out is None:
print("Batch not finished yet; check status and retry.")
else:
print(out.head())
All intermediate files (JSONL input, submission manifest, raw results) are written under batch_root_dir/<job_id>/ and can be inspected directly.
Model providers
Two native backends (OpenAI, Anthropic), plus any OpenAI-compatible endpoint reachable by setting base_url=. The same extract_df call shape works for all of them; only backend, model, base_url, and api_key change.
To switch to Anthropic, change backend and model:
out = extract_df(df, prompt=prompt, cache_path="run.sqlite",
backend="anthropic", model="claude-3-5-sonnet-20241022")
| Capability | OpenAI | Anthropic | OpenAI-compatible (OpenRouter, Gemini, Together, vLLM, ...) |
|---|---|---|---|
| Concurrent, no schema | yes: json_object | yes: plain text (parsed tolerantly) | model-dependent |
| Concurrent, with schema | yes: structured outputs | yes: tool_use + prompt caching | model-dependent (gateway forwards response_format) |
| Batch API (~50% cheaper) | yes: OpenAIBatchExtractor | yes: AnthropicBatchExtractor | use the gateway's native batch API |
| Long-prompt caching | automatic (1024+ tokens) | explicit cache_control on system | upstream-model-dependent |
OpenRouter: one key, hundreds of models
OpenRouter is a gateway: a single key + base_url puts you in front of Llama, Mistral, DeepSeek, Qwen, Gemini, Claude, and many more. Swap model= for whichever you want.
out = extract_df(
df, prompt=my_prompt,
backend="openai",
base_url="https://openrouter.ai/api/v1",
api_key=os.environ["OPENROUTER_API_KEY"],
model="qwen/qwen-2.5-72b-instruct", # Qwen
# model="deepseek/deepseek-chat", # DeepSeek
# model="meta-llama/llama-3.3-70b-instruct", # Llama
# model="mistralai/mistral-large", # Mistral
# model="anthropic/claude-haiku-4.5", # Claude (proxied)
cache_path="openrouter.sqlite",
)
JSON-mode support varies by model on OpenRouter. If a model returns malformed JSON, switch to one that supports response_format, or pass a strict schema= so the gateway routes via JSON-schema mode.
Gemini direct
out = extract_df(
df, prompt=my_prompt,
backend="openai", model="gemini-2.5-flash",
base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
api_key=os.environ["GEMINI_API_KEY"],
cache_path="gemini.sqlite",
)
Drafting a prompt with draft_prompt
Pass a sentence describing what you want to extract; draft_prompt returns a candidate prompt in the package's standard structure (a numbered field list followed by a {"all_results": [...]} JSON envelope) for you to edit:
import os
from lmsyz_genai_ie_rfs import draft_prompt
prompt = draft_prompt(
goal="Tag short product reviews with sentiment (positive/neutral/negative) and a confidence score 0-1.",
api_key=os.environ["OPENAI_API_KEY"],
)
Prints and returns:
You are an information-extraction assistant. For each input row, analyze the product review and extract structured information.
Step-by-step instructions:
1. input_id: Copy the input_id from the row verbatim.
2. sentiment: Tag the review's overall sentiment as one of "positive", "neutral", or "negative".
3. confidence: Provide a confidence score for the sentiment tag as a float between 0 and 1 inclusive.
Return a JSON object with this EXACT structure:
{
"all_results": [
{
"input_id": "42",
"sentiment": "positive",
"confidence": 0.87
}
]
}
Do not include any fields besides input_id, sentiment, and confidence.
Read the result, tighten enums, add any domain-specific instructions, then pass it to extract_df(prompt=prompt, ...). Defaults are backend="openai", model="gpt-4.1-mini"; pass backend="anthropic" to use Claude instead. The temperature is fixed at 0, so the same goal usually reproduces the same starting prompt, although providers do not guarantee bit-identical output.
Optional: a JSON schema file
By default, output structure is enforced only by the prompt; the model returns a JSON object whose shape it infers from the instructions.
Pass schema= to enforce structure at the API layer. OpenAI consumes it as response_format={"type": "json_schema", ...}; Anthropic consumes it as the input_schema of a forced tool_use.
out = extract_df(
df, prompt=my_prompt, schema="my_schema.json",
backend="openai", model="gpt-4.1-mini",
cache_path="strict.sqlite",
)
The file contains a standard OpenAI response_format object. Here is one matching the Quickstart task above:
{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"strict": true,
"schema": {
"type": "object",
"additionalProperties": false,
"required": ["all_results"],
"properties": {
"all_results": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"required": ["input_id", "entities", "causal_triples", "sentiment"],
"properties": {
"input_id": {"type": "string"},
"entities": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": false,
"required": ["name", "type"],
"properties": {
"name": {"type": "string"},
"type": {"type": "string", "enum": ["PERSON", "ORG", "PRODUCT", "DATE", "MONEY", "EVENT"]}
}
}
},
"causal_triples": {
"type": "array",
"items": {
"type": "array",
"minItems": 3, "maxItems": 3,
"items": {"type": "string"}
}
},
"sentiment": {"type": "string", "enum": ["positive", "neutral", "negative"]}
}
}
}
}
}
}
}
The same file works on Anthropic: the library extracts the inner schema object and passes it as the input_schema of a tool_use.
schema= accepts a file path, a dict with the contents above, or None (the default).
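The mapping from the OpenAI-style file to an Anthropic tool definition might look like the sketch below. This is an assumption about the general shape of the conversion, not the library's code: `to_anthropic_tool` is a hypothetical helper, the description text is invented, and the schema is shortened for brevity.

```python
# Shortened version of the response_format object shown above.
response_format = {
    "type": "json_schema",
    "json_schema": {
        "name": "news_extraction",
        "strict": True,
        "schema": {
            "type": "object",
            "required": ["all_results"],
            "properties": {
                "all_results": {"type": "array", "items": {"type": "object"}}
            },
        },
    },
}


def to_anthropic_tool(response_format: dict) -> tuple[dict, dict]:
    """Reuse the inner schema as the input_schema of a forced tool call."""
    spec = response_format["json_schema"]
    tool = {
        "name": spec["name"],
        "description": "Return the extraction results.",  # hypothetical wording
        "input_schema": spec["schema"],
    }
    # Forcing the tool makes the model answer through the schema.
    tool_choice = {"type": "tool", "name": spec["name"]}
    return tool, tool_choice


tool, tool_choice = to_anthropic_tool(response_format)
print(tool["name"], list(tool["input_schema"]["properties"]))
```

In the Anthropic Messages API these two values would be passed as tools=[tool] and tool_choice=tool_choice.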
All knobs
extract_df(
df,
prompt=..., # required
cache_path="path.sqlite", # required: results DB on disk
backend="openai", # or "anthropic"
model="gpt-4.1-mini", # required
schema=None, # optional: path to JSON schema file, dict, or None
id_col="id",
text_col="text",
chunk_size=5, # rows per API call; 5-20 is typical
max_workers=20, # parallel threads; tune to your rate limit
fresh=False, # True to ignore prior DB contents
ignore_prompt_hash=False, # True to reuse rows produced by a different prompt
api_key=None, # overrides .env / environment
base_url=None, # for OpenRouter / Gemini compat
client=None, # BYO preconfigured SDK client
)
Failed API calls are retried automatically (tenacity, exponential backoff, up to 5 attempts) on RateLimitError and APIError. If a chunk still fails after retries, the failure is logged and the chunk's rows are omitted from the returned DataFrame.
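The retry behavior can be pictured with a standard-library sketch. The library itself uses tenacity; the code below is only an illustration of exponential backoff with 5 attempts and waits bounded between 2 and 30 seconds, and the exact wait schedule is an assumption. `TransientError` and `call_with_retries` are hypothetical names standing in for RateLimitError / APIError handling.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for RateLimitError / APIError."""


def call_with_retries(fn, max_attempts=5, base=2.0, cap=30.0, sleep=time.sleep):
    """Call fn, waiting 2s, 4s, 8s, ... (capped at 30s) between failed attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: the caller logs and skips the chunk
            sleep(min(cap, base * 2 ** (attempt - 1)))


attempts = {"n": 0}


def flaky():
    """Fails twice, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError("rate limited")
    return "ok"


delays = []  # record the waits instead of actually sleeping
result = call_with_retries(flaky, sleep=delays.append)
print(result, delays)
```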
draft_prompt knobs
draft_prompt(
goal, # required: plain-English description of what to extract
backend="openai", # or "anthropic"
model="gpt-4.1-mini", # model for the meta-call; any chat model works
api_key=None, # overrides .env / environment
base_url=None, # for OpenRouter / Gemini compat
print_prompt=True, # print result to stdout (useful in notebooks)
)
draft_prompt returns the generated prompt string. Set print_prompt=False to suppress the automatic print and capture only the return value.
FAQ
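Why does my DataFrame have fewer rows than I expect? Most likely a chunk failed after all retries, so its rows were omitted from the result. Check the log for the stack trace, then rerun with the same cache_path so that only the missing rows go to the model.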
How do I restart a batch job when I lost the job_id? Look under batch_root_dir/. The directories ARE the job_ids.
Is there retry logic? Yes: tenacity, 5 attempts with exponential backoff 2-30s for RateLimitError and APIError.
What about nested lists and dicts in the output? Flatten with df.explode("entities") (each list item becomes its own row), followed by pd.json_normalize(df["entities"]) (each dict's keys become columns). CSV stringifies nested fields; save as JSONL (df.to_json(..., orient="records", lines=True)) for clean round-tripping.
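The flattening recipe from the last answer can be sketched end to end with pandas. The DataFrame below is a toy stand-in for the output of extract_df, with made-up values; note that a row with an empty list explodes to a missing value, which is dropped before normalizing.

```python
import pandas as pd

# Toy stand-in for the DataFrame returned by extract_df (illustrative values).
out = pd.DataFrame({
    "input_id": ["1", "2", "3"],
    "entities": [
        [{"name": "Apple", "type": "ORG"}, {"name": "Tim Cook", "type": "PERSON"}],
        [{"name": "Tesla", "type": "ORG"}],
        [],  # a row with no entities
    ],
})

# One row per entity; an empty list explodes to a single NaN row, so drop it.
exploded = (
    out[["input_id", "entities"]]
    .explode("entities")
    .dropna(subset=["entities"])
    .reset_index(drop=True)
)

# Each dict's keys become columns; re-attach input_id by position.
flat = pd.concat(
    [exploded[["input_id"]], pd.json_normalize(exploded["entities"].tolist())],
    axis=1,
)
print(flat)
```

The result has one row per entity with columns input_id, name, and type, which saves cleanly to CSV.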
Documentation
Full docs (concepts, how-to guides, API reference): https://maifeng.github.io/lmsyz_genai_ie_rfs/
Citation
If you find this library useful in your research, please cite:
Li, Kai, Feng Mai, Rui Shen, Chelsea Yang, and Tengfei Zhang (2026). "Dissecting Corporate Culture Using Generative AI." Review of Financial Studies 39(1):253–296. doi.org/10.1093/rfs/hhaf081.
@article{li2026dissecting,
title = {Dissecting Corporate Culture Using Generative {AI}},
author = {Li, Kai and Mai, Feng and Shen, Rui and Yang, Chelsea and Zhang, Tengfei},
journal = {Review of Financial Studies},
volume = {39},
number = {1},
pages = {253--296},
year = {2026},
doi = {10.1093/rfs/hhaf081},
}
GitHub
Source code: https://github.com/maifeng/lmsyz_genai_ie_rfs
License
MIT.