GenAI Coroutines
High-performance async batch processing for Datalab Marker API (OCR) and OpenAI Responses API.
Built in Rust with Python bindings, genai-coroutines eliminates GIL bottlenecks and processes hundreds of concurrent API requests with production-grade reliability — smart retries, precise rate limiting, structured output parsing, and cost tracking.
Features
- Concurrent Processing — Semaphore-based concurrency control for both APIs.
- Smart Retry Logic — Exponential backoff with jitter. Auto-retries on 429/5xx and timeouts; fails fast on 400/401/403/404.
- Zero GIL — Native Rust async runtime bypasses Python's Global Interpreter Lock.
- Structured Output — JSON Schema enforcement (OpenAI) and page-level structured extraction (Datalab).
- Cost & Usage Tracking — Token-level usage (OpenAI) and cost breakdown in cents (Datalab) returned with every result.
- Parsing Helpers — Built-in `parse_responses()` and `parse_documents()` to extract clean text/HTML.
- Order Preservation — Results always match input order.
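The concurrency and ordering guarantees above follow a common asyncio pattern. The sketch below is a pure-Python analogue for illustration only (the library does this in Rust; `fake_api_call` and `run_bounded` are hypothetical names): an `asyncio.Semaphore` caps the number of in-flight calls, and `asyncio.gather` returns results in input order even when calls finish out of order.

```python
import asyncio
import random


async def fake_api_call(i: int) -> str:
    # Stand-in for one HTTP request; random latency makes tasks finish out of order
    await asyncio.sleep(random.uniform(0, 0.01))
    return f"result-{i}"


async def run_bounded(items, max_concurrent: int):
    sem = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def worker(i):
        nonlocal in_flight, peak
        async with sem:  # at most max_concurrent workers are inside this block
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_api_call(i)
            finally:
                in_flight -= 1

    # gather() returns results in the order the awaitables were passed,
    # regardless of which one completed first
    results = await asyncio.gather(*(worker(i) for i in items))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_bounded(range(20), max_concurrent=5))
    print(results[:3], "peak concurrency:", peak)
```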
Quick Start
```python
import asyncio
from genai_coroutines import (
    DocumentConfig, DocumentProcessor, parse_documents,
    ResponsesRequest, ResponsesProcessor, parse_responses,
)
```
Datalab OCR — DocumentConfig & DocumentProcessor
Process PDFs and images into structured text, HTML, or Markdown using the Datalab Marker API.
All Parameters — DocumentConfig
| Parameter | Type | Default | Description |
|---|---|---|---|
| `api_key` | `str` | Required | Your Datalab API key. |
| `api_url` | `str` | `"https://www.datalab.to/api/v1/marker"` | API endpoint URL. Override for custom/self-hosted endpoints. |
| `output_format` | `str` | `"json"` | `"json"` · `"html"` · `"markdown"` · `"chunks"` |
| `mode` | `str` | `"accurate"` | `"fast"` · `"balanced"` · `"accurate"` |
| Concurrency & Retry | | | |
| `max_concurrent_requests` | `int` | `10` | Maximum parallel API calls. Controls semaphore permits. |
| `poll_interval_secs` | `int` | `2` | Seconds between status polling requests. |
| `max_poll_attempts` | `int` | `60` | Maximum polling attempts before timeout. |
| `max_retries` | `int` | `5` | Retry attempts on rate-limit/server errors. |
| `base_retry_delay_secs` | `int` | `5` | Base delay for exponential backoff (seconds). |
| `jitter_percent` | `int` | `200` | Random jitter range (±%) applied to backoff. |
| Structured Extraction | | | |
| `page_schema` | `str` (JSON) | `None` | JSON schema string for structured data extraction per page. See Structured Extraction below. |
| Page Control | | | |
| `paginate` | `bool` | `False` | Return output separated by page. |
| `page_range` | `str` | `None` | Specific pages to process (e.g., `"0-5"`, `"0,2,4"`). |
| `max_pages` | `int` | `None` | Maximum number of pages to process. |
| `disable_image_extraction` | `bool` | `False` | Skip image extraction from documents. |
| Advanced | | | |
| `extras` | `str` (JSON) | `None` | Additional API parameters as a JSON string. |
| `webhook_url` | `str` | `None` | URL to receive a webhook callback on completion. |
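The polling parameters bound how long a single document can wait for completion. Assuming one sleep of `poll_interval_secs` per status check (our assumption; the table does not spell out the loop), the worst-case wait is their product, 2 × 60 = 120 seconds with the defaults. The hypothetical helpers below compute that window and the number of attempts needed for a target wait:

```python
import math


def max_poll_window_secs(poll_interval_secs: int = 2, max_poll_attempts: int = 60) -> int:
    # Worst-case time spent polling one document before the client gives up.
    # Excludes upload time and any retries on 429/5xx.
    return poll_interval_secs * max_poll_attempts


def attempts_for_window(target_secs: float, poll_interval_secs: int = 2) -> int:
    # Smallest max_poll_attempts whose polling window covers target_secs.
    return math.ceil(target_secs / poll_interval_secs)


print(max_poll_window_secs())     # 120 with the defaults
print(attempts_for_window(600))   # 300 attempts at 2 s for a 10-minute cap
```

For long scanned PDFs in `"accurate"` mode, raising `max_poll_attempts` this way is likely safer than lengthening `poll_interval_secs`, which would delay every result.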
Structured Extraction (Datalab)
Use page_schema to extract structured fields from each page. Pass a JSON schema string describing the fields you want:
```python
import json
from genai_coroutines import DocumentConfig

schema = json.dumps({
    "type": "object",
    "properties": {
        "patient_name": {"type": "string", "description": "Full name of the patient"},
        "diagnosis": {"type": "string", "description": "Primary diagnosis"},
        "date": {"type": "string", "description": "Date of visit (YYYY-MM-DD)"}
    },
    "required": ["patient_name", "diagnosis"]
})

config = DocumentConfig(
    api_key="YOUR_DATALAB_KEY",
    output_format="json",
    mode="accurate",
    page_schema=schema  # Structured extraction
)
```
Note: The `page_schema` value is validated as JSON at initialization time. If the string is not valid JSON, a `ValueError` is raised immediately.
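Because a malformed `page_schema` raises `ValueError` at construction time, it can help to parse and check the schema yourself first, for example when schemas are loaded from files. `load_page_schema` is a hypothetical helper, not part of genai_coroutines, and its extra "must be a JSON object" check is our own assumption about what a usable schema looks like.

```python
import json


def load_page_schema(schema_str: str) -> dict:
    """Parse a page_schema string, raising ValueError on bad input."""
    try:
        schema = json.loads(schema_str)
    except json.JSONDecodeError as exc:  # JSONDecodeError is a ValueError subclass
        raise ValueError(f"page_schema is not valid JSON: {exc}") from exc
    if not isinstance(schema, dict):
        raise ValueError("page_schema must be a JSON object")
    return schema


schema_str = json.dumps({"type": "object", "properties": {"diagnosis": {"type": "string"}}})
print(sorted(load_page_schema(schema_str)["properties"]))  # ['diagnosis']
```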
Usage Example
```python
import asyncio
from genai_coroutines import DocumentConfig, DocumentProcessor, parse_documents

async def main():
    config = DocumentConfig(
        api_key="YOUR_DATALAB_KEY",
        mode="accurate",
        max_concurrent_requests=10,
        page_range="0-5",  # Only process the first 6 pages
        max_pages=10       # Safety cap
    )
    processor = DocumentProcessor(config)

    # Load documents as raw bytes
    files = ["report1.pdf", "report2.pdf", "scan.png"]
    batch = []
    for f in files:
        with open(f, "rb") as fh:
            batch.append(fh.read())

    # Process
    results = await processor.process_multiparts(batch)

    # Parse consolidated HTML from each document
    html_list = parse_documents(results)
    for i, html in enumerate(html_list):
        print(f"Doc {i}: {len(html)} chars of HTML")

    # Access cost breakdown (native Python dict)
    for r in results:
        if r["success"] and r.get("cost_breakdown"):
            cost = r["cost_breakdown"]  # Already a dict, no json.loads needed
            print(f"Cost: {cost['final_cost_cents']} cents")

asyncio.run(main())
```
OCR Output Structure
Each item in the returned list:
```python
{
    "index": 0,                   # Matches input order
    "success": True,
    "json_response": "{...}",     # Raw JSON string from Datalab API
    "cost_breakdown": {           # Cost tracking (when available)
        "final_cost_cents": 15,
        "list_cost_cents": 15
    },
    "processing_time_secs": 4.5,
    "error": None                 # Error message if success=False
}
```
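Since every item carries `success`, `index`, and an optional `cost_breakdown`, batch totals are a short fold over the list. `summarize_costs` is a hypothetical helper written against the structure shown above; it treats a missing `cost_breakdown` as zero cost, which may undercount if the API omits costs for some documents.

```python
def summarize_costs(results: list) -> dict:
    # Works on the per-document dicts returned by process_multiparts()
    ok = [r for r in results if r.get("success")]
    total_cents = sum((r.get("cost_breakdown") or {}).get("final_cost_cents", 0) for r in ok)
    failed = [r["index"] for r in results if not r.get("success")]
    return {"documents_ok": len(ok), "failed_indices": failed, "total_cost_cents": total_cents}


sample = [
    {"index": 0, "success": True, "cost_breakdown": {"final_cost_cents": 15, "list_cost_cents": 15}},
    {"index": 1, "success": True, "cost_breakdown": None},
    {"index": 2, "success": False, "error": "timeout"},
]
print(summarize_costs(sample))
# {'documents_ok': 2, 'failed_indices': [2], 'total_cost_cents': 15}
```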
OpenAI Responses API — ResponsesRequest & ResponsesProcessor
Batch-process Responses API requests with structured output, reasoning models, tools, and multi-turn conversations.
All Parameters — ResponsesRequest
| Parameter | Type | Default | Description |
|---|---|---|---|
| `api_key` | `str` | Required | Your OpenAI API key. |
| `system_prompt` | `str` | Required | System instructions for the model. |
| `user_prompts` | `list[str]` | Required | List of user prompts to process as a batch. |
| `model` | `str` | Required | Model ID: `"gpt-4o"`, `"gpt-4o-mini"`, `"o3-mini"`, etc. |
| `response_format` | `dict` | Required | Output format. See Structured Output below. |
| `timeout_secs` | `int` | `60` | Per-request timeout in seconds. |
| Concurrency & Retry | | | |
| `max_concurrent_requests` | `int` | `10` | Maximum parallel API calls. Controls semaphore permits. |
| `max_retries` | `int` | `5` | Retry attempts on rate-limit/server errors. |
| `retry_delay_min_ms` | `int` | `1000` | Minimum backoff delay in milliseconds. |
| `retry_delay_max_ms` | `int` | `60000` | Maximum backoff delay in milliseconds. |
| Sampling | | | |
| `temperature` | `float` | `None` | Sampling temperature (0.0–2.0). |
| `top_p` | `float` | `None` | Nucleus sampling threshold. |
| `max_output_tokens` | `int` | `None` | Maximum tokens in the response. |
| Reasoning (o-series models) | | | |
| `reasoning_effort` | `str` | `None` | `"low"` · `"medium"` · `"high"`. Controls thinking depth for o3-mini, o1, etc. |
| `reasoning_summary` | `str` | `None` | `"auto"` · `"concise"` · `"detailed"`. Controls reasoning summary output. |
| Tools & Function Calling | | | |
| `tools` | `list[dict]` | `None` | Tool/function definitions for function calling. |
| `tool_choice` | `str` or `dict` | `None` | Tool selection strategy: `"auto"`, `"required"`, or a dict naming a specific tool. |
| `parallel_tool_calls` | `bool` | `None` | Allow the model to call multiple tools in parallel. |
| Multi-Turn | | | |
| `previous_response_id` | `str` | `None` | ID of a previous response to continue a conversation. |
| `include` | `list[str]` | `None` | Additional data to include (e.g., `["file_search_call.results"]`). |
| Other | | | |
| `store` | `bool` | `None` | Whether to store the response for later retrieval. |
| `truncation` | `str` | `None` | Truncation strategy for context window overflow (e.g., `"auto"`, `"disabled"`). |
| `metadata` | `dict` | `None` | Custom metadata to attach to the request. |
| `service_tier` | `str` | `None` | Service tier (`"auto"`, `"default"`). |
| `stream` | `bool` | `None` | Enable streaming (advanced). |
Structured Output (OpenAI)
The response_format parameter controls how the model formats its output. Three modes are supported:
1. JSON Schema (Strict)
Force the model to output valid JSON matching your exact schema:
```python
request = ResponsesRequest(
    api_key="YOUR_KEY",
    model="gpt-4o-mini",
    system_prompt="Extract patient info from the text.",
    user_prompts=["Patient John Doe, age 45, diagnosed with hypertension on 2024-01-15."],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": "patient_info",
            "schema": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"type": "integer"},
                    "diagnosis": {"type": "string"},
                    "date": {"type": "string"}
                },
                "required": ["name", "age", "diagnosis", "date"],
                "additionalProperties": False
            },
            "strict": True
        }
    }
)
```
Example output (strict mode constrains the model to JSON that matches the schema):
```json
{"name": "John Doe", "age": 45, "diagnosis": "hypertension", "date": "2024-01-15"}
```
2. JSON Object (Flexible)
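Force valid JSON output without a specific schema. OpenAI requires the word "JSON" to appear somewhere in the system or user prompt when this mode is used:
```python
response_format={"type": "json_object"}
```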
3. Plain Text
No format enforcement:
```python
response_format={"type": "text"}
```
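Strict mode has two requirements that are easy to miss when writing schemas by hand: OpenAI's structured outputs expect every property to appear in `required` and `additionalProperties` to be `false`. The hypothetical helper below fills both in for a flat schema, producing the same nested `response_format` shape used in the JSON Schema example above. It only handles the top level; nested objects would need the same treatment.

```python
def strict_json_schema_format(name: str, properties: dict) -> dict:
    # Every key of `properties` becomes required and extra keys are forbidden,
    # as OpenAI's strict structured outputs expect (top level only).
    return {
        "type": "json_schema",
        "json_schema": {
            "name": name,
            "schema": {
                "type": "object",
                "properties": properties,
                "required": list(properties),
                "additionalProperties": False,
            },
            "strict": True,
        },
    }


fmt = strict_json_schema_format("patient", {
    "name": {"type": "string"},
    "age": {"type": "integer"},
    "condition": {"type": "string"},
})
print(fmt["json_schema"]["schema"]["required"])  # ['name', 'age', 'condition']
```

The result can be passed directly as `response_format=fmt`.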
Reasoning Models (o3-mini, o1)
For reasoning models, control the depth of thinking and summary:
```python
request = ResponsesRequest(
    api_key="YOUR_KEY",
    model="o3-mini",
    system_prompt="Solve this step by step.",
    user_prompts=["What is 1234 * 5678?"],
    response_format={"type": "text"},
    reasoning_effort="high",      # low | medium | high
    reasoning_summary="detailed"  # auto | concise | detailed
)
```
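When `reasoning_summary` is set, the summary arrives as a separate output item inside `raw_response` rather than in the assistant message. A sketch for pulling it out, assuming the Responses API shape in which `output` contains items of type `"reasoning"` whose `summary` list holds `{"type": "summary_text", "text": ...}` parts (`reasoning_summaries` is a hypothetical name):

```python
import json


def reasoning_summaries(raw_response: str) -> list:
    # Collect summary text from every "reasoning" output item.
    data = json.loads(raw_response)
    texts = []
    for item in data.get("output", []):
        if item.get("type") != "reasoning":
            continue
        for part in item.get("summary") or []:
            if part.get("type") == "summary_text":
                texts.append(part.get("text", ""))
    return texts
```

Apply it per successful item, e.g. `reasoning_summaries(r["raw_response"])` for each `r` in `results["results"]` where `r["success"]` is true.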
Function Calling / Tools
Define tools the model can call:
```python
request = ResponsesRequest(
    api_key="YOUR_KEY",
    model="gpt-4o",
    system_prompt="You are a helpful assistant with access to tools.",
    user_prompts=["What's the weather in San Francisco?"],
    response_format={"type": "text"},
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    }],
    tool_choice="auto"
)
```
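When the model responds with a tool call rather than text, `parse_responses()` hands back the raw JSON. A sketch for reading the calls out of `raw_response`, assuming the Responses API convention that each call is an `output` item of type `"function_call"` with `name`, `call_id`, and a JSON-encoded `arguments` string (`extract_function_calls` is a hypothetical helper). Executing the tool and sending its result back remains up to your code.

```python
import json


def extract_function_calls(raw_response: str) -> list:
    data = json.loads(raw_response)
    calls = []
    for item in data.get("output", []):
        if item.get("type") == "function_call":
            calls.append({
                "name": item.get("name"),
                "call_id": item.get("call_id"),
                # arguments is a JSON string; decode it into a dict
                "arguments": json.loads(item.get("arguments") or "{}"),
            })
    return calls
```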
Multi-Turn Conversations
Continue a conversation by referencing a previous response:
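```python
# Runs inside an async function; assumes `processor = ResponsesProcessor()`
# and a first-turn `request1` built like the examples above.
import json

# First turn
results1 = await processor.process_batch(request1)
first = results1["results"][0]
if not first["success"]:
    raise RuntimeError(first["error"])
# raw_response is the full OpenAI JSON string; its "id" identifies this response
response_id = json.loads(first["raw_response"])["id"]

# Second turn
request2 = ResponsesRequest(
    api_key="YOUR_KEY",
    model="gpt-4o",
    system_prompt="You are a helpful assistant.",
    user_prompts=["Can you elaborate on that?"],
    response_format={"type": "text"},
    previous_response_id=response_id  # Continue the conversation
)
results2 = await processor.process_batch(request2)
```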
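The first-turn lookup above reads only the first result. For a whole batch, the hypothetical helper below returns one ID per prompt, with `None` for failures so positions stay aligned with `user_prompts`. Because `previous_response_id` is a single value per `ResponsesRequest`, continuing each conversation separately would presumably mean one follow-up request per ID.

```python
import json


def response_ids(batch: dict) -> list:
    # One entry per prompt: the response "id" for successes, None for failures
    ids = []
    for r in batch["results"]:
        if r.get("success") and r.get("raw_response"):
            ids.append(json.loads(r["raw_response"]).get("id"))
        else:
            ids.append(None)
    return ids
```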
Usage Example
```python
import asyncio
import json
from genai_coroutines import ResponsesRequest, ResponsesProcessor, parse_responses

async def main():
    request = ResponsesRequest(
        api_key="YOUR_OPENAI_KEY",
        model="gpt-4o-mini",
        system_prompt="You are an expert data extractor.",
        user_prompts=[
            "Extract: John Doe, 45, hypertension",
            "Extract: Jane Smith, 32, diabetes",
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": "patient",
                "schema": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "age": {"type": "integer"},
                        "condition": {"type": "string"}
                    },
                    "required": ["name", "age", "condition"],
                    "additionalProperties": False
                },
                "strict": True
            }
        },
        max_concurrent_requests=20,
        max_retries=3
    )
    processor = ResponsesProcessor()
    results = await processor.process_batch(request)

    # Parse clean text (one string per prompt; "" for failed prompts)
    texts = parse_responses(results)
    for text in texts:
        if not text:
            continue  # Skip failed prompts; json.loads("") would raise
        data = json.loads(text)
        print(f"{data['name']}: {data['condition']}")

    # Access token usage (native Python dict, no json.loads needed)
    for r in results["results"]:
        if r["success"] and r.get("usage"):
            usage = r["usage"]  # Already a dict
            print(f"Tokens: {usage['total_tokens']}")

asyncio.run(main())
```
OpenAI Output Structure
```python
{
    "total_success": 2,
    "total_errors": 0,
    "results": [
        {
            "success": True,
            "raw_response": "{...}",   # Full OpenAI JSON response (string)
            "usage": {                 # Token usage (native Python dict)
                "input_tokens": 42,
                "output_tokens": 18,
                "total_tokens": 60,
                "input_tokens_details": {"cached_tokens": 0},
                "output_tokens_details": {"reasoning_tokens": 0}
            }
        },
        {
            "success": False,
            "error": "401 Unauthorized: Invalid API key",
            "error_type": "authentication_error",
            "param": None,
            "code": "invalid_api_key",
            "is_retriable": False,
            "attempts": 1
        }
    ]
}
```
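Token usage arrives per prompt; for budgeting you usually want batch totals. A sketch over the structure above (`total_usage` is hypothetical; it skips failed items and treats missing detail fields as zero):

```python
def total_usage(batch: dict) -> dict:
    totals = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0,
              "cached_tokens": 0, "reasoning_tokens": 0}
    for r in batch["results"]:
        usage = r.get("usage") if r.get("success") else None
        if not usage:
            continue
        for key in ("input_tokens", "output_tokens", "total_tokens"):
            totals[key] += usage.get(key, 0)
        totals["cached_tokens"] += (usage.get("input_tokens_details") or {}).get("cached_tokens", 0)
        totals["reasoning_tokens"] += (usage.get("output_tokens_details") or {}).get("reasoning_tokens", 0)
    return totals


example = {"results": [
    {"success": True, "usage": {"input_tokens": 42, "output_tokens": 18, "total_tokens": 60,
                                "input_tokens_details": {"cached_tokens": 0},
                                "output_tokens_details": {"reasoning_tokens": 0}}},
    {"success": False, "error": "401 Unauthorized: Invalid API key"},
]}
print(total_usage(example)["total_tokens"])  # 60
```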
Helper Functions
parse_responses(results) → list[str]
Extracts clean assistant message text from OpenAI batch results.
```python
from genai_coroutines import parse_responses

texts = parse_responses(results)
# ["John Doe, 45, hypertension...", "Jane Smith, 32, diabetes..."]
```
- Input: The dict returned by `ResponsesProcessor.process_batch()`.
- Output: List of strings, one per input prompt (same length as `user_prompts`).
- Behavior:
  - Aggregates all text from `output[].content[].text` for each response.
  - If the model called tools instead of producing text, returns the full raw JSON so you can inspect tool calls.
  - Returns `""` for failed prompts.
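To make the documented behavior concrete, here is a per-response approximation in plain Python. It is not the library's implementation: the function name is hypothetical, and joining the text parts with an empty string is our assumption (the docs say text is aggregated, not how).

```python
import json


def text_from_raw_response(raw_response: str) -> str:
    data = json.loads(raw_response)
    parts = [
        part["text"]
        for item in data.get("output", [])
        for part in (item.get("content") or [])
        if isinstance(part, dict) and isinstance(part.get("text"), str)
    ]
    # No text at all (e.g. only function_call items): fall back to the raw JSON
    return "".join(parts) if parts else raw_response
```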
parse_documents(results) → list[str]
Extracts content from all OCR results, auto-detecting the output format.
```python
from genai_coroutines import parse_documents

contents = parse_documents(results)
# ["<p>Page 1 content</p>\n<p>Page 2 content</p>", ...]
```
- Input: The list returned by `DocumentProcessor.process_multiparts()`.
- Output: List of strings, one per input document (same length as the input batch).
- Format handling:
  - `output_format="json"`: Extracts and concatenates `json.children[].html`.
  - `output_format="html"`: Returns the raw HTML string.
  - `output_format="markdown"`: Returns the markdown string.
  - `paginate=True`: Concatenates content from paginated output.
  - `page_schema` (structured extraction): Returns the full JSON string.
  - Fallback: Returns the raw JSON string so nothing is ever lost.
- Returns `""` for failed documents.
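For the default `output_format="json"` case, the extraction path `json.children[].html` can be approximated as below. This is a hypothetical sketch: it assumes `json_response` is an object with a top-level `"json"` key holding a `children` list, and the newline separator between blocks is our choice, though it matches the example output above.

```python
import json


def html_from_json_response(json_response: str) -> str:
    data = json.loads(json_response)
    inner = data.get("json") if isinstance(data, dict) else None
    children = (inner or {}).get("children") or []
    html = [c["html"] for c in children if isinstance(c, dict) and c.get("html")]
    # Mirror the documented fallback: return the raw JSON when nothing matched
    return "\n".join(html) if html else json_response
```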
Error Handling
Errors are classified internally:
| Category | HTTP Codes | Behavior |
|---|---|---|
| Retriable | 429, 500, 502, 503, 504, timeouts | Auto-retry with exponential backoff + jitter |
| Fatal | 400, 401, 403, 404 | Fail immediately, no retry |
- Failed items have `"success": False` with an `"error"` message.
- The batch never crashes due to individual failures; all other items continue processing.
- For OpenAI errors, `is_retriable`, `error_type`, `param`, `code`, and `attempts` are included.
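The retry policy in the table can be sketched as two small functions: a status classifier and an exponential backoff with ± jitter. The doubling schedule (`base * 2**attempt`), the clamp at zero, and treating `None` as a timeout are our assumptions; the parameter names mirror `DocumentConfig`'s `base_retry_delay_secs` and `jitter_percent`, but the library's exact formula is not documented here.

```python
import random

RETRIABLE_STATUS = {429, 500, 502, 503, 504}


def is_retriable(status) -> bool:
    # None stands for a timeout or dropped connection in this sketch
    return status is None or status in RETRIABLE_STATUS


def backoff_delay(attempt, base_secs=5.0, jitter_percent=200.0, rng=None):
    rng = rng or random.Random()
    delay = base_secs * (2 ** attempt)  # attempt 0 -> base, 1 -> 2x base, ...
    spread = delay * jitter_percent / 100.0
    # Uniform jitter in [-spread, +spread]; clamp so large jitter never goes negative
    return max(0.0, delay + rng.uniform(-spread, spread))


for attempt in range(3):
    status = 503  # pretend the server keeps returning 503
    if not is_retriable(status):
        break
    print(f"attempt {attempt}: sleep {backoff_delay(attempt):.1f}s")
```

With the default `jitter_percent=200`, a single delay can land anywhere between 0 and three times the nominal value, which spreads retries out widely when many requests hit a rate limit at once.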
Logging
All Rust-level logs are bridged to Python's logging module under the genai_coroutines logger:
```python
import logging

# See all logs
logging.basicConfig(level=logging.INFO)

# Per-module control
logging.getLogger("genai_coroutines.ocr").setLevel(logging.DEBUG)
logging.getLogger("genai_coroutines.responses").setLevel(logging.WARNING)
```
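Because the bridge uses standard `logging` names, the usual logger hierarchy should apply: a handler attached to `genai_coroutines` receives records from `genai_coroutines.ocr` and `genai_coroutines.responses` through propagation. The sketch below demonstrates this with stdlib logging only, emitting records by hand in place of the Rust side (`ListHandler` is a hypothetical in-memory handler; a `logging.FileHandler` would be the production analogue).

```python
import logging


class ListHandler(logging.Handler):
    """Keeps formatted log lines in memory (handy for tests or dashboards)."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


handler = ListHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
parent = logging.getLogger("genai_coroutines")
parent.addHandler(handler)
parent.setLevel(logging.INFO)

# Child loggers inherit the parent's INFO level and propagate to its handler
logging.getLogger("genai_coroutines.ocr").info("batch_start | files=10 concurrency=10")
logging.getLogger("genai_coroutines.ocr").debug("dropped: below INFO")
print(handler.lines)  # ['INFO genai_coroutines.ocr batch_start | files=10 concurrency=10']
```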
Sample log output:
```text
INFO [chandra] batch_start | files=10 concurrency=10
WARN [chandra] rate_limit | index=3 attempt=2/5
INFO [chandra] task_success | index=3 time=12.50s attempts=2
INFO [chandra] batch_done | ok=10/10 errors=0
```
Performance Tuning
| Scenario | `max_concurrent_requests` | Retry Config |
|---|---|---|
| Small batch (<50) | 5–10 | Defaults |
| High volume (1k+) | 30–50 | Increase `retry_delay_max_ms` |
| Rate-limited API | 3–5 | Increase `jitter_percent` and `base_retry_delay_secs` |
| Reasoning models | 5–10 | Increase `timeout_secs` to 120+ |

`max_concurrent_requests` controls the semaphore. Even with high concurrency, the retry logic backs off automatically on rate limits.
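To pick a concurrency level, a back-of-the-envelope estimate helps. If every item takes roughly the same time and all permits stay busy, the semaphore processes items in waves of `max_concurrent_requests`, so wall time is about `ceil(n / c) × latency`. The helper is hypothetical and ignores retries, rate limits, and polling overhead; using the sample's 4.5 s per document, 1,000 documents at concurrency 10 take about 450 s, versus about 112.5 s at 40.

```python
import math


def estimate_batch_secs(n_items: int, max_concurrent: int, avg_latency_secs: float) -> float:
    # Items run in ceil(n / c) "waves" of c concurrent requests each.
    waves = math.ceil(n_items / max_concurrent)
    return waves * avg_latency_secs


for c in (10, 40):
    print(c, estimate_batch_secs(1000, c, 4.5))
# 10 450.0
# 40 112.5
```

Real runs will be slower once the provider starts returning 429s, which is why the table pairs higher concurrency with more generous retry settings.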
Download files
File details
Details for the file genai_coroutines-1.0.0.tar.gz.
File metadata
- Download URL: genai_coroutines-1.0.0.tar.gz
- Upload date:
- Size: 39.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d860ebc6d6b94d7aadcf4fad733e9fd1ddba2536c514bf6a8c434976127aee4e |
| MD5 | 878ba92ea51056ced160c19e598ee9aa |
| BLAKE2b-256 | 06c5191adc52a7bb8f260e204c64feeb7dd7abdcdbf861117cb77b0c13b98196 |
File details
Details for the file genai_coroutines-1.0.0-cp39-abi3-win_amd64.whl.
File metadata
- Download URL: genai_coroutines-1.0.0-cp39-abi3-win_amd64.whl
- Upload date:
- Size: 2.0 MB
- Tags: CPython 3.9+, Windows x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 475460bcb9b79bab9ad15190e5899d3be2d3d3f5368b25ef195605e6c17f3536 |
| MD5 | d02b1644b33c80a7e1f8d71a8ccb2097 |
| BLAKE2b-256 | 035de7c721dab5cc45ee5c721a00bafa11d2602604f7e9faa8dc43ec8ce2b949 |
File details
Details for the file genai_coroutines-1.0.0-cp39-abi3-manylinux_2_28_aarch64.whl.
File metadata
- Download URL: genai_coroutines-1.0.0-cp39-abi3-manylinux_2_28_aarch64.whl
- Upload date:
- Size: 2.4 MB
- Tags: CPython 3.9+, manylinux: glibc 2.28+ ARM64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 22178c598ad52db76c93ccd91bd22677ac0c397beccf792b9a46129fffaa46c3 |
| MD5 | 37b15dd73b7e2458de0c1c7b8b996454 |
| BLAKE2b-256 | f69b68c788c2387a81bc86d3a58b5ea5d12c9ceaa0d1ab42cbbfc5b5c645d353 |
File details
Details for the file genai_coroutines-1.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.
File metadata
- Download URL: genai_coroutines-1.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
- Upload date:
- Size: 2.4 MB
- Tags: CPython 3.9+, manylinux: glibc 2.17+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 86dfa294f8987d419a4df42e65517f75f9647c05e0cb004ef13ec3fb07f6f70b |
| MD5 | 571520a8c835ef3bebccefd6a2f5ebc2 |
| BLAKE2b-256 | 11f49729fa5ae6f63245e53fa7a2539c796dce31ef3287c768588159401489fe |
File details
Details for the file genai_coroutines-1.0.0-cp39-abi3-macosx_11_0_arm64.whl.
File metadata
- Download URL: genai_coroutines-1.0.0-cp39-abi3-macosx_11_0_arm64.whl
- Upload date:
- Size: 2.1 MB
- Tags: CPython 3.9+, macOS 11.0+ ARM64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2a0fb9db67627bd38992b4756d7ec81bf49ea24b48a0406db5a6e218d0f50057 |
| MD5 | 21f27504b7cb43b9cc26eaa45abea74c |
| BLAKE2b-256 | 74faeb4378f7d167fad96cf5a14056127fbd93d18d52dd81387d4208205c579f |
File details
Details for the file genai_coroutines-1.0.0-cp39-abi3-macosx_10_12_x86_64.whl.
File metadata
- Download URL: genai_coroutines-1.0.0-cp39-abi3-macosx_10_12_x86_64.whl
- Upload date:
- Size: 2.2 MB
- Tags: CPython 3.9+, macOS 10.12+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: maturin/1.12.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8fa1ad2181acc6a1f9e38f6285f2d929274dee54db8e60846ba15fc3f4d1ae35 |
| MD5 | 63a5acac7d85a52c1998ee2e13a18c8f |
| BLAKE2b-256 | b8c0a70796189ffce0ce8903e953585600bd4af5cd620fdf813220a7422a3b68 |