High-performance LLM client with batch processing, caching, and checkpoint recovery

flexllm

High-Performance LLM Client for Production
Batch processing with checkpoint recovery, response caching, load balancing, and cost tracking


Why flexllm?

Built for production batch processing at scale.

from flexllm import LLMClient

client = LLMClient(base_url="https://api.openai.com/v1", model="gpt-4", api_key="...")

# Process 100k requests with automatic checkpoint recovery
# Interrupted at 50k? Just restart - it continues from 50,001
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",  # Progress saved here
    show_progress=True,
    track_cost=True,  # Real-time cost display
)

Scale out across multiple endpoints without changing your code.

from flexllm import LLMClientPool

# Same API, multiple GPU nodes — faster endpoints automatically handle more tasks
pool = LLMClientPool(
    endpoints=[
        {"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50},
        {"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20},
        {"base_url": "http://gpu3:8000/v1", "model": "qwen"},
    ],
    fallback=True,  # Auto-switch on endpoint failure
)

results = await pool.chat_completions_batch(messages_list, output_jsonl="results.jsonl")

Features

| Feature | Description |
| --- | --- |
| Checkpoint Recovery | Batch jobs auto-resume from interruption - process millions of requests safely |
| Multi-Endpoint Pool | Distribute tasks across GPU nodes with shared-queue dynamic balancing and automatic failover |
| Response Caching | Built-in caching with TTL and IPC multi-process sharing |
| Cost Tracking | Real-time cost monitoring with budget control |
| High-Performance Async | Fine-grained concurrency control, QPS limiting, and streaming |
| Multi-Provider | Supports OpenAI-compatible APIs, Gemini, Claude |

Installation

pip install flexllm

# With all features
pip install flexllm[all]

Claude Code Integration

Enable Claude Code to use flexllm for LLM API calls, batch processing, and more:

flexllm install-skill

After installation, Claude Code gains the ability to use flexllm across all your projects.


Quick Start

Basic Usage

from flexllm import LLMClient

# Recommended: use context manager for proper resource cleanup
async with LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key"
) as client:
    # Async call
    response = await client.chat_completions([
        {"role": "user", "content": "Hello!"}
    ])

# Sync version (also supports context manager)
with LLMClient(model="gpt-4", base_url="...", api_key="...") as client:
    response = client.chat_completions_sync([
        {"role": "user", "content": "Hello!"}
    ])

# Get token usage (run this while the client is still open, e.g. inside the async with block)
result = await client.chat_completions(
    messages=[{"role": "user", "content": "Hello!"}],
    return_usage=True,  # Returns ChatCompletionResult with usage info
)
print(f"Tokens: {result.usage}")  # {'prompt_tokens': 10, 'completion_tokens': 5, ...}

Batch Processing with Checkpoint Recovery

Process millions of requests safely. Progress is saved to the output_jsonl file, so if a run is interrupted you can re-run the same call and it continues from where it left off.

messages_list = [
    [{"role": "user", "content": f"Question {i}"}]
    for i in range(100000)
]

# Interrupted at 50,000? Re-run and it continues from 50,001.
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",  # Progress saved here
    show_progress=True,
)
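
To make the resume behaviour concrete, here is a minimal standalone sketch of the idea. It is not flexllm's implementation: it assumes each finished request is appended to the JSONL file as one record with a hypothetical `index` field, so a restart only needs to skip the indices already on disk. The names `load_done_indices` and `run_with_checkpoint` are made up for illustration.

```python
import json
import os
from typing import Callable


def load_done_indices(path: str) -> set[int]:
    """Return the item indices already recorded in a JSONL checkpoint file."""
    done: set[int] = set()
    if not os.path.exists(path):
        return done
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                done.add(json.loads(line)["index"])
            except (json.JSONDecodeError, KeyError):
                # An unreadable line (e.g. cut off by a crash) is ignored,
                # so the item it belonged to is processed again.
                continue
    return done


def run_with_checkpoint(
    items: list[str], path: str, handler: Callable[[str], str]
) -> int:
    """Process only the items missing from the checkpoint; return how many ran."""
    done = load_done_indices(path)
    processed = 0
    with open(path, "a", encoding="utf-8") as f:
        for index, item in enumerate(items):
            if index in done:
                continue  # finished in an earlier run
            record = {"index": index, "output": handler(item)}
            f.write(json.dumps(record) + "\n")
            f.flush()  # push each record out so an interruption loses little work
            processed += 1
    return processed
```

Calling `run_with_checkpoint` a second time with the same path only runs the handler for items that have no record yet.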

Multi-Endpoint Pool

Distribute batch tasks across multiple GPU nodes / API endpoints. Faster endpoints automatically handle more tasks via a shared queue model, with automatic failover and health monitoring.

LLMClient and LLMClientPool share the same API. Single endpoint → use LLMClient; multiple endpoints → use LLMClientPool.

from flexllm import LLMClientPool

pool = LLMClientPool(
    endpoints=[
        # Each endpoint can have independent rate limits
        {"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50, "max_qps": 100},
        {"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20, "max_qps": 50},
        {"base_url": "http://gpu3:8000/v1", "model": "qwen"},
    ],
    fallback=True,               # Auto-switch on endpoint failure
    failure_threshold=3,         # Mark unhealthy after 3 consecutive failures
    recovery_time=60.0,          # Try to recover after 60 seconds
)

# Single request — automatic failover across endpoints
result = await pool.chat_completions(messages)

# Distributed batch — shared queue, dynamic load balancing, checkpoint recovery
results = await pool.chat_completions_batch(
    messages_list,
    distribute=True,
    output_jsonl="results.jsonl",
    track_cost=True,
)

# Streaming with failover
async for chunk in pool.chat_completions_stream(messages):
    print(chunk, end="", flush=True)

Highlights:

  • Shared Queue: Faster endpoints automatically pull more tasks — no manual tuning needed
  • Automatic Failover: Failed requests retry on healthy endpoints; unhealthy nodes auto-recover
  • Per-Endpoint Config: Independent concurrency_limit and max_qps for each endpoint
  • Full Feature Support: Checkpoint recovery, caching, cost tracking all work with Pool
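
The shared-queue idea can be illustrated with plain asyncio. This is a simplified sketch, not the pool's actual code: each "endpoint" is a worker coroutine with a simulated latency (the names and delays are invented), and all workers pull from one queue, so a faster worker naturally ends up handling more tasks.

```python
import asyncio


async def worker(
    name: str, delay: float, queue: asyncio.Queue, counts: dict[str, int]
) -> None:
    """Pull tasks from the shared queue until it is empty."""
    while True:
        try:
            queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        await asyncio.sleep(delay)  # stand-in for one request to this endpoint
        counts[name] += 1


async def run_shared_queue(num_tasks: int, delays: dict[str, float]) -> dict[str, int]:
    """Run one worker per endpoint against a single queue; return tasks per endpoint."""
    queue: asyncio.Queue = asyncio.Queue()
    for task_id in range(num_tasks):
        queue.put_nowait(task_id)
    counts = {name: 0 for name in delays}
    await asyncio.gather(*(worker(n, d, queue, counts) for n, d in delays.items()))
    return counts


# Hypothetical endpoints: "fast" answers in 5 ms, "slow" in 50 ms.
counts = asyncio.run(run_shared_queue(60, {"fast": 0.005, "slow": 0.05}))
print(counts)
```

No weights are configured anywhere; the split between workers falls out of how quickly each one returns to the queue.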

Response Caching

from flexllm import LLMClient, ResponseCacheConfig

client = LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    cache=ResponseCacheConfig(enabled=True, ttl=3600),  # 1 hour TTL
)

# First call: API request (~2s, ~$0.01)
result1 = await client.chat_completions(messages)

# Second call: Cache hit (~0.001s, $0)
result2 = await client.chat_completions(messages)
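
For intuition, a TTL response cache can be sketched in a few lines. This is an in-memory illustration only and makes no claim about how ResponseCacheConfig works internally; the `cache_key` function and `TTLCache` class are hypothetical, and the key is assumed to be a hash of the model name plus the messages.

```python
import hashlib
import json
import time
from typing import Any, Callable, Optional


def cache_key(model: str, messages: list[dict]) -> str:
    """Stable key: the same model and messages always hash to the same value."""
    payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class TTLCache:
    """In-memory cache whose entries expire `ttl` seconds after being stored."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.clock = clock
        self._store: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # expired: drop it and report a miss
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self.clock() + self.ttl, value)
```

A client would look up `cache_key(model, messages)` before sending a request and store the response after a miss.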

Cost Tracking

# Track costs during batch processing
results, cost_report = await client.chat_completions_batch(
    messages_list,
    return_cost_report=True,
)
print(f"Total cost: ${cost_report.total_cost:.4f}")

# Real-time cost display in progress bar
results = await client.chat_completions_batch(
    messages_list,
    track_cost=True,  # Shows 💰 $0.0012 in progress bar
)
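
The arithmetic behind a cost report is simple enough to sketch. The example below assumes usage dictionaries with `prompt_tokens` and `completion_tokens` (as in the token-usage example earlier) and per-million-token prices; the `Price`, `CostAccumulator`, and `BudgetExceeded` names and the price values are placeholders, not flexllm's API or real pricing.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Price:
    """Price in dollars per one million tokens (placeholder values, not real pricing)."""
    input_per_million: float
    output_per_million: float


class BudgetExceeded(RuntimeError):
    """Raised when the running total goes over the configured budget."""


def request_cost(usage: dict, price: Price) -> float:
    """Cost of one request, from its prompt and completion token counts."""
    return (
        usage["prompt_tokens"] * price.input_per_million
        + usage["completion_tokens"] * price.output_per_million
    ) / 1_000_000


class CostAccumulator:
    """Keep a running total and stop the job once a budget is exceeded."""

    def __init__(self, price: Price, budget: Optional[float] = None) -> None:
        self.price = price
        self.budget = budget
        self.total_cost = 0.0

    def add(self, usage: dict) -> float:
        self.total_cost += request_cost(usage, self.price)
        if self.budget is not None and self.total_cost > self.budget:
            raise BudgetExceeded(f"spent ${self.total_cost:.4f} of ${self.budget:.4f}")
        return self.total_cost
```

Feeding each response's usage into `add` gives both the live total for a progress bar and a natural place to enforce a spending cap.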

Streaming

# Token-by-token streaming
async for chunk in client.chat_completions_stream(messages):
    print(chunk, end="", flush=True)

# Batch streaming - process results as they complete
async for result in client.iter_chat_completions_batch(messages_list):
    process(result)
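
Yielding batch results "as they complete" means results arrive in completion order, not input order. The sketch below shows that pattern with `asyncio.as_completed`; `fake_request` and its delays are stand-ins for real API calls, and this is not necessarily how `iter_chat_completions_batch` is implemented.

```python
import asyncio
from typing import AsyncIterator


async def fake_request(index: int, delay: float) -> tuple[int, str]:
    """Stand-in for an API call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return index, f"answer {index}"


async def iter_completed(delays: list[float]) -> AsyncIterator[tuple[int, str]]:
    """Start every request, then yield each result as soon as it finishes."""
    tasks = [asyncio.create_task(fake_request(i, d)) for i, d in enumerate(delays)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def main() -> list[int]:
    # Request 0 is the slowest, request 1 the fastest.
    return [index async for index, _ in iter_completed([0.15, 0.01, 0.08])]


order = asyncio.run(main())
print(order)
```

Because each result carries its original index, the caller can still match results back to inputs.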

Thinking Mode (Reasoning Models)

Unified interface for DeepSeek-R1, Qwen3, Claude extended thinking, and Gemini thinking.

result = await client.chat_completions(
    messages,
    thinking=True,      # Enable thinking
    return_raw=True,
)

# Unified parsing across all providers
parsed = client.parse_thoughts(result.data)
print("Thinking:", parsed["thought"])
print("Answer:", parsed["answer"])
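
As a rough picture of what such parsing involves, the sketch below handles one common case: a model that emits its reasoning inline between `<think>` and `</think>` tags. This is an assumption for illustration; other providers return reasoning in separate response fields, and `split_thoughts` is a hypothetical helper, not the library's `parse_thoughts`.

```python
import re

# Reasoning wrapped in <think>...</think>; DOTALL lets it span multiple lines.
_THINK_RE = re.compile(r"<think>(.*?)</think>", re.DOTALL)


def split_thoughts(text: str) -> dict[str, str]:
    """Separate inline reasoning from the final answer."""
    match = _THINK_RE.search(text)
    if match is None:
        # No reasoning block: the whole text is the answer.
        return {"thought": "", "answer": text.strip()}
    answer = text[: match.start()] + text[match.end():]
    return {"thought": match.group(1).strip(), "answer": answer.strip()}


parsed = split_thoughts("<think>2 + 2 is 4</think>\nThe answer is 4.")
print("Thinking:", parsed["thought"])
print("Answer:", parsed["answer"])
```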

Tool Calls (Function Calling)

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather information",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    },
}]

result = await client.chat_completions(
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    tools=tools,
    return_usage=True,
)

if result.tool_calls:
    for call in result.tool_calls:
        print(f"Call: {call.function['name']}({call.function['arguments']})")
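
Once the model asks for a tool, your code still has to run it. A minimal dispatch sketch follows, assuming `call.function` is a dict with `name` and `arguments` as in the loop above, and that `arguments` is a JSON string (the OpenAI convention). The registry and the `get_weather` body are placeholders.

```python
import json
from typing import Any, Callable


def get_weather(location: str) -> str:
    """Placeholder tool; a real one would call a weather service."""
    return f"(placeholder) weather for {location}"


# Map tool names, as declared in `tools`, to the Python callables that implement them.
TOOL_REGISTRY: dict[str, Callable[..., Any]] = {"get_weather": get_weather}


def run_tool_call(function: dict) -> Any:
    """Look up the requested tool and call it with the decoded arguments."""
    name = function["name"]
    handler = TOOL_REGISTRY.get(name)
    if handler is None:
        raise KeyError(f"model requested an unknown tool: {name}")
    arguments = function["arguments"]
    if isinstance(arguments, str):
        arguments = json.loads(arguments)  # assumed to be a JSON-encoded object
    return handler(**arguments)


print(run_tool_call({"name": "get_weather", "arguments": '{"location": "Tokyo"}'}))
```

Rejecting unknown tool names explicitly keeps a hallucinated function name from turning into an obscure error later.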

CLI

# Quick ask
flexllm ask "What is Python?"

# Interactive chat
flexllm chat

# Batch processing with cost tracking
flexllm batch input.jsonl -o output.jsonl --track-cost
flexllm batch input.jsonl -o output.jsonl -n 5           # First 5 records only
flexllm batch data.jsonl -o out.jsonl -uf text -sf sys   # Custom field names

# Model management
flexllm list              # Configured models
flexllm models            # Remote available models
flexllm set-model gpt-4   # Set default model
flexllm test              # Test connection
flexllm init              # Initialize config file

# Utilities
flexllm pricing gpt-4     # Query model pricing
flexllm credits           # Check API key balance
flexllm mock              # Start mock LLM server for testing

Configuration

Config file location: ~/.flexllm/config.yaml

# Default model
default: "gpt-4"

# Model list
models:
  - id: gpt-4
    name: gpt-4
    provider: openai
    base_url: https://api.openai.com/v1
    api_key: your-api-key

  - id: local-ollama
    name: local-ollama
    provider: openai
    base_url: http://localhost:11434/v1
    api_key: EMPTY

# Batch command config (optional)
batch:
  concurrency: 20
  cache: true
  track_cost: true

Environment variables (higher priority than config file):

  • FLEXLLM_BASE_URL / OPENAI_BASE_URL
  • FLEXLLM_API_KEY / OPENAI_API_KEY
  • FLEXLLM_MODEL / OPENAI_MODEL
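
The precedence rule can be pictured as a short lookup. This sketch rests on two assumptions that the text above does not settle: that the FLEXLLM_* variable is checked before its OPENAI_* counterpart, and that the config is a flat dictionary (the real file nests models under a list). `resolve_setting` is a hypothetical helper.

```python
import os
from typing import Mapping, Optional


def resolve_setting(
    name: str,
    config: Mapping[str, str],
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the environment value if one is set, else the config-file value."""
    env = os.environ if env is None else env
    # Assumed order: FLEXLLM_* first, then OPENAI_*, then the config file.
    for variable in (f"FLEXLLM_{name.upper()}", f"OPENAI_{name.upper()}"):
        value = env.get(variable)
        if value:
            return value
    return config.get(name)


config = {"model": "gpt-4", "base_url": "https://api.openai.com/v1"}
print(resolve_setting("model", config, env={"OPENAI_MODEL": "local-ollama"}))
```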

Architecture

flexllm/
├── clients/           # All client implementations
│   ├── base.py        # Abstract base class (LLMClientBase)
│   ├── llm.py         # Unified entry point (LLMClient)
│   ├── openai.py      # OpenAI-compatible backend
│   ├── gemini.py      # Google Gemini backend
│   ├── claude.py      # Anthropic Claude backend
│   ├── pool.py        # Multi-endpoint load balancer
│   └── router.py      # Provider routing strategies
├── pricing/           # Cost estimation and tracking
│   ├── cost_tracker.py
│   └── token_counter.py
├── cache/             # Response caching with IPC
├── async_api/         # High-performance async engine
└── msg_processors/    # Multi-modal message processing

The architecture follows a simple layered design:

LLMClient (single endpoint)  /  LLMClientPool (multi-endpoint)
    │                                  │
    │                                  ├── ProviderRouter (round_robin)
    │                                  ├── Health Monitor (failure threshold + auto recovery)
    │                                  └── Shared Task Queue (dynamic load balancing)
    │                                  │
    └──────────── Backend Clients ─────┘
                    ├── OpenAIClient
                    ├── GeminiClient
                    └── ClaudeClient
                            │
                            └── LLMClientBase (Abstract - 4 methods to implement)
                                    │
                                    ├── ConcurrentRequester (Async engine)
                                    ├── ResponseCache (Caching layer)
                                    └── CostTracker (Cost monitoring)
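
The Health Monitor box (failure threshold + auto recovery) corresponds to the `failure_threshold` and `recovery_time` options shown in the pool example. One plausible way to model that behaviour is sketched below; the `EndpointHealth` class is hypothetical and the pool's real logic may differ.

```python
import time
from typing import Callable, Optional


class EndpointHealth:
    """Track consecutive failures for one endpoint and when it may be retried."""

    def __init__(
        self,
        failure_threshold: int = 3,
        recovery_time: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.clock = clock
        self.consecutive_failures = 0
        self.unhealthy_since: Optional[float] = None

    def record_success(self) -> None:
        # Any success clears the failure streak and marks the endpoint healthy.
        self.consecutive_failures = 0
        self.unhealthy_since = None

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            # At or over the threshold: (re)start the recovery window.
            self.unhealthy_since = self.clock()

    def is_available(self) -> bool:
        """Healthy endpoints are always available; unhealthy ones only after the wait."""
        if self.unhealthy_since is None:
            return True
        return self.clock() - self.unhealthy_since >= self.recovery_time
```

After `recovery_time` has passed the endpoint is offered a trial request: a success restores it, and another failure restarts the wait.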

API Reference

LLMClient

LLMClient(
    provider: str = "auto",        # "auto", "openai", "gemini", "claude"
    model: str,                    # Model name
    base_url: str = None,          # API base URL (required for openai)
    api_key: str = "EMPTY",        # API key
    cache: ResponseCacheConfig,    # Cache config
    concurrency_limit: int = 10,   # Max concurrent requests
    max_qps: float = None,         # Max requests per second
    retry_times: int = 3,          # Retry count on failure
    timeout: int = 120,            # Request timeout (seconds)
)
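
`concurrency_limit` and `max_qps` bound two different things: how many requests are in flight at once, and how often new requests start. The sketch below shows one way to combine the two with asyncio; the `RateLimiter` class and the numbers are illustrative assumptions, not the library's engine.

```python
import asyncio
import time


class RateLimiter:
    """Space out request starts so that at most `max_qps` begin per second."""

    def __init__(self, max_qps: float) -> None:
        self._interval = 1.0 / max_qps
        self._next_start = 0.0

    async def wait(self) -> None:
        # Reserve the next free start slot, then sleep until it arrives.
        now = time.monotonic()
        start = max(now, self._next_start)
        self._next_start = start + self._interval
        await asyncio.sleep(start - now)


async def limited_call(
    index: int, limiter: RateLimiter, semaphore: asyncio.Semaphore, starts: list[float]
) -> int:
    await limiter.wait()      # QPS limit: when the request may start
    async with semaphore:     # concurrency limit: how many run at once
        starts.append(time.monotonic())
        await asyncio.sleep(0.01)  # stand-in for the actual API call
        return index


async def main() -> tuple[list[int], list[float]]:
    limiter = RateLimiter(max_qps=50)
    semaphore = asyncio.Semaphore(3)
    starts: list[float] = []
    results = await asyncio.gather(
        *(limited_call(i, limiter, semaphore, starts) for i in range(10))
    )
    return list(results), starts


results, starts = asyncio.run(main())
print(f"{len(results)} calls started over {starts[-1] - starts[0]:.2f}s")
```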

Main Methods

| Method | Description |
| --- | --- |
| chat_completions(messages) | Single async request |
| chat_completions_sync(messages) | Single sync request |
| chat_completions_batch(messages_list) | Batch async with checkpoint |
| iter_chat_completions_batch(messages_list) | Streaming batch results |
| chat_completions_stream(messages) | Token-by-token streaming |

License

Apache 2.0
