
azure-openai-blaster


High-throughput, multi-endpoint scheduler & resilience layer for Azure OpenAI (queueing, rate-limit backoff, endpoint health, weighted routing).

azure-openai-blaster lets you fan out chat completion traffic across multiple Azure OpenAI deployments while smoothing rate limits, backing off on transient errors, and auto-disabling unhealthy endpoints — all behind a simple, OpenAI-like API.


✨ Features

  • Multi-endpoint routing: Weighted round-robin across any number of deployments.
  • Automatic cooldown & backoff: Exponential backoff for transient timeouts; header/message–derived cooldown for rate limits.
  • Endpoint health tracking: Consecutive transient failures trigger auto-disable (with reason preserved).
  • Unified sync / future API: chat_completion() (blocking) or submit_chat_completion() (returns Future[str]).
  • Streaming support: Pass stream=True to assemble a streamed completion into a final string transparently.
  • Flexible auth: API key or credential-based (default, az CLI, or interactive browser) selection per deployment.
  • Structured error stats: Snapshot endpoint state via AzureEndpointState.report().
  • Minimal dependencies: Only openai + azure-identity.
  • Config-first: Simple JSON/YAML→dict config to spin up workers fast.
  • Threaded workers: Background queue; specify worker count for throughput.

📦 Installation

pip install azure_openai_blaster

Requires Python ≥ 3.11.


🚀 Quick Start

from azure_openai_blaster import AzureLLMBlaster
import concurrent.futures

config = {
  "strategy": "weighted",  # currently only 'weighted' implemented
  "deployments": [
    {
      "name": "gpt-4o",
      "endpoint": "https://my-aoai-resource.openai.azure.com/",
      "api_key": "YOUR_KEY",          # or "default" / "az" / "interactive"
      "model": "gpt-4o",
      "weight": 2,
      "temperature": 0.2,
      "max_completion_tokens": 512
    },
    {
      "name": "gpt-4o-backup",
      "endpoint": "https://my-aoai-resource-2.openai.azure.com/",
      "api_key": "default",
      "model": "gpt-4o-mini",
      "weight": 1
    }
  ],
  # Optional runtime overrides:
  # "num_workers": 16,
  # "max_job_retry": 5,
  # "worker_polling_interval": 0.5
}

blaster = AzureLLMBlaster.from_config(config, num_workers=12)

messages = [
  {"role": "system", "content": "You are concise."},
  {"role": "user", "content": "Summarize why backoff matters."},
]

# Blocking call (single request)
text = blaster.chat_completion(messages, temperature=0.1)
print(text)

# Or use the built-in future API (submit + later wait)
future = blaster.submit_chat_completion(messages)
result = future.result(timeout=30)

# Blocking calls executed concurrently via ThreadPoolExecutor
prompts = [
  "Summarize why backoff matters.",
  "Explain weighted round robin scheduling.",
  "List reasons endpoints get temporarily disabled.",
]

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:  # choose desired concurrency
  futures = [
    executor.submit(
      blaster.chat_completion,
      [
        {"role": "system", "content": "You are concise."},
        {"role": "user", "content": prompt},
      ],
      temperature=0.1,
    )
    for prompt in prompts
  ]
  for f in concurrent.futures.as_completed(futures):
    print(f.result())


blaster.close()

🧩 Config Schema

Each entry in deployments maps to AzureDeploymentConfig:

| Field                 | Required | Description                                                          |
|-----------------------|----------|----------------------------------------------------------------------|
| name                  | yes      | Identifier for logs/metrics                                          |
| endpoint              | yes      | Base Azure OpenAI endpoint URL                                       |
| api_key               | yes      | Key string, or "default" / "az" / "interactive" for credential auth  |
| model                 | yes      | Deployed model name                                                  |
| api_version           | no       | API version (default: 2025-01-01-preview)                            |
| weight                | no       | Weighted share in round-robin (default: 1)                           |
| temperature           | no       | Per-endpoint temperature (default: 1.0)                              |
| max_completion_tokens | no       | Cap on generation size                                               |
| rpm_limit             | future   | Reserved; not enforced yet                                           |
| tpm_limit             | future   | Reserved; not enforced yet                                           |

Top-level optional fields (fallback to constructor defaults): num_workers, max_job_retry, worker_polling_interval.

strategy is reserved; currently only the weighted round-robin scheduler is active.
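
Since from_config() takes a plain dict, the config can live in a JSON file and be loaded with the standard library. A minimal sketch (the file name is illustrative):

import json

from azure_openai_blaster import AzureLLMBlaster

# blaster.json holds the same structure as the Quick Start dict,
# including optional top-level fields such as "num_workers".
with open("blaster.json") as f:
    config = json.load(f)

blaster = AzureLLMBlaster.from_config(config)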


🔄 Scheduling & Resilience

  • Weighted Round Robin: Each endpoint appears in the internal ring weight times; the ring gets a random initial shuffle.
  • Cooldown Handling: On RateLimitError, the Retry-After header or error message is parsed for a cooldown (falling back to 15 s); the endpoint is excluded until that timestamp passes.
  • Transient Failures: APITimeoutError triggers exponential backoff: base * 2^(failure_streak - 1); see the sketch after this list.
  • Auto-Disable: After N consecutive transient failures (auto_disable_threshold=5), the endpoint is disabled with the reason preserved.
  • Retry Logic: Jobs are retried up to max_job_retry times if marked retryable; otherwise the exception surfaces via the future/result.
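
The backoff growth is easy to check by hand. A minimal sketch of the delay per failure streak (the 1 s base delay is an assumed value, not necessarily the library's default):

# Illustration of base * 2^(failure_streak - 1); the base value is assumed.
base = 1.0  # seconds
for streak in range(1, 6):
    delay = base * 2 ** (streak - 1)
    print(f"streak={streak}: wait {delay:.0f}s before retrying")
# A streak of 5 also hits auto_disable_threshold=5 and disables the endpoint.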

📡 Streaming

text = blaster.chat_completion(messages, stream=True)

Internally collects streamed deltas into a single string. (Incremental callback API not yet implemented.)


🧪 Advanced Usage

Direct programmatic setup (bypassing the config dict):

from azure_openai_blaster import (
  AzureLLMBlaster, AzureDeploymentConfig, build_endpoint_states
)

cfgs = [
  AzureDeploymentConfig(
    name="primary",
    endpoint="https://...",
    api_key="default",
    model="gpt-4o",
    weight=3,
    temperature=0.2,
  ),
  AzureDeploymentConfig(
    name="backup",
    endpoint="https://...",
    api_key="YOUR_KEY",
    model="gpt-4o-mini",
    weight=1,
  ),
]

# Convert the configs into endpoint state objects for the scheduler.
states = build_endpoint_states({"deployments": [c.__dict__ for c in cfgs]})
blaster = AzureLLMBlaster(endpoints=states, num_workers=10)

Inspect endpoint health:

for state in states:
  print(state.report())

⚠️ Limitations / Roadmap

  • rpm_limit / tpm_limit not enforced yet.
  • Single scheduling strategy.
  • No async interface (threaded only).
  • No partial-stream callback surface.
  • No metrics export integration (you can poll .report() manually; see the sketch below).
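
Until a metrics exporter lands, a simple polling loop covers basic monitoring. A minimal sketch, reusing the states list from Advanced Usage above (the 30 s interval is arbitrary):

import time

# Periodically print endpoint health snapshots.
while True:
    for state in states:  # `states` from the Advanced Usage example
        print(state.report())
    time.sleep(30)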

🧪 Testing / Dev

git clone https://github.com/jinu-jang/aoai-blaster
cd aoai-blaster
pip install -e ".[dev]"

🤝 Contributing

Pre-alpha; feedback & PRs welcome.

  1. Fork & branch
  2. Add/adjust tests
  3. Maintain formatting (black, isort)
  4. Conventional commits preferred

📄 License

MIT © 2025 Jinu Jang.
