High-performance LLM client with batch processing, caching, and checkpoint recovery

flexllm

Production-grade LLM client with checkpoint recovery, response caching, and multi-provider support


Features

| Feature | Description |
|---|---|
| Checkpoint Recovery | Batch jobs auto-resume after interruption - process millions of requests without losing progress |
| Response Caching | Built-in caching with TTL and IPC multi-process sharing - avoid duplicate API calls |
| Multi-Provider | One interface for OpenAI, Gemini, Claude, and any OpenAI-compatible API (vLLM, Ollama, etc.) |
| High-Performance Async | Fine-grained concurrency control, QPS limiting, and streaming batch results |
| Load Balancing | Multi-endpoint distribution with automatic failover (round_robin/weighted/random/fallback) |

Core Strengths

1. Checkpoint Recovery - Never Lose Progress

Process millions of requests without fear of interruption. When your batch job crashes at 3 AM, just restart it - flexllm picks up exactly where it left off.

# Process 100,000 requests - if interrupted, resume automatically
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",  # Progress saved here
)
# Ctrl+C at 50,000? No problem. Re-run and it continues from 50,001.
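
The resume behaviour described above can be illustrated with a small standalone sketch. This is not flexllm's implementation, and the record layout (`index`, `output`) and the helper name `run_with_checkpoint` are assumptions made for illustration only. The idea is to append one JSON line per finished item and, on restart, skip every index that is already in the file.

```python
import json
import os


def run_with_checkpoint(items, output_path, handler):
    """Process items in order, appending one JSON line per finished item.

    Hypothetical helper: on restart, indices already recorded in
    output_path are skipped instead of being processed again.
    """
    done = {}
    if os.path.exists(output_path):
        with open(output_path, encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    record = json.loads(line)
                    done[record["index"]] = record["output"]

    with open(output_path, "a", encoding="utf-8") as f:
        for index, item in enumerate(items):
            if index in done:
                continue  # finished in an earlier run
            output = handler(item)
            f.write(json.dumps({"index": index, "output": output}) + "\n")
            f.flush()  # make progress visible on disk right away
            done[index] = output

    return [done[i] for i in range(len(items))]
```

Because progress is written after every item, a crash loses at most the item that was in flight.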

2. Response Caching - Save Money, Save Time

Built-in intelligent caching avoids duplicate API calls. Same question? Instant answer from cache.

client = LLMClient(
    model="gpt-4",
    cache=ResponseCacheConfig.with_ttl(3600),  # 1 hour cache
)

# First call: API request (~2s, ~$0.01)
result1 = await client.chat_completions(messages)

# Second call: Cache hit (~0.001s, $0)
result2 = await client.chat_completions(messages)

Supports multi-process cache sharing via IPC, so several worker processes can reuse the same cached responses.
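
For readers who want to see the idea behind TTL-based response caching, here is a minimal in-memory sketch. It is an illustration under stated assumptions, not flexllm's cache: the class `TTLCache`, its `make_key` scheme (a SHA-256 hash of the model name and messages), and the injectable `clock` are all hypothetical.

```python
import hashlib
import json
import time


class TTLCache:
    """Hypothetical in-memory cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._store = {}

    @staticmethod
    def make_key(model, messages):
        # Sorting keys makes the hash independent of dict key order.
        payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # drop the stale entry
            return None
        return value

    def set(self, key, value):
        self._store[key] = (self.clock() + self.ttl, value)
```

A caller would check `get` before sending a request and call `set` with the response afterwards.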

3. One Interface, All Providers

Write once, run everywhere. Switch between OpenAI, Gemini, Claude, or self-hosted models without changing your code.

# OpenAI
client = LLMClient(provider="openai", base_url="https://api.openai.com/v1", ...)

# Gemini
client = LLMClient(provider="gemini", api_key="...", model="gemini-2.0-flash")

# Claude
client = LLMClient(provider="claude", api_key="...", model="claude-sonnet-4-20250514")

# Self-hosted (vLLM, Ollama, etc.)
client = LLMClient(base_url="http://localhost:8000/v1", model="qwen2.5")

# Same API for all:
result = await client.chat_completions(messages)

4. High-Performance Async Engine

Maximize throughput with fine-grained concurrency control and QPS limiting.

client = LLMClient(
    concurrency_limit=100,  # 100 concurrent requests
    max_qps=50,             # Rate limit: 50 req/sec
    retry_times=3,          # Auto-retry on failure
)

# Process 10,000 requests with optimal parallelism
results = await client.chat_completions_batch(messages_list, show_progress=True)
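
What a concurrency limit means in practice can be sketched with plain asyncio. This is a generic illustration, not flexllm's engine; `run_limited` is a hypothetical helper, and it covers only the cap on in-flight requests, not QPS limiting or retries.

```python
import asyncio


async def run_limited(coro_factories, limit):
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(factory):
        async with semaphore:  # blocks while `limit` jobs are already running
            return await factory()

    # gather returns results in the same order as the input factories
    return await asyncio.gather(*(guarded(f) for f in coro_factories))
```

Factories are used instead of coroutine objects so that no request starts before it holds a semaphore slot.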

Streaming results: handle each result as soon as it completes instead of waiting for the whole batch:

async for result in client.iter_chat_completions_batch(messages_list):
    process(result)  # Handle each result immediately
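
The "yield results as they finish" pattern can be sketched with `asyncio.as_completed`. This is a generic illustration rather than flexllm's code; `iter_as_completed` is a hypothetical helper that tags each result with its input index, since completion order usually differs from submission order.

```python
import asyncio


async def iter_as_completed(coros):
    """Yield (index, result) pairs in completion order."""

    async def tagged(index, coro):
        return index, await coro

    for future in asyncio.as_completed([tagged(i, c) for i, c in enumerate(coros)]):
        yield await future
```

Consumers iterate with `async for index, result in iter_as_completed(...)` and can write each result out immediately.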

5. Load Balancing & Failover

Distribute workloads across multiple endpoints with automatic failover.

pool = LLMClientPool(
    endpoints=[
        {"base_url": "http://gpu1:8000/v1", "model": "qwen"},
        {"base_url": "http://gpu2:8000/v1", "model": "qwen"},
        {"base_url": "http://gpu3:8000/v1", "model": "qwen"},
    ],
    load_balance="round_robin",  # or "weighted", "random", "fallback"
    fallback=True,               # Auto-switch on failure
)

# Requests automatically distributed across healthy endpoints
results = await pool.chat_completions_batch(messages_list, distribute=True)
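
Round-robin selection with failover can be sketched in a few lines. The class below is a hypothetical, synchronous illustration and not how `LLMClientPool` is implemented; the `send` callable and the choice of `ConnectionError` as the failure signal are assumptions.

```python
import itertools


class RoundRobinPool:
    """Hypothetical pool: rotate the starting endpoint, fall through on failure."""

    def __init__(self, endpoints):
        self.endpoints = list(endpoints)
        self._cycle = itertools.cycle(range(len(self.endpoints)))

    def call(self, send, request):
        errors = []
        start = next(self._cycle)  # each call starts one endpoint further along
        for offset in range(len(self.endpoints)):
            endpoint = self.endpoints[(start + offset) % len(self.endpoints)]
            try:
                return send(endpoint, request)
            except ConnectionError as exc:
                errors.append((endpoint, exc))  # try the next endpoint
        raise RuntimeError(f"all endpoints failed: {errors}")
```

A request only fails once every endpoint has been tried.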

6. Thinking Mode Support

Unified interface for reasoning models - DeepSeek-R1, Qwen3, Claude extended thinking, Gemini thinking.

result = await client.chat_completions(
    messages,
    thinking=True,      # Enable thinking
    return_raw=True,
)

# Unified parsing across all providers
parsed = client.parse_thoughts(result.data)
print("Thinking:", parsed["thought"])
print("Answer:", parsed["answer"])
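
To give a feel for what separating a thought from an answer involves, here is a sketch for one common case: models that emit their reasoning inline between `<think>` and `</think>` tags. This is an assumption about the raw output format, and `split_thinking` is a hypothetical helper. Other providers return reasoning in separate response fields, and `parse_thoughts` may handle all of these differently.

```python
import re

THINK_PATTERN = re.compile(r"<think>(.*?)</think>", re.DOTALL)


def split_thinking(text):
    """Split inline <think>...</think> reasoning from the final answer."""
    match = THINK_PATTERN.search(text)
    if match is None:
        return {"thought": "", "answer": text.strip()}
    # The answer is everything outside the first think block.
    answer = text[:match.start()] + text[match.end():]
    return {"thought": match.group(1).strip(), "answer": answer.strip()}
```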

Installation

pip install flexllm

# With caching support
pip install flexllm[cache]

# With CLI
pip install flexllm[cli]

# All features
pip install flexllm[all]

Quick Start

Single Request

from flexllm import LLMClient

client = LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key"
)

# Async
response = await client.chat_completions([
    {"role": "user", "content": "Hello!"}
])

# Sync
response = client.chat_completions_sync([
    {"role": "user", "content": "Hello!"}
])
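
The async snippets in this README use a bare `await`, which works in a notebook or async REPL but not at the top level of a regular script. In a script, the call goes inside a coroutine that is started with `asyncio.run`. The sketch below shows that pattern with `StubClient`, a hypothetical stand-in that only mimics the `chat_completions` method name so the example runs without network access.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in for a real client; it just echoes the last message."""

    async def chat_completions(self, messages):
        await asyncio.sleep(0)  # pretend to wait on the network
        return f"echo: {messages[-1]['content']}"


async def main():
    client = StubClient()
    return await client.chat_completions([{"role": "user", "content": "Hello!"}])


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Swapping `StubClient()` for a configured `LLMClient(...)` should leave the rest of the structure unchanged.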

Batch Processing with Checkpoint Recovery

from flexllm import LLMClient

client = LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    concurrency_limit=50,
    max_qps=100,
)

messages_list = [
    [{"role": "user", "content": f"Question {i}"}]
    for i in range(10000)
]

# If interrupted, re-running resumes from where it stopped
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",
    show_progress=True,
)

Response Caching

from flexllm import LLMClient, ResponseCacheConfig

client = LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    cache=ResponseCacheConfig.with_ttl(3600),  # 1 hour TTL
)

# Duplicate requests hit cache automatically
result1 = await client.chat_completions(messages)  # API call
result2 = await client.chat_completions(messages)  # Cache hit (instant)

# Multi-process cache sharing (IPC mode - default)
cache = ResponseCacheConfig.ipc(ttl=86400)  # 24h, shared across processes

Streaming Response

async for chunk in client.chat_completions_stream(messages):
    print(chunk, end="", flush=True)

Multi-Modal (Vision)

from flexllm import MllmClient

client = MllmClient(
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    model="gpt-4o",
)

messages = [
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "What's in this image?"},
            {"type": "image_url", "image_url": {"url": "path/to/image.jpg"}}
        ]
    }
]

response = await client.call_llm([messages])

Load Balancing with Failover

from flexllm import LLMClientPool

pool = LLMClientPool(
    endpoints=[
        {"base_url": "http://host1:8000/v1", "api_key": "key1", "model": "qwen"},
        {"base_url": "http://host2:8000/v1", "api_key": "key2", "model": "qwen"},
    ],
    load_balance="round_robin",
    fallback=True,
)

# Single request with automatic failover
result = await pool.chat_completions(messages)

# Batch requests distributed across endpoints
results = await pool.chat_completions_batch(messages_list, distribute=True)

Gemini Client

from flexllm import GeminiClient

# Gemini Developer API
client = GeminiClient(
    model="gemini-2.0-flash",
    api_key="your-gemini-api-key"
)

# With thinking mode
response = await client.chat_completions(
    messages,
    thinking="high",  # False, True, "minimal", "low", "medium", "high"
)

# Vertex AI mode
client = GeminiClient(
    model="gemini-2.0-flash",
    project_id="your-project-id",
    location="us-central1",
    use_vertex_ai=True,
)

Claude Client

from flexllm import LLMClient, ClaudeClient

# Using unified LLMClient (recommended)
client = LLMClient(
    provider="claude",
    api_key="your-anthropic-key",
    model="claude-sonnet-4-20250514",
)

response = await client.chat_completions([
    {"role": "user", "content": "Hello, Claude!"}
])

# With extended thinking
result = await client.chat_completions(
    messages,
    thinking=True,
    return_raw=True,
)
parsed = client.parse_thoughts(result.data)

Function Calling (Tool Use)

from flexllm import LLMClient

client = LLMClient(
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    model="gpt-4",
)

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "City name"}
                },
                "required": ["location"]
            }
        }
    }
]

result = await client.chat_completions(
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    tools=tools,
    return_usage=True,
)

if result.tool_calls:
    for tool_call in result.tool_calls:
        print(f"Function: {tool_call.function['name']}")
        print(f"Arguments: {tool_call.function['arguments']}")
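
Once a tool call comes back, the application still has to run the matching function. A minimal dispatch sketch follows; the registry, the `dispatch_tool_call` helper, and the placeholder `get_weather` body are hypothetical. It assumes the arguments arrive as a JSON string, as in OpenAI-style responses, and also accepts an already-parsed dict.

```python
import json


def get_weather(location):
    """Hypothetical local implementation of the tool declared above."""
    return {"location": location, "forecast": "unknown"}


TOOL_REGISTRY = {"get_weather": get_weather}


def dispatch_tool_call(name, arguments):
    """Look up the tool by name and call it with the decoded arguments."""
    if name not in TOOL_REGISTRY:
        raise KeyError(f"unknown tool: {name}")
    kwargs = json.loads(arguments) if isinstance(arguments, str) else dict(arguments)
    return TOOL_REGISTRY[name](**kwargs)
```

In the loop above this would be called as `dispatch_tool_call(tool_call.function['name'], tool_call.function['arguments'])`.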

CLI Usage

# Quick ask
flexllm ask "What is Python?"
flexllm ask "Explain this" -s "You are a code expert"
echo "long text" | flexllm ask "Summarize"

# Interactive chat
flexllm chat
flexllm chat --model=gpt-4 "Hello"

# Batch processing with checkpoint recovery
flexllm batch input.jsonl -o output.jsonl

# List models
flexllm models           # Remote models
flexllm list_models      # Configured models

# Test connection
flexllm test

# Initialize config
flexllm init

CLI Configuration

Create ~/.flexllm/config.yaml:

default: "gpt-4"

models:
  - id: gpt-4
    name: gpt-4
    provider: openai
    base_url: https://api.openai.com/v1
    api_key: your-api-key

  - id: local
    name: local-ollama
    provider: openai
    base_url: http://localhost:11434/v1
    api_key: EMPTY

Or use environment variables:

export FLEXLLM_BASE_URL="https://api.openai.com/v1"
export FLEXLLM_API_KEY="your-key"
export FLEXLLM_MODEL="gpt-4"

API Reference

LLMClient

LLMClient(
    provider: str = "auto",        # "auto", "openai", "gemini", "claude"
    model: str,                    # Model name
    base_url: str,                 # API base URL
    api_key: str = "EMPTY",        # API key
    cache: ResponseCacheConfig,    # Cache config
    concurrency_limit: int = 10,   # Max concurrent requests
    max_qps: float = None,         # Max requests per second
    retry_times: int = 3,          # Retry count on failure
    retry_delay: float = 1.0,      # Delay between retries
    timeout: int = 120,            # Request timeout (seconds)
)

Methods

| Method | Description |
|---|---|
| chat_completions(messages) | Single async request |
| chat_completions_sync(messages) | Single sync request |
| chat_completions_batch(messages_list) | Batch async with checkpoint |
| chat_completions_batch_sync(messages_list) | Batch sync with checkpoint |
| iter_chat_completions_batch(messages_list) | Streaming batch results |
| chat_completions_stream(messages) | Token-by-token streaming |
| parse_thoughts(response_data) | Parse thinking content |

ResponseCacheConfig

# Shortcuts
ResponseCacheConfig.with_ttl(3600)     # 1 hour TTL
ResponseCacheConfig.persistent()        # Never expire
ResponseCacheConfig.ipc(ttl=86400)      # Multi-process shared (default)
ResponseCacheConfig.local(ttl=86400)    # Single process only

# Full config
ResponseCacheConfig(
    enabled: bool = False,
    ttl: int = 86400,              # Time-to-live in seconds
    cache_dir: str = "~/.cache/flexllm/llm_response",
    use_ipc: bool = True,          # Multi-process cache sharing
)

Token Counting

from flexllm import count_tokens, estimate_cost, estimate_batch_cost

tokens = count_tokens("Hello world", model="gpt-4")
cost = estimate_cost(tokens, model="gpt-4", is_input=True)
total_cost = estimate_batch_cost(messages_list, model="gpt-4")
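
The arithmetic behind a cost estimate is simple: token count times a per-token price. The sketch below is an assumption about the general approach, not flexllm's pricing logic; `cost_usd` is a hypothetical helper and the price passed in is a made-up figure, not a real model price.

```python
def cost_usd(token_count, usd_per_million_tokens):
    """Estimate cost from a token count and a price per million tokens."""
    return token_count * usd_per_million_tokens / 1_000_000


# Hypothetical price of 2.00 USD per million input tokens.
example = cost_usd(1500, 2.0)
```

A batch estimate is the sum of this value over all requests, with input and output tokens priced separately where the provider does so.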

Architecture

LLMClient (Unified entry point)
    ├── OpenAIClient (OpenAI-compatible APIs)
    ├── GeminiClient (Google Gemini)
    └── ClaudeClient (Anthropic Claude)
            │
            └── LLMClientBase (Abstract base - 4 methods to implement)
                    │
                    ├── ConcurrentRequester (Async engine with QPS control)
                    ├── ResponseCache (FlaxKV2-based caching with IPC)
                    └── ImageProcessor (Multi-modal support)

LLMClientPool (Multi-endpoint load balancing)
    └── ProviderRouter (round_robin / weighted / random / fallback)

License

Apache 2.0
