Skip to main content

High-performance LLM client with batch processing, caching, and checkpoint recovery

Project description

flexllm

High-Performance LLM Client for Production
Batch processing with checkpoint recovery, response caching, load balancing, and cost tracking

PyPI version License pypi downloads


Why flexllm?

Built for production batch processing at scale.

from flexllm import LLMClient

client = LLMClient(base_url="https://api.openai.com/v1", model="gpt-4", api_key="...")

# Process 100k requests with automatic checkpoint recovery
# Interrupted at 50k? Just restart - it continues from 50,001
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",  # Progress saved here
    show_progress=True,
    track_cost=True,  # Real-time cost display
)

Scale out across multiple endpoints with zero code change.

from flexllm import LLMClientPool

# Same API, multiple GPU nodes — faster endpoints automatically handle more tasks
pool = LLMClientPool(
    endpoints=[
        {"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50},
        {"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20},
        {"base_url": "http://gpu3:8000/v1", "model": "qwen"},
    ],
    fallback=True,  # Auto-switch on endpoint failure
)

results = await pool.chat_completions_batch(messages_list, output_jsonl="results.jsonl")

Features

Feature Description
Checkpoint Recovery Batch jobs auto-resume from interruption - process millions of requests safely
Multi-Endpoint Pool Distribute tasks across GPU nodes with shared-queue dynamic balancing and automatic failover
Response Caching Built-in caching with TTL and IPC multi-process sharing
Cost Tracking Real-time cost monitoring with budget control
High-Performance Async Fine-grained concurrency control, QPS limiting, and streaming
Multi-Provider Supports OpenAI-compatible APIs, Gemini, Claude

Installation

pip install flexllm

# With all features
pip install flexllm[all]

Claude Code Integration

Enable Claude Code to use flexllm for LLM API calls, batch processing, and more:

flexllm install-skill

After installation, Claude Code gains the ability to use flexllm across all your projects.


Quick Start

Basic Usage

from flexllm import LLMClient

# Recommended: use context manager for proper resource cleanup
async with LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key"
) as client:
    # Async call
    response = await client.chat_completions([
        {"role": "user", "content": "Hello!"}
    ])

# Sync version (also supports context manager)
with LLMClient(model="gpt-4", base_url="...", api_key="...") as client:
    response = client.chat_completions_sync([
        {"role": "user", "content": "Hello!"}
    ])

# Get token usage
result = await client.chat_completions(
    messages=[{"role": "user", "content": "Hello!"}],
    return_usage=True,  # Returns ChatCompletionResult with usage info
)
print(f"Tokens: {result.usage}")  # {'prompt_tokens': 10, 'completion_tokens': 5, ...}

Batch Processing with Checkpoint Recovery

Process millions of requests safely. If interrupted, just restart - it continues from where it left off.

messages_list = [
    [{"role": "user", "content": f"Question {i}"}]
    for i in range(100000)
]

# Interrupted at 50,000? Re-run and it continues from 50,001.
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",  # Progress saved here
    show_progress=True,
)

Multi-Endpoint Pool

Distribute batch tasks across multiple GPU nodes / API endpoints. Faster endpoints automatically handle more tasks via a shared queue model, with automatic failover and health monitoring.

LLMClient and LLMClientPool share the same API. Single endpoint → use LLMClient; multiple endpoints → use LLMClientPool.

from flexllm import LLMClientPool

pool = LLMClientPool(
    endpoints=[
        # Each endpoint can have independent rate limits
        {"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50, "max_qps": 100},
        {"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20, "max_qps": 50},
        {"base_url": "http://gpu3:8000/v1", "model": "qwen"},
    ],
    fallback=True,               # Auto-switch on endpoint failure
    failure_threshold=3,         # Mark unhealthy after 3 consecutive failures
    recovery_time=60.0,          # Try to recover after 60 seconds
)

# Single request — automatic failover across endpoints
result = await pool.chat_completions(messages)

# Distributed batch — shared queue, dynamic load balancing, checkpoint recovery
results = await pool.chat_completions_batch(
    messages_list,
    distribute=True,
    output_jsonl="results.jsonl",
    track_cost=True,
)

# Streaming with failover
async for chunk in pool.chat_completions_stream(messages):
    print(chunk, end="", flush=True)

Highlights:

  • Shared Queue: Faster endpoints automatically pull more tasks — no manual tuning needed
  • Automatic Failover: Failed requests retry on healthy endpoints; unhealthy nodes auto-recover
  • Per-Endpoint Config: Independent concurrency_limit and max_qps for each endpoint
  • Full Feature Support: Checkpoint recovery, caching, cost tracking all work with Pool

Response Caching

from flexllm import LLMClient, ResponseCacheConfig

client = LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    cache=ResponseCacheConfig(enabled=True, ttl=3600),  # 1 hour TTL
)

# First call: API request (~2s, ~$0.01)
result1 = await client.chat_completions(messages)

# Second call: Cache hit (~0.001s, $0)
result2 = await client.chat_completions(messages)

Cost Tracking

# Track costs during batch processing
results, cost_report = await client.chat_completions_batch(
    messages_list,
    return_cost_report=True,
)
print(f"Total cost: ${cost_report.total_cost:.4f}")

# Real-time cost display in progress bar
results = await client.chat_completions_batch(
    messages_list,
    track_cost=True,  # Shows 💰 $0.0012 in progress bar
)

Streaming

# Token-by-token streaming
async for chunk in client.chat_completions_stream(messages):
    print(chunk, end="", flush=True)

# Batch streaming - process results as they complete
async for result in client.iter_chat_completions_batch(messages_list):
    process(result)

Thinking Mode (Reasoning Models)

Unified interface for DeepSeek-R1, Qwen3, Claude extended thinking, Gemini thinking.

result = await client.chat_completions(
    messages,
    thinking=True,      # Enable thinking
    return_raw=True,
)

# Unified parsing across all providers
parsed = client.parse_thoughts(result.data)
print("Thinking:", parsed["thought"])
print("Answer:", parsed["answer"])

Tool Calls (Function Calling)

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather information",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    },
}]

result = await client.chat_completions(
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    tools=tools,
    return_usage=True,
)

if result.tool_calls:
    for call in result.tool_calls:
        print(f"Call: {call.function['name']}({call.function['arguments']})")

CLI

# Quick ask
flexllm ask "What is Python?"

# Interactive chat
flexllm chat

# Batch processing with cost tracking
flexllm batch input.jsonl -o output.jsonl --track-cost
flexllm batch input.jsonl -o output.jsonl -n 5           # First 5 records only
flexllm batch data.jsonl -o out.jsonl -uf text -sf sys   # Custom field names

# Model management
flexllm list              # Configured models
flexllm models            # Remote available models
flexllm set-model gpt-4   # Set default model
flexllm test              # Test connection
flexllm init              # Initialize config file

# Utilities
flexllm pricing gpt-4     # Query model pricing
flexllm credits           # Check API key balance
flexllm mock              # Start mock LLM server for testing

Configuration

Config file location: ~/.flexllm/config.yaml

# Default model
default: "gpt-4"

# Model list
models:
  - id: gpt-4
    name: gpt-4
    provider: openai
    base_url: https://api.openai.com/v1
    api_key: your-api-key

  - id: local-ollama
    name: local-ollama
    provider: openai
    base_url: http://localhost:11434/v1
    api_key: EMPTY

# Batch command config (optional)
batch:
  concurrency: 20
  cache: true
  track_cost: true

Environment variables (higher priority than config file):

  • FLEXLLM_BASE_URL / OPENAI_BASE_URL
  • FLEXLLM_API_KEY / OPENAI_API_KEY
  • FLEXLLM_MODEL / OPENAI_MODEL

Architecture

flexllm/
├── clients/           # All client implementations
│   ├── base.py        # Abstract base class (LLMClientBase)
│   ├── llm.py         # Unified entry point (LLMClient)
│   ├── openai.py      # OpenAI-compatible backend
│   ├── gemini.py      # Google Gemini backend
│   ├── claude.py      # Anthropic Claude backend
│   ├── pool.py        # Multi-endpoint load balancer
│   └── router.py      # Provider routing strategies
├── pricing/           # Cost estimation and tracking
│   ├── cost_tracker.py
│   └── token_counter.py
├── cache/             # Response caching with IPC
├── async_api/         # High-performance async engine
└── msg_processors/    # Multi-modal message processing

The architecture follows a simple layered design:

LLMClient (single endpoint)  /  LLMClientPool (multi-endpoint)
    │                                  │
    │                                  ├── ProviderRouter (round_robin)
    │                                  ├── Health Monitor (failure threshold + auto recovery)
    │                                  └── Shared Task Queue (dynamic load balancing)
    │                                  │
    └──────────── Backend Clients ─────┘
                    ├── OpenAIClient
                    ├── GeminiClient
                    └── ClaudeClient
                            │
                            └── LLMClientBase (Abstract - 4 methods to implement)
                                    │
                                    ├── ConcurrentRequester (Async engine)
                                    ├── ResponseCache (Caching layer)
                                    └── CostTracker (Cost monitoring)

API Reference

LLMClient

LLMClient(
    provider: str = "auto",        # "auto", "openai", "gemini", "claude"
    model: str,                    # Model name
    base_url: str = None,          # API base URL (required for openai)
    api_key: str = "EMPTY",        # API key
    cache: ResponseCacheConfig,    # Cache config
    concurrency_limit: int = 10,   # Max concurrent requests
    max_qps: float = None,         # Max requests per second
    retry_times: int = 3,          # Retry count on failure
    timeout: int = 120,            # Request timeout (seconds)
)

Main Methods

Method Description
chat_completions(messages) Single async request
chat_completions_sync(messages) Single sync request
chat_completions_batch(messages_list) Batch async with checkpoint
iter_chat_completions_batch(messages_list) Streaming batch results
chat_completions_stream(messages) Token-by-token streaming

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flexllm-0.5.0.tar.gz (169.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flexllm-0.5.0-py3-none-any.whl (157.5 kB view details)

Uploaded Python 3

File details

Details for the file flexllm-0.5.0.tar.gz.

File metadata

  • Download URL: flexllm-0.5.0.tar.gz
  • Upload date:
  • Size: 169.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flexllm-0.5.0.tar.gz
Algorithm Hash digest
SHA256 de52d130df56a1593265679b2c0ce88f7fd109684c799c4cffa2d4b68f0f7aa7
MD5 00f28e283217498e3e3c91ee75d3b0d7
BLAKE2b-256 96b43c951c43ba13397ab89938df7c82e77979532ea0114fa1a791384b16d41b

See more details on using hashes here.

Provenance

The following attestation bundles were made for flexllm-0.5.0.tar.gz:

Publisher: python-publish.yml on KenyonY/flexllm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file flexllm-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: flexllm-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 157.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flexllm-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2b4829ed6fcabca9d36daecaea0a731c302363ca05bbee7e0e389d0cb0e32fcc
MD5 d11d56006448af67377f1677ad09d145
BLAKE2b-256 16778368ade00f52c35ccba3a0f3de2e5e23ce3a3e4bae13348d18b64689b1a9

See more details on using hashes here.

Provenance

The following attestation bundles were made for flexllm-0.5.0-py3-none-any.whl:

Publisher: python-publish.yml on KenyonY/flexllm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page