High-performance async batch processing for Datalab Marker API (OCR) and OpenAI Responses API

GenAI Coroutines

Built in Rust with Python bindings, genai-coroutines runs its HTTP work on a native async runtime that does not hold Python's GIL, so it can keep hundreds of API requests in flight at once. It is built for production workloads, with smart retries, precise rate limiting, structured output parsing, and cost tracking.

Features

  • Concurrent Processing — Semaphore-based concurrency control for both APIs.
  • Smart Retry Logic — Exponential backoff with jitter. Auto-retries on 429/5xx; fails fast on 400/401.
  • Zero GIL — Native Rust async runtime bypasses Python's Global Interpreter Lock.
  • Structured Output — JSON Schema enforcement (OpenAI) and page-level structured extraction (Datalab).
  • Cost & Usage Tracking — Token-level usage (OpenAI) and cost breakdown in cents (Datalab) returned with every result.
  • Parsing Helpers — Built-in parse_responses() and parse_documents() to extract clean text/HTML.
  • Order Preservation — Results always match input order.
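
The concurrency and ordering guarantees above follow a common asyncio pattern: a semaphore limits in-flight calls, and gathering tasks in input order preserves ordering. The sketch below reproduces that pattern in pure Python for illustration only. The library implements its own version in Rust, and fake_api_call is a hypothetical stand-in for a real HTTP request.

```python
import asyncio
import random


async def fake_api_call(prompt: str) -> str:
    # Hypothetical stand-in for an HTTP request with variable latency.
    await asyncio.sleep(random.uniform(0, 0.01))
    return prompt.upper()


async def run_batch(prompts, max_concurrent_requests=10):
    """Run one call per prompt, at most max_concurrent_requests at a time."""
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    stats = {"in_flight": 0, "peak": 0}

    async def worker(prompt):
        async with semaphore:  # waits here while all permits are taken
            stats["in_flight"] += 1
            stats["peak"] = max(stats["peak"], stats["in_flight"])
            try:
                return await fake_api_call(prompt)
            finally:
                stats["in_flight"] -= 1

    # gather() returns results in argument order, not completion order.
    results = await asyncio.gather(*(worker(p) for p in prompts))
    return results, stats["peak"]


results, peak = asyncio.run(run_batch(["a", "b", "c"], max_concurrent_requests=2))
print(results)  # ['A', 'B', 'C']
```

Because the ordering comes from gather() rather than from completion time, a slow first request does not reorder the output. It only delays when the whole batch returns.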

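Quick Start

Install the package with pip install genai-coroutines, then import the classes and helpers you need:
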
import asyncio
from genai_coroutines import (
    DocumentConfig, DocumentProcessor, parse_documents,
    ResponsesRequest, ResponsesProcessor, parse_responses,
)

Datalab OCR — DocumentConfig & DocumentProcessor

Process PDFs and images into structured text, HTML, or Markdown using the Datalab Marker API.

All Parameters — DocumentConfig

Parameter Type Default Description
api_key str Required Your Datalab API key.
api_url str "https://www.datalab.to/api/v1/marker" API endpoint URL. Override for custom/self-hosted endpoints.
output_format str "json" "json" · "html" · "markdown" · "chunks"
mode str "accurate" "fast" · "balanced" · "accurate"
Concurrency & Retry
max_concurrent_requests int 10 Maximum parallel API calls. Controls semaphore permits.
poll_interval_secs int 2 Seconds between status polling requests.
max_poll_attempts int 60 Max polling attempts before timeout.
max_retries int 5 Retry attempts on rate-limit/server errors.
base_retry_delay_secs int 5 Base delay for exponential backoff (seconds).
jitter_percent int 200 Random jitter range (±%) applied to backoff.
Structured Extraction
page_schema str (JSON) None JSON schema string for structured data extraction per page. See Structured Extraction below.
Page Control
paginate bool False Return output separated by page.
page_range str None Specific pages to process (e.g., "0-5", "0,2,4").
max_pages int None Maximum number of pages to process.
disable_image_extraction bool False Skip image extraction from documents.
Advanced
extras str (JSON) None Additional API parameters as a JSON string.
webhook_url str None URL to receive webhook callback on completion.
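
poll_interval_secs and max_poll_attempts together bound how long a document can wait. With the defaults, polling lasts roughly 2 × 60 = 120 seconds before a timeout, ignoring request latency. The sketch below shows a generic submit-then-poll loop under that assumption. check_status is a hypothetical callable, and the "status"/"complete" field names are assumptions; neither is taken from the library or the Datalab API.

```python
import asyncio
from typing import Awaitable, Callable


async def poll_until_complete(
    check_status: Callable[[], Awaitable[dict]],
    poll_interval_secs: float = 2,
    max_poll_attempts: int = 60,
) -> dict:
    """Poll a job until it reports completion or the attempt budget runs out."""
    for attempt in range(max_poll_attempts):
        status = await check_status()
        if status.get("status") == "complete":  # assumed field name and value
            return status
        if attempt < max_poll_attempts - 1:
            # Not done yet: wait before the next status request.
            await asyncio.sleep(poll_interval_secs)
    raise TimeoutError(
        f"job not complete after {max_poll_attempts} polls "
        f"(~{poll_interval_secs * max_poll_attempts}s)"
    )


def make_fake_job(polls_needed: int) -> Callable[[], Awaitable[dict]]:
    # Hypothetical job that reports completion on its Nth status check.
    calls = {"n": 0}

    async def check_status() -> dict:
        calls["n"] += 1
        done = calls["n"] >= polls_needed
        return {"status": "complete" if done else "processing", "polls": calls["n"]}

    return check_status


result = asyncio.run(poll_until_complete(make_fake_job(3), poll_interval_secs=0.01))
print(result)  # {'status': 'complete', 'polls': 3}
```

Raising poll_interval_secs reduces status traffic but adds up to one interval of latency per document. Raise max_poll_attempts for large PDFs that may exceed the default window.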

Structured Extraction (Datalab)

Use page_schema to extract structured fields from each page. Pass a JSON schema string describing the fields you want:

import json

schema = json.dumps({
    "type": "object",
    "properties": {
        "patient_name": {"type": "string", "description": "Full name of the patient"},
        "diagnosis": {"type": "string", "description": "Primary diagnosis"},
        "date": {"type": "string", "description": "Date of visit (YYYY-MM-DD)"}
    },
    "required": ["patient_name", "diagnosis"]
})

config = DocumentConfig(
    api_key="YOUR_DATALAB_KEY",
    output_format="json",
    mode="accurate",
    page_schema=schema  # Structured extraction
)

Note: page_schema is parsed as JSON when DocumentConfig is created. If the string is not valid JSON, a ValueError is raised immediately, before any API call is made.
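
The same fail-fast check can be reproduced with the standard library, for example to validate schemas loaded from files before building a config. validate_page_schema is a hypothetical helper that mirrors the documented behavior; it is not part of genai-coroutines. json.JSONDecodeError is itself a subclass of ValueError.

```python
import json


def validate_page_schema(page_schema: str) -> dict:
    """Parse a page_schema string, raising ValueError if it is not valid JSON."""
    try:
        return json.loads(page_schema)
    except json.JSONDecodeError as exc:  # JSONDecodeError subclasses ValueError
        raise ValueError(f"page_schema is not valid JSON: {exc}") from exc


schema = validate_page_schema('{"type": "object", "properties": {}}')
try:
    validate_page_schema("{'type': 'object'}")  # single quotes: not valid JSON
except ValueError as err:
    print(err)
```

Building the schema with json.dumps, as in the example above, avoids quoting mistakes like the one caught here.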

Usage Example

import asyncio
from genai_coroutines import DocumentConfig, DocumentProcessor, parse_documents

async def main():
    config = DocumentConfig(
        api_key="YOUR_DATALAB_KEY",
        mode="accurate",
        max_concurrent_requests=10,
        page_range="0-5",        # Only process first 6 pages
        max_pages=10              # Safety cap
    )
    processor = DocumentProcessor(config)

    # Load documents
    files = ["report1.pdf", "report2.pdf", "scan.png"]
    batch = []
    for f in files:
        with open(f, "rb") as fh:
            batch.append(fh.read())

    # Process
    results = await processor.process_multiparts(batch)

    # Parse consolidated HTML from each document
    html_list = parse_documents(results)
    for i, html in enumerate(html_list):
        print(f"Doc {i}: {len(html)} chars of HTML")

    # Access cost breakdown (native Python dict)
    for r in results:
        if r["success"] and r.get("cost_breakdown"):
            cost = r["cost_breakdown"]  # Already a dict, no json.loads needed
            print(f"Cost: {cost['final_cost_cents']} cents")

asyncio.run(main())

OCR Output Structure

Each item in the returned list:

{
    "index": 0,                      # Matches input order
    "success": True,
    "json_response": "{...}",        # Raw JSON string from Datalab API
    "cost_breakdown": {              # Cost tracking (when available)
        "final_cost_cents": 15,
        "list_cost_cents": 15
    },
    "processing_time_secs": 4.5,
    "error": None                    # Error message if success=False
}
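
Because cost_breakdown arrives as a plain dict, totaling the cost of a batch is a short loop. A sketch over sample result dicts shaped like the structure above; the sample values are made up for illustration.

```python
def total_cost_cents(results: list) -> int:
    """Sum final_cost_cents over successful results that report a cost."""
    total = 0
    for r in results:
        cost = r.get("cost_breakdown") if r.get("success") else None
        if cost:  # cost tracking is only present when available
            total += cost.get("final_cost_cents", 0)
    return total


sample = [
    {"index": 0, "success": True, "cost_breakdown": {"final_cost_cents": 15, "list_cost_cents": 15}},
    {"index": 1, "success": True, "cost_breakdown": None},
    {"index": 2, "success": False, "error": "timeout"},
    {"index": 3, "success": True, "cost_breakdown": {"final_cost_cents": 7, "list_cost_cents": 9}},
]
print(f"Batch cost: {total_cost_cents(sample)} cents")  # Batch cost: 22 cents
```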

OpenAI Responses API — ResponsesRequest & ResponsesProcessor

Batch-process OpenAI Responses API requests with structured output, reasoning models, tools, and multi-turn conversations.

All Parameters — ResponsesRequest

Parameter Type Default Description
api_key str Required Your OpenAI API key.
system_prompt str Required System instructions for the model.
user_prompts list[str] Required List of user prompts to process as a batch.
model str Required Model ID: "gpt-4o", "gpt-4o-mini", "o3-mini", etc.
response_format dict Required Output format. See Structured Output below.
timeout_secs int 60 Per-request timeout in seconds.
Concurrency & Retry
max_concurrent_requests int 10 Maximum parallel API calls. Controls semaphore permits.
max_retries int 5 Retry attempts on rate-limit/server errors.
retry_delay_min_ms int 1000 Minimum backoff delay in milliseconds.
retry_delay_max_ms int 60000 Maximum backoff delay in milliseconds.
Sampling
temperature float None Sampling temperature (0.0–2.0).
top_p float None Nucleus sampling threshold.
max_output_tokens int None Maximum tokens in the response.
Reasoning (o-series models)
reasoning_effort str None "low" · "medium" · "high" — Controls thinking depth for o3-mini, o1, etc.
reasoning_summary str None "auto" · "concise" · "detailed" — Controls reasoning summary output.
Tools & Function Calling
tools list[dict] None Tool/function definitions for function calling.
tool_choice str | dict None Tool selection strategy: "auto", "required", or a dict naming a specific tool.
parallel_tool_calls bool None Allow model to call multiple tools in parallel.
Multi-Turn
previous_response_id str None ID of a previous response to continue a conversation.
include list[str] None Additional data to include (e.g., ["file_search_call.results"]).
Other
store bool None Whether to store the response for later retrieval.
truncation str None Truncation strategy when the input exceeds the context window ("auto" or "disabled").
metadata dict None Custom metadata to attach to the request.
service_tier str None Service tier ("auto", "default").
stream bool None Enable streaming (advanced).

Structured Output (OpenAI)

The response_format parameter controls how the model formats its output. Three modes are supported:

1. JSON Schema (Strict)

Force the model to output valid JSON matching your exact schema:

request = ResponsesRequest(
    api_key="YOUR_KEY",
    model="gpt-4o-mini",
    system_prompt="Extract patient info from the text.",
    user_prompts=["Patient John Doe, age 45, diagnosed with hypertension on 2024-01-15."],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": "patient_info",
            "schema": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"type": "integer"},
                    "diagnosis": {"type": "string"},
                    "date": {"type": "string"}
                },
                "required": ["name", "age", "diagnosis", "date"],
                "additionalProperties": False
            },
            "strict": True
        }
    }
)

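Output (valid JSON matching the schema; strict mode guarantees schema adherence unless the model refuses or the output is cut off by max_output_tokens):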

{"name": "John Doe", "age": 45, "diagnosis": "hypertension", "date": "2024-01-15"}

2. JSON Object (Flexible)

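Force syntactically valid JSON output without a specific schema. OpenAI's JSON mode requires the word "JSON" to appear somewhere in the input, for example in system_prompt: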

response_format={"type": "json_object"}

3. Plain Text

No format enforcement:

response_format={"type": "text"}

Reasoning Models (o3-mini, o1)

For reasoning models, control the depth of thinking and summary:

request = ResponsesRequest(
    api_key="YOUR_KEY",
    model="o3-mini",
    system_prompt="Solve this step by step.",
    user_prompts=["What is 1234 * 5678?"],
    response_format={"type": "text"},
    reasoning_effort="high",         # low | medium | high
    reasoning_summary="detailed"     # auto | concise | detailed
)

Function Calling / Tools

Define tools the model can call:

request = ResponsesRequest(
    api_key="YOUR_KEY",
    model="gpt-4o",
    system_prompt="You are a helpful assistant with access to tools.",
    user_prompts=["What's the weather in San Francisco?"],
    response_format={"type": "text"},
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    }],
    tool_choice="auto"
)
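
When the model answers with a tool call, parse_responses returns the raw JSON and your code runs the function. The sketch below assumes the Responses API output shape documented by OpenAI, in which a call appears as an output item with "type": "function_call", a "name", a "call_id", and JSON-encoded "arguments". The get_weather implementation and the sample payload are hypothetical.

```python
import json


def get_weather(location: str) -> str:
    # Hypothetical local implementation of the tool declared above.
    return f"Sunny in {location}"


TOOLS = {"get_weather": get_weather}


def run_tool_calls(raw_response: str) -> list:
    """Execute each function_call item found in a raw Responses API payload."""
    outputs = []
    for item in json.loads(raw_response).get("output", []):
        if item.get("type") != "function_call":
            continue  # skip messages, reasoning items, etc.
        func = TOOLS[item["name"]]
        args = json.loads(item["arguments"])  # arguments arrive as a JSON string
        outputs.append({"call_id": item["call_id"], "output": func(**args)})
    return outputs


sample_raw = json.dumps({
    "id": "resp_123",
    "output": [{
        "type": "function_call",
        "call_id": "call_1",
        "name": "get_weather",
        "arguments": "{\"location\": \"San Francisco\"}",
    }],
})
print(run_tool_calls(sample_raw))
# [{'call_id': 'call_1', 'output': 'Sunny in San Francisco'}]
```

Looking functions up in an explicit TOOLS dict, rather than by arbitrary name, ensures the model can only trigger functions you registered.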

Multi-Turn Conversations

Continue a conversation by referencing a previous response:

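import asyncio, json
from genai_coroutines import ResponsesRequest, ResponsesProcessor, parse_responses

async def main():
    processor = ResponsesProcessor()
    # First turn
    request1 = ResponsesRequest(
        api_key="YOUR_KEY",
        model="gpt-4o",
        system_prompt="You are a helpful assistant.",
        user_prompts=["Summarize the main causes of hypertension."],
        response_format={"type": "text"},
    )
    results1 = await processor.process_batch(request1)
    # Assumes the first turn succeeded; check results1["results"][0]["success"] in real code
    response_id = json.loads(results1["results"][0]["raw_response"])["id"]

    # Second turn
    request2 = ResponsesRequest(
        api_key="YOUR_KEY",
        model="gpt-4o",
        system_prompt="You are a helpful assistant.",
        user_prompts=["Can you elaborate on that?"],
        response_format={"type": "text"},
        previous_response_id=response_id  # Continue the conversation
    )
    results2 = await processor.process_batch(request2)
    print(parse_responses(results2)[0])

asyncio.run(main())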

Usage Example

import asyncio, json
from genai_coroutines import ResponsesRequest, ResponsesProcessor, parse_responses

async def main():
    request = ResponsesRequest(
        api_key="YOUR_OPENAI_KEY",
        model="gpt-4o-mini",
        system_prompt="You are an expert data extractor.",
        user_prompts=[
            "Extract: John Doe, 45, hypertension",
            "Extract: Jane Smith, 32, diabetes",
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": "patient",
                "schema": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "age": {"type": "integer"},
                        "condition": {"type": "string"}
                    },
                    "required": ["name", "age", "condition"],
                    "additionalProperties": False
                },
                "strict": True
            }
        },
        max_concurrent_requests=20,
        max_retries=3
    )

    processor = ResponsesProcessor()
    results = await processor.process_batch(request)

    # Parse clean text (failed prompts come back as "")
    texts = parse_responses(results)
    for text in texts:
        if not text:
            continue
        data = json.loads(text)
        print(f"{data['name']}: {data['condition']}")

    # Access token usage (native Python dict — no json.loads needed)
    for r in results["results"]:
        if r["success"] and r.get("usage"):
            usage = r["usage"]  # Already a dict
            print(f"Tokens: {usage['total_tokens']}")

asyncio.run(main())

OpenAI Output Structure

{
    "total_success": 2,
    "total_errors": 0,
    "results": [
        {
            "success": True,
            "raw_response": "{...}",      # Full OpenAI JSON response (string)
            "usage": {                     # Token usage (native Python dict)
                "input_tokens": 42,
                "output_tokens": 18,
                "total_tokens": 60,
                "input_tokens_details": {"cached_tokens": 0},
                "output_tokens_details": {"reasoning_tokens": 0}
            }
        },
        {
            "success": False,
            "error": "401 Unauthorized: Invalid API key",
            "error_type": "authentication_error",
            "param": None,
            "code": "invalid_api_key",
            "is_retriable": False,
            "attempts": 1
        }
    ]
}
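
Since results preserve input order, an item's position in results["results"] identifies its prompt. The sketch below totals token usage and separates failures worth resubmitting from fatal ones, using the is_retriable flag shown above. summarize_batch is a hypothetical helper, and the sample batch is illustrative.

```python
def summarize_batch(batch: dict) -> tuple:
    """Return (total_tokens, retriable_indices, fatal_indices) for a batch result."""
    total_tokens = 0
    retriable, fatal = [], []
    for i, r in enumerate(batch["results"]):  # i matches the user_prompts index
        if r["success"]:
            total_tokens += (r.get("usage") or {}).get("total_tokens", 0)
        elif r.get("is_retriable"):
            retriable.append(i)  # e.g. retries exhausted on a 429
        else:
            fatal.append(i)
    return total_tokens, retriable, fatal


sample_batch = {
    "total_success": 1,
    "total_errors": 2,
    "results": [
        {"success": True, "usage": {"total_tokens": 60}},
        {"success": False, "error": "401 Unauthorized", "is_retriable": False},
        {"success": False, "error": "429 Too Many Requests", "is_retriable": True},
    ],
}
print(summarize_batch(sample_batch))  # (60, [2], [1])
```

The retriable indices can be mapped back to user_prompts to build a smaller follow-up batch.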

Helper Functions

parse_responses(results) → list[str]

Extracts clean assistant message text from OpenAI batch results.

from genai_coroutines import parse_responses

texts = parse_responses(results)
# ["John Doe, 45, hypertension...", "Jane Smith, 32, diabetes..."]
  • Input: The dict returned by ResponsesProcessor.process_batch().
  • Output: List of strings — one per input prompt (same length as user_prompts).
  • Behavior:
    • Aggregates all text from output[].content[].text for each response.
    • If the model called tools instead of producing text, returns the full raw JSON so you can inspect tool calls.
    • Returns "" for failed prompts.
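
The aggregation rules above can be illustrated with a simplified re-implementation over hand-written payloads. extract_text is hypothetical and only approximates parse_responses. The payloads follow the Responses API shape, where message items carry a content list of parts with a text field. Joining parts with no separator is an assumption.

```python
import json


def extract_text(result: dict) -> str:
    """Approximate the documented parse_responses rules for one result item."""
    if not result.get("success"):
        return ""  # failed prompts map to an empty string
    raw = result["raw_response"]
    texts = [
        part["text"]
        for item in json.loads(raw).get("output", [])
        for part in item.get("content") or []
        if "text" in part
    ]
    # No text at all (e.g. only tool calls): hand back the raw JSON for inspection.
    return "".join(texts) if texts else raw


ok = {"success": True, "raw_response": json.dumps({
    "output": [{"type": "message", "content": [{"type": "output_text", "text": "Hello"}]}]
})}
tool_only = {"success": True, "raw_response": json.dumps({
    "output": [{"type": "function_call", "name": "get_weather", "arguments": "{}"}]
})}
failed = {"success": False, "error": "timeout"}
print([extract_text(r) for r in (ok, failed)])  # ['Hello', '']
```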

parse_documents(results) → list[str]

Extracts content from all OCR results, auto-detecting the output format.

from genai_coroutines import parse_documents

contents = parse_documents(results)
# ["<p>Page 1 content</p>\n<p>Page 2 content</p>", ...]
  • Input: The list returned by DocumentProcessor.process_multiparts().
  • Output: List of strings — one per input document (same length as input batch).
  • Format handling:
    • output_format="json": Extracts and concatenates json.children[].html.
    • output_format="html": Returns raw HTML string.
    • output_format="markdown": Returns markdown string.
    • paginate=True: Concatenates content from paginated output.
    • page_schema (structured extraction): Returns the full JSON string.
    • Fallback: Returns raw JSON string so nothing is ever lost.
  • Returns "" for failed documents.
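
For the default output_format="json", the first rule amounts to joining the html field of each top-level child. A simplified sketch: html_from_json_response is hypothetical, the payload is hand-written, and the "\n" separator is an assumption based on the sample output above.

```python
import json


def html_from_json_response(result: dict) -> str:
    """Approximate parse_documents for output_format="json"."""
    if not result.get("success"):
        return ""
    raw = result["json_response"]
    children = (json.loads(raw).get("json") or {}).get("children")
    if not children:
        return raw  # fallback: keep the raw JSON so nothing is lost
    return "\n".join(child.get("html", "") for child in children)


doc = {"success": True, "json_response": json.dumps({"json": {"children": [
    {"html": "<p>Page 1 content</p>"},
    {"html": "<p>Page 2 content</p>"},
]}})}
print(repr(html_from_json_response(doc)))  # '<p>Page 1 content</p>\n<p>Page 2 content</p>'
```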

Error Handling

Errors are classified internally:

Category HTTP Codes Behavior
Retriable 429, 500, 502, 503, 504, timeouts Auto-retry with exponential backoff + jitter
Fatal 400, 401, 403, 404 Fail immediately, no retry
  • Failed items have "success": False with an "error" message.
  • The batch never crashes due to individual failures — all other items continue processing.
  • For OpenAI errors, is_retriable, error_type, param, code, and attempts are included.
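
The retry policy in the table can be sketched as a status-code check plus capped exponential backoff with random jitter. The exact formula here is an assumption for illustration, since the library's schedule is not documented beyond the parameters above. The defaults borrow retry_delay_min_ms and retry_delay_max_ms from ResponsesRequest; timeouts, which the table also treats as retriable, are not status codes and are left out.

```python
import random

RETRIABLE_STATUS = {429, 500, 502, 503, 504}


def is_retriable(status: int) -> bool:
    """429 and the listed 5xx codes are retried; 400/401/403/404 fail fast."""
    return status in RETRIABLE_STATUS


def backoff_delay_ms(attempt: int, min_ms: int = 1000, max_ms: int = 60000) -> float:
    """Delay before retry number `attempt` (0-based), never above max_ms."""
    ceiling = min(max_ms, min_ms * 2 ** attempt)  # exponential growth, capped
    # Jitter: pick uniformly between the minimum and the current ceiling so that
    # many requests hitting a 429 together do not all retry at the same moment.
    return random.uniform(min_ms, ceiling)


for attempt in range(4):
    print(attempt, round(backoff_delay_ms(attempt)))  # random, but bounded by the ceiling
```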

Logging

All Rust-level logs are bridged to Python's logging module under the genai_coroutines logger:

import logging

# See all logs
logging.basicConfig(level=logging.INFO)

# Per-module control
logging.getLogger("genai_coroutines.ocr").setLevel(logging.DEBUG)
logging.getLogger("genai_coroutines.responses").setLevel(logging.WARNING)

Sample log output:

INFO  [chandra] batch_start | files=10 concurrency=10
WARN  [chandra] rate_limit | index=3 attempt=2/5
INFO  [chandra] task_success | index=3 time=12.50s attempts=2
INFO  [chandra] batch_done | ok=10/10 errors=0
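
Because the bridge targets standard logging loggers, the package's records can be routed to their own handler. The sketch assumes that child loggers such as genai_coroutines.ocr propagate to the parent genai_coroutines logger, as standard logging hierarchies do. It simulates a record instead of running the library, and the StringIO buffer stands in for a file.

```python
import io
import logging

# Parent logger for the package; children like "genai_coroutines.ocr"
# inherit its level and hand their records up to its handlers.
pkg_logger = logging.getLogger("genai_coroutines")
pkg_logger.setLevel(logging.INFO)
pkg_logger.propagate = False  # keep these records out of the root logger

buffer = io.StringIO()  # in practice: logging.FileHandler("genai.log")
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
pkg_logger.addHandler(handler)

# Simulated records; the real ones would come from the Rust side.
logging.getLogger("genai_coroutines.ocr").warning("rate_limit | index=3 attempt=2/5")
logging.getLogger("genai_coroutines.ocr").debug("dropped: below the INFO threshold")

print(buffer.getvalue(), end="")
```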

Performance Tuning

Scenario max_concurrent_requests Retry Config
Small batch (<50) 5–10 Defaults
High volume (1k+) 30–50 Increase retry_delay_max_ms (OpenAI)
Rate-limited API 3–5 Increase base_retry_delay_secs and jitter_percent (Datalab) or retry_delay_min_ms (OpenAI)
Reasoning models 5–10 Increase timeout_secs to 120+

max_concurrent_requests controls the semaphore. Even with high concurrency, the retry logic will back off automatically on rate limits.
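
A back-of-envelope way to choose max_concurrent_requests is Little's law: requests in flight ≈ request rate × latency. The helpers below are illustrative estimates, not library features. They ignore retries and assume a steady per-request latency; 4.5 s echoes the sample processing_time_secs above, and the 500 RPM limit is a made-up example.

```python
def batch_time_secs(n_items: int, concurrency: int, latency_secs: float) -> float:
    """Approximate wall-clock time for a batch at steady state."""
    # Throughput is concurrency / latency items per second.
    return n_items * latency_secs / concurrency


def concurrency_for_rpm(rpm_limit: int, latency_secs: float) -> float:
    """Concurrency at which a requests-per-minute limit is just saturated."""
    # Little's law: in-flight = arrival rate (req/s) * time in system (s).
    return rpm_limit * latency_secs / 60


print(batch_time_secs(1000, 30, 4.5))  # 150.0
print(concurrency_for_rpm(500, 4.5))   # 37.5
```

Setting concurrency well above the second figure mostly produces 429 responses for the retry logic to absorb, without finishing the batch sooner.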

Download files

Source Distribution

genai_coroutines-1.0.0.tar.gz (39.6 kB)

Built Distributions

  • genai_coroutines-1.0.0-cp39-abi3-win_amd64.whl (2.0 MB): CPython 3.9+, Windows x86-64
  • genai_coroutines-1.0.0-cp39-abi3-manylinux_2_28_aarch64.whl (2.4 MB): CPython 3.9+, manylinux (glibc 2.28+) ARM64
  • genai_coroutines-1.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.4 MB): CPython 3.9+, manylinux (glibc 2.17+) x86-64
  • genai_coroutines-1.0.0-cp39-abi3-macosx_11_0_arm64.whl (2.1 MB): CPython 3.9+, macOS 11.0+ ARM64
  • genai_coroutines-1.0.0-cp39-abi3-macosx_10_12_x86_64.whl (2.2 MB): CPython 3.9+, macOS 10.12+ x86-64

File details

Details for the file genai_coroutines-1.0.0.tar.gz.

File metadata

  • Download URL: genai_coroutines-1.0.0.tar.gz
  • Size: 39.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.12.2

File hashes

Hashes for genai_coroutines-1.0.0.tar.gz
Algorithm Hash digest
SHA256 d860ebc6d6b94d7aadcf4fad733e9fd1ddba2536c514bf6a8c434976127aee4e
MD5 878ba92ea51056ced160c19e598ee9aa
BLAKE2b-256 06c5191adc52a7bb8f260e204c64feeb7dd7abdcdbf861117cb77b0c13b98196

File details

Details for the file genai_coroutines-1.0.0-cp39-abi3-win_amd64.whl.

File hashes

Hashes for genai_coroutines-1.0.0-cp39-abi3-win_amd64.whl
Algorithm Hash digest
SHA256 475460bcb9b79bab9ad15190e5899d3be2d3d3f5368b25ef195605e6c17f3536
MD5 d02b1644b33c80a7e1f8d71a8ccb2097
BLAKE2b-256 035de7c721dab5cc45ee5c721a00bafa11d2602604f7e9faa8dc43ec8ce2b949

File details

Details for the file genai_coroutines-1.0.0-cp39-abi3-manylinux_2_28_aarch64.whl.

File hashes

Hashes for genai_coroutines-1.0.0-cp39-abi3-manylinux_2_28_aarch64.whl
Algorithm Hash digest
SHA256 22178c598ad52db76c93ccd91bd22677ac0c397beccf792b9a46129fffaa46c3
MD5 37b15dd73b7e2458de0c1c7b8b996454
BLAKE2b-256 f69b68c788c2387a81bc86d3a58b5ea5d12c9ceaa0d1ab42cbbfc5b5c645d353

File details

Details for the file genai_coroutines-1.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File hashes

Hashes for genai_coroutines-1.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 86dfa294f8987d419a4df42e65517f75f9647c05e0cb004ef13ec3fb07f6f70b
MD5 571520a8c835ef3bebccefd6a2f5ebc2
BLAKE2b-256 11f49729fa5ae6f63245e53fa7a2539c796dce31ef3287c768588159401489fe

File details

Details for the file genai_coroutines-1.0.0-cp39-abi3-macosx_11_0_arm64.whl.

File hashes

Hashes for genai_coroutines-1.0.0-cp39-abi3-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 2a0fb9db67627bd38992b4756d7ec81bf49ea24b48a0406db5a6e218d0f50057
MD5 21f27504b7cb43b9cc26eaa45abea74c
BLAKE2b-256 74faeb4378f7d167fad96cf5a14056127fbd93d18d52dd81387d4208205c579f

File details

Details for the file genai_coroutines-1.0.0-cp39-abi3-macosx_10_12_x86_64.whl.

File hashes

Hashes for genai_coroutines-1.0.0-cp39-abi3-macosx_10_12_x86_64.whl
Algorithm Hash digest
SHA256 8fa1ad2181acc6a1f9e38f6285f2d929274dee54db8e60846ba15fc3f4d1ae35
MD5 63a5acac7d85a52c1998ee2e13a18c8f
BLAKE2b-256 b8c0a70796189ffce0ce8903e953585600bd4af5cd620fdf813220a7422a3b68
