Multiple API Key Manager (Next Generation, sqlalchemy_mate free)
apipool-ng
apipool-ng is a next-generation Multiple API Key Manager.
It allows developers to manage multiple API keys simultaneously. For example,
if a single API key has a 1000/day quota, you can register 10 API keys and let
apipool-ng rotate them automatically.
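As a rough illustration of the idea (not apipool-ng's actual implementation), the sketch below computes the combined quota from the example above and cycles through placeholder keys round-robin with `itertools.cycle`. The key names and the round-robin order are assumptions made for the example; the library selects keys according to its own rotation strategy.

```python
from itertools import cycle

# Placeholder keys: with 10 keys at 1000 calls/day each,
# the combined quota is 10 * 1000 calls/day.
keys = [f"key{i}" for i in range(1, 11)]
per_key_quota = 1000
combined_quota = len(keys) * per_key_quota

# Round-robin rotation: every call takes the next key in the cycle,
# wrapping back to the first key after the last one.
rotation = cycle(keys)
first_twelve = [next(rotation) for _ in range(12)]

print(combined_quota)
print(first_twelve[9], first_twelve[10])
```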
Features
- Automatically rotate API keys across multiple credentials.
- Built-in usage statistics, searchable by time, status, and apikey. The stats collector can be deployed on any relational database (SQLite by default).
- Clean API: minimal code required to implement complex features.
- Async support: adummyclient provides a full async chain proxy with await.
- Server SDK mode: connect to an apipool-server instance and proxy all API calls through it, with transparent key rotation and zero local key material.
- Dynamic scaling: DynamicKeyManager auto-refreshes keys from the server, expanding or shrinking the pool in real time.
- Lightweight dependencies: sqlalchemy + httpx.
Installation
pip install apipool-ng
Quick Start
Library Mode (Local Keys)
Implement an ApiKey subclass, then create an ApiKeyManager:
from apipool import ApiKey, ApiKeyManager
class MyApiKey(ApiKey):
    def __init__(self, key):
        self.key = key

    def get_primary_key(self):
        return self.key

    def create_client(self):
        return MyApiClient(api_key=self.key)

    def test_usability(self, client):
        return client.test_connection()

apikeys = [MyApiKey(k) for k in ["key1", "key2", "key3"]]
manager = ApiKeyManager(apikey_list=apikeys)
manager.check_usable()
# Synchronous calls — keys auto-rotate
result = manager.dummyclient.some_api_method(arg)
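The example above refers to a `MyApiClient` class that it does not define; in practice this would be the client from whatever SDK you are wrapping. A minimal stand-in, purely hypothetical and not part of apipool-ng, could look like this. It shows the two things the `ApiKey` subclass relies on: a constructor taking the key and a `test_connection()` method.

```python
class MyApiClient:
    """Hypothetical stand-in for a real SDK client; not part of apipool-ng."""

    def __init__(self, api_key):
        self.api_key = api_key

    def test_connection(self):
        # A real client would issue a cheap authenticated request here.
        return bool(self.api_key)

    def some_api_method(self, arg):
        # Echo the input together with the key that served the call.
        return {"arg": arg, "served_by": self.api_key}


client = MyApiClient(api_key="key1")
print(client.test_connection())
print(client.some_api_method("hello"))
```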
Server SDK Mode (Remote Proxy)
Connect to an apipool-server instance — no API keys stored locally:
from apipool import connect, login
# 1. Authenticate
tokens = login(
service_url="http://localhost:8000",
username="alice",
password="password",
)
# 2. Connect to a pool
manager = connect(
service_url="http://localhost:8000",
pool_identifier="google-geocoding",
auth_token=tokens["access_token"],
)
# 3. Use exactly like library mode
result = manager.dummyclient.geocode("1600 Amphitheatre Parkway")
Fetch Raw Keys from Server
Retrieve decrypted keys and build a local ApiKeyManager:
from apipool import login, get_keys, ApiKeyManager
tokens = login("http://localhost:8000", "alice", "password")
raw_keys = get_keys(
service_url="http://localhost:8000",
pool_identifier="coingecko",
auth_token=tokens["access_token"],
)
# raw_keys = ["CG-xxx", "CG-yyy", ...]
DummyClient & AsyncDummyClient
Use manager.dummyclient (sync) or manager.adummyclient (async) just like
your original API client. Under the hood, they automatically select a usable
key, record usage events, and rotate keys on rate-limit errors.
# Sync — use dummyclient
result = manager.dummyclient.some_method()
# Async — use adummyclient
result = await manager.adummyclient.some_method()
# Multi-level attribute chains are supported natively
result = await manager.adummyclient.coins.simple.price.get(ids="bitcoin")
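To make the attribute-chain behaviour more concrete, here is a minimal sketch of how a chain proxy can be built with `__getattr__` and `__call__`. It is an illustration under assumptions, not the library's code: `ChainProxySketch`, `make_client`, and the fake nested client are hypothetical names, and key selection is reduced to a plain callable.

```python
from types import SimpleNamespace


class ChainProxySketch:
    """Records an attribute path and resolves it on a real client when called."""

    def __init__(self, pick_client, path=()):
        self._pick_client = pick_client  # callable returning a client object
        self._path = path

    def __getattr__(self, name):
        # Only reached for unknown attributes: extend the recorded path.
        return ChainProxySketch(self._pick_client, self._path + (name,))

    def __call__(self, *args, **kwargs):
        # Pick a client, walk the recorded path on it, then invoke the target.
        target = self._pick_client()
        for name in self._path:
            target = getattr(target, name)
        return target(*args, **kwargs)


def make_client():
    # Fake nested client exposing client.coins.simple.price.get(ids=...).
    price = SimpleNamespace(get=lambda ids: {ids: {"usd": 1.0}})
    simple = SimpleNamespace(price=price)
    return SimpleNamespace(coins=SimpleNamespace(simple=simple))


proxy = ChainProxySketch(make_client)
result = proxy.coins.simple.price.get(ids="bitcoin")
print(result)
```

Because the client is only picked inside `__call__`, a real manager has a single place to choose a key, record the usage event, and react to errors.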
Rate Limit Handling
Specify the exception type that indicates rate-limit exhaustion:
from apipool import ApiKeyManager
manager = ApiKeyManager(
apikey_list=apikeys,
reach_limit_exc=RateLimitError, # auto-rotate on this exception
)
When a call raises reach_limit_exc, the key is automatically removed from
the pool and the exception is re-raised. When all keys are exhausted,
PoolExhaustedError is raised on the next call.
from apipool import ApiKeyManager, PoolExhaustedError
try:
    result = manager.dummyclient.some_method()
except PoolExhaustedError:
    print("All API keys exhausted")
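The remove-then-re-raise behaviour described above can be sketched with a toy pool. Everything here is a stand-in for illustration: `TinyPool`, `RateLimitError`, and `PoolExhaustedSketch` are hypothetical classes, not apipool-ng's own.

```python
import random


class RateLimitError(Exception):
    """Hypothetical rate-limit exception raised by a client."""


class PoolExhaustedSketch(Exception):
    """Stand-in for PoolExhaustedError in this sketch."""


class TinyPool:
    def __init__(self, keys, limited):
        self.active = list(keys)
        self.limited = set(limited)  # keys that have hit their limit

    def call(self):
        if not self.active:
            raise PoolExhaustedSketch("no usable keys left")
        key = random.choice(self.active)
        if key in self.limited:
            # Drop the exhausted key from the pool, then re-raise to the caller.
            self.active.remove(key)
            raise RateLimitError(key)
        return f"ok via {key}"


pool = TinyPool(["key1", "key2"], limited={"key1", "key2"})
outcomes = []
for _ in range(3):
    try:
        outcomes.append(pool.call())
    except RateLimitError:
        outcomes.append("rate limited")
    except PoolExhaustedSketch:
        outcomes.append("exhausted")
print(outcomes)
```

With both keys limited, the first two calls each remove one key and surface the rate-limit error; the third call finds an empty pool and raises the exhaustion error.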
Server SDK Mode (Full API)
| Function | Sync | Async |
|---|---|---|
| Authenticate | login(service_url, username, password) | await alogin(...) |
| Connect to pool | connect(service_url, pool_identifier, auth_token) | await async_connect(...) |
| Fetch raw keys | get_keys(service_url, pool_identifier, auth_token) | await aget_keys(...) |
Async example:
from apipool import async_connect, alogin
tokens = await alogin("http://localhost:8000", "alice", "password")
manager = await async_connect(
service_url="http://localhost:8000",
pool_identifier="coingecko",
auth_token=tokens["access_token"],
)
result = await manager.adummyclient.coins.simple.price.get(ids="bitcoin")
Dynamic Key Manager (Auto-Refresh)
DynamicKeyManager extends ApiKeyManager with a background thread that
periodically fetches the latest key list from apipool-server and reconciles
the local pool — adding new keys and removing deleted ones automatically.
Sync: DynamicKeyManager
from apipool import DynamicKeyManager, get_keys, login
tokens = login("http://localhost:8000", "alice", "password")
manager = DynamicKeyManager(
key_fetcher=lambda: get_keys(
service_url="http://localhost:8000",
pool_identifier="coingecko",
auth_token=tokens["access_token"],
),
api_key_factory=lambda raw_key: CoinGeckoApiKey(raw_key),
refresh_interval=120, # seconds
on_keys_added=lambda keys: print(f"Added: {keys}"),
on_keys_removed=lambda keys: print(f"Removed: {keys}"),
)
# Use like a normal ApiKeyManager
result = manager.dummyclient.ping()
print(f"Pool size: {manager.pool_size}")
# Graceful shutdown
manager.shutdown()
Async: AsyncDynamicKeyManager
from apipool import AsyncDynamicKeyManager, aget_keys, alogin
tokens = await alogin("http://localhost:8000", "alice", "password")
manager = AsyncDynamicKeyManager(
key_fetcher=lambda: aget_keys(
service_url="http://localhost:8000",
pool_identifier="coingecko",
auth_token=tokens["access_token"],
),
api_key_factory=lambda raw_key: AsyncCoinGeckoKey(raw_key),
refresh_interval=120,
)
await manager.astart() # initial fetch + auto-refresh task
result = await manager.adummyclient.coins.simple.price.get(ids="bitcoin")
print(f"Pool size: {manager.pool_size}")
await manager.ashutdown()
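For readers curious what an "initial fetch + auto-refresh task" might look like, the following is a minimal asyncio sketch. It assumes a periodic loop that can be stopped through an `asyncio.Event`; `refresh_loop`, `fetch`, and `apply` are hypothetical names, and the real AsyncDynamicKeyManager may be structured differently.

```python
import asyncio


async def refresh_loop(fetch, apply, interval, stop):
    """Fetch and apply immediately, then again every `interval` seconds."""
    while not stop.is_set():
        apply(await fetch())
        try:
            # Sleep for one interval, but wake up early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def main():
    snapshots = []
    server_keys = ["CG-a"]
    first_seen, update_seen = asyncio.Event(), asyncio.Event()

    async def fetch():
        return list(server_keys)

    def apply(keys):
        snapshots.append(keys)
        first_seen.set()
        if "CG-b" in keys:
            update_seen.set()

    stop = asyncio.Event()
    task = asyncio.create_task(refresh_loop(fetch, apply, 0.01, stop))
    await first_seen.wait()
    server_keys.append("CG-b")  # simulate a key being added on the server
    await update_seen.wait()
    stop.set()                  # graceful shutdown
    await task
    return snapshots


snapshots = asyncio.run(main())
print(snapshots[0], snapshots[-1])
```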
How It Works
| Step | Description |
|---|---|
| 1. Fetch | Call key_fetcher() to get the latest list[str] from the server |
| 2. Diff | Compare server keys vs. local active + archived keys |
| 3. Add | New keys → create via api_key_factory → add to pool |
| 4. Restore | Archived keys that reappear on server → restore to active pool |
| 5. Remove | Keys gone from server → move from the active pool to the archive |
| 6. Callback | on_keys_added / on_keys_removed fired if keys changed |
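The steps in the table can be sketched as a single reconcile function over two dictionaries. This is a simplified model, not the library's code: `reconcile` is a hypothetical helper, the pools are plain dicts keyed by raw key, and treating restored keys as "added" is an assumption.

```python
def reconcile(server_keys, active, archived, factory):
    """Apply one refresh cycle in place; return (added, removed) raw keys."""
    server = set(server_keys)
    added, removed = [], []
    for raw in server_keys:
        if raw in active:
            continue
        if raw in archived:
            # Restore: a previously archived key reappeared on the server.
            active[raw] = archived.pop(raw)
        else:
            # Add: a brand-new key, built through the factory.
            active[raw] = factory(raw)
        added.append(raw)
    for raw in list(active):
        if raw not in server:
            # Remove: the key is gone from the server, so archive it.
            archived[raw] = active.pop(raw)
            removed.append(raw)
    return added, removed


active = {"CG-a": "obj-a", "CG-b": "obj-b"}
archived = {"CG-c": "obj-c"}
added, removed = reconcile(
    ["CG-a", "CG-c", "CG-d"], active, archived, lambda raw: f"obj-{raw[-1]}"
)
print(sorted(active), archived, added, removed)
```

A caller would then fire the added/removed callbacks only when the corresponding list is non-empty.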
Parameters
| Parameter | Type | Description |
|---|---|---|
| key_fetcher | Callable[[], list[str]] | Returns current raw keys from server |
| api_key_factory | Callable[[str], ApiKey] | Converts raw key to ApiKey instance |
| refresh_interval | float | Seconds between refreshes (default 60) |
| on_keys_added | Callable[[list[str]], None] | Callback after keys are added |
| on_keys_removed | Callable[[list[str]], None] | Callback after keys are removed |
| config_fetcher | Callable[[], PoolConfig] | Returns pool config from server (optional) |
Configuration Sync
DynamicKeyManager can also sync pool configuration from the server.
Pass a config_fetcher (typically get_config bound with your credentials)
and the configuration is refreshed on each refresh cycle.
PoolConfig
Server-side pool configuration is stored as pool_config JSON and synced to
the client as a PoolConfig dataclass:
| Field | Type | Default | Description |
|---|---|---|---|
| concurrency | int | 0 | Max concurrent calls (0 = unlimited) |
| timeout | float | 30.0 | Per-request timeout in seconds |
| rate_limit | int | 0 | Max requests per key per interval |
| rate_limit_interval | int | 60 | Interval for rate limit counting |
| retry_on_failure | bool | False | Retry failed calls on another key |
| max_retries | int | 0 | Maximum retry attempts |
| custom | dict | {} | Arbitrary key-value settings |
| batch_retry_on_failure | bool \| None | None | Batch retry (falls back to retry_on_failure) |
| batch_max_retries | int \| None | None | Batch retries (falls back to max_retries) |
| ban_threshold | int | 3 | Consecutive failures before key ban |
| ban_duration | float | 300.0 | Key ban duration in seconds |
| reach_limit_exception | str | None | Dotted path to exception class |
| rotation_strategy | str | "random" | Key rotation strategy |
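To show how the defaults and the batch fallbacks in this table fit together, here is a stand-alone dataclass that mirrors the fields. It is a sketch, not the library's PoolConfig: the class name and the two `effective_*` helper methods are hypothetical, and building it from a JSON string is only one plausible way the pool_config payload could be consumed.

```python
import json
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PoolConfigSketch:
    concurrency: int = 0
    timeout: float = 30.0
    rate_limit: int = 0
    rate_limit_interval: int = 60
    retry_on_failure: bool = False
    max_retries: int = 0
    custom: dict = field(default_factory=dict)
    batch_retry_on_failure: Optional[bool] = None
    batch_max_retries: Optional[int] = None
    ban_threshold: int = 3
    ban_duration: float = 300.0
    reach_limit_exception: Optional[str] = None
    rotation_strategy: str = "random"

    def effective_batch_retry(self) -> bool:
        # None means "inherit the general retry_on_failure setting".
        if self.batch_retry_on_failure is None:
            return self.retry_on_failure
        return self.batch_retry_on_failure

    def effective_batch_max_retries(self) -> int:
        # None means "inherit the general max_retries setting".
        if self.batch_max_retries is None:
            return self.max_retries
        return self.batch_max_retries


payload = '{"retry_on_failure": true, "max_retries": 2, "batch_max_retries": 5}'
cfg = PoolConfigSketch(**json.loads(payload))
print(cfg.effective_batch_retry(), cfg.effective_batch_max_retries(), cfg.timeout)
```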
Sync Example
from apipool import DynamicKeyManager, get_keys, get_config, login
tokens = login("http://localhost:8000", "alice", "password")
manager = DynamicKeyManager(
key_fetcher=lambda: get_keys(
service_url="http://localhost:8000",
pool_identifier="coingecko",
auth_token=tokens["access_token"],
),
api_key_factory=lambda raw_key: CoinGeckoApiKey(raw_key),
config_fetcher=lambda: get_config(
service_url="http://localhost:8000",
pool_identifier="coingecko",
auth_token=tokens["access_token"],
),
refresh_interval=120,
)
# Config is auto-synced. Access it anytime:
print(f"Concurrency: {manager.config.concurrency}")
print(f"Timeout: {manager.config.timeout}")
Manual Config Fetch
from apipool import get_config, aget_config
# Sync
config = get_config("http://localhost:8000", "my-pool", tokens["access_token"])
manager.apply_config(config)
# Async
config = await aget_config("http://localhost:8000", "my-pool", tokens["access_token"])
Concurrent Execution
Execute the same method across multiple argument sets with bounded concurrency:
Sync: call_concurrent
# Execute three API calls with at most 10 running concurrently
results = manager.call_concurrent(
method_name="some_api_method",
args_list=[(arg1,), (arg2,), (arg3,)],
kwargs_list=[{"key": "a"}, {"key": "b"}, {"key": "c"}],
max_concurrency=10, # overrides config.concurrency
timeout=15.0, # overrides config.timeout
)
Async: acall_concurrent
results = await manager.acall_concurrent(
method_name="coins.simple.price.get",
args_list=[(), (), ()],
kwargs_list=[
{"ids": "bitcoin"},
{"ids": "ethereum"},
{"ids": "solana"},
],
max_concurrency=5,
timeout=10.0,
)
Concurrency and timeout default to manager.config values when not specified.
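Bounded concurrency of this kind is commonly built on `concurrent.futures.ThreadPoolExecutor`. The sketch below is an assumption about the general technique rather than a copy of call_concurrent: `call_concurrent_sketch` and `fake_api` are hypothetical, the per-call timeout is left out, and returning results in input order is a choice made for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def call_concurrent_sketch(func, args_list, kwargs_list, max_concurrency):
    """Run func over paired args/kwargs with at most max_concurrency threads."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = [
            pool.submit(func, *args, **kwargs)
            for args, kwargs in zip(args_list, kwargs_list)
        ]
        # Collect in submission order so results line up with the inputs.
        return [future.result() for future in futures]


def fake_api(x, key="?"):
    # Stand-in for a real API call.
    return f"{x}:{key}"


results = call_concurrent_sketch(
    fake_api,
    args_list=[(1,), (2,), (3,)],
    kwargs_list=[{"key": "a"}, {"key": "b"}, {"key": "c"}],
    max_concurrency=2,
)
print(results)
```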
Batch Execution
batch_exec and abatch_exec are designed for high-volume workloads
such as fetching 10 000 token prices from CoinGecko. Key features:
- Deduplication: each item_id is guaranteed to execute at most once.
- Retry with rotation: when an API call fails, the item is retried on a different key, honouring the key rotation strategy.
- Temporary banning: keys that accumulate ban_threshold consecutive failures are temporarily excluded from the batch group for ban_duration seconds, then automatically re-admitted.
- Server-configurable: all tuning parameters can be set centrally in pool_config on the server and synced to clients via PoolConfig.
Sync: batch_exec
from apipool import ApiKeyManager, BatchResult
manager = ApiKeyManager(apikey_list=apikeys)
# Each item is (item_id, args_tuple, kwargs_dict)
items = [
("bitcoin", (), {"ids": "bitcoin", "vs_currencies": "usd"}),
("ethereum", (), {"ids": "ethereum", "vs_currencies": "usd"}),
# ... up to 10 000 items
]
result: BatchResult = manager.batch_exec(
method_name="coins.simple.price.get",
items=items,
max_concurrency=20, # 20 parallel calls
timeout=10.0, # per-call timeout
retry_on_failure=True, # retry on another key
max_retries=3, # up to 3 retries per item
ban_threshold=3, # ban key after 3 consecutive failures
ban_duration=300.0, # ban for 5 minutes
)
print(f"Success: {result.succeeded}/{result.total} ({result.success_rate:.1%})")
print(f"Failed items: {list(result.errors.keys())}")
print(f"Banned keys: {list(result.banned_keys.keys())}")
# Access individual results
btc_price = result.results["bitcoin"]
Async: abatch_exec
result: BatchResult = await manager.abatch_exec(
method_name="coins.simple.price.get",
items=items,
max_concurrency=50,
retry_on_failure=True,
max_retries=3,
)
BatchResult
| Field | Type | Description |
|---|---|---|
| total | int | Total items submitted |
| succeeded | int | Items that completed successfully |
| failed | int | Items that failed after all retries |
| results | dict | item_id → result for successful items |
| errors | dict | item_id → Exception for failed items |
| banned_keys | dict | primary_key → ban_expiry_timestamp |
| elapsed | float | Wall-clock seconds for the batch |
| success_rate | float | Property: succeeded / total |
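A result container with these fields could be modelled as a dataclass with a computed property. The class below is a hypothetical mirror of the table, not the library's BatchResult; in particular, returning 0.0 for an empty batch is an assumption made here to avoid dividing by zero.

```python
from dataclasses import dataclass, field


@dataclass
class BatchResultSketch:
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    results: dict = field(default_factory=dict)      # item_id -> result
    errors: dict = field(default_factory=dict)       # item_id -> Exception
    banned_keys: dict = field(default_factory=dict)  # primary_key -> expiry
    elapsed: float = 0.0

    @property
    def success_rate(self) -> float:
        # Guard against an empty batch (assumed behaviour for this sketch).
        return self.succeeded / self.total if self.total else 0.0


summary = BatchResultSketch(
    total=4,
    succeeded=3,
    failed=1,
    results={"bitcoin": 1, "ethereum": 2, "solana": 3},
    errors={"dogecoin": TimeoutError("timed out")},
)
print(f"{summary.success_rate:.1%}")  # 75.0%
```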
How It Works
| Step | Description |
|---|---|
| 1. Deduplicate | Each item_id is unique — no item is executed twice |
| 2. Dispatch | Items are dispatched to ThreadPoolExecutor (sync) or asyncio.Semaphore (async) |
| 3. Execute | Each call goes through the normal key selection → ChainProxy → stats pipeline |
| 4. Fail → Retry | On failure, the item is retried on a different key (up to max_retries) |
| 5. Ban | Keys hitting ban_threshold consecutive failures are banned for ban_duration |
| 6. Collect | Successful results and final errors are collected into BatchResult |
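The retry and ban steps can be illustrated with a deliberately simplified, sequential sketch. It assumes nothing about the library's internals: `batch_sketch` and `fake_call` are hypothetical, keys are tried in list order instead of through a rotation strategy, and there is no thread pool or semaphore.

```python
import time
from collections import defaultdict


def batch_sketch(items, keys, call, max_retries=3, ban_threshold=3, ban_duration=300.0):
    """Sequential sketch: dedupe items, retry on other keys, ban failing keys."""
    streak = defaultdict(int)  # consecutive failures per key
    banned_until = {}          # key -> monotonic timestamp when the ban ends
    results, errors = {}, {}
    for item_id, args, kwargs in items:
        if item_id in results or item_id in errors:
            continue  # skip duplicate item_ids
        tried, last_exc = set(), None
        for _ in range(max_retries + 1):
            now = time.monotonic()
            usable = [
                k for k in keys
                if k not in tried and banned_until.get(k, now) <= now
            ]
            if not usable:
                break
            key = usable[0]
            tried.add(key)
            try:
                results[item_id] = call(key, *args, **kwargs)
                streak[key] = 0  # a success resets the failure streak
                break
            except Exception as exc:
                last_exc = exc
                streak[key] += 1
                if streak[key] >= ban_threshold:
                    banned_until[key] = now + ban_duration
                    streak[key] = 0
        if item_id not in results:
            errors[item_id] = last_exc or RuntimeError("no usable key")
    return results, errors, banned_until


def fake_call(key, ids):
    # The key named "bad" always fails; any other key succeeds.
    if key == "bad":
        raise ConnectionError("simulated failure")
    return f"{ids} via {key}"


items = [
    ("bitcoin", (), {"ids": "bitcoin"}),
    ("ethereum", (), {"ids": "ethereum"}),
    ("solana", (), {"ids": "solana"}),
    ("bitcoin", (), {"ids": "bitcoin"}),  # duplicate, skipped
]
results, errors, banned = batch_sketch(items, ["bad", "good"], fake_call, ban_threshold=2)
print(results, errors, list(banned))
```

Here the "bad" key fails for the first two items, gets banned after its second consecutive failure, and is skipped entirely for the third item.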
Batch Config Fields
These PoolConfig fields control batch behaviour from the server:
| Field | Type | Default | Description |
|---|---|---|---|
| batch_retry_on_failure | bool \| None | None (→ retry_on_failure) | Retry failed items on another key |
| batch_max_retries | int \| None | None (→ max_retries) | Max retries per item in batch |
| ban_threshold | int | 3 | Consecutive failures before banning a key |
| ban_duration | float | 300.0 | Seconds a banned key is excluded |
ApiKey Abstract Class
Subclass ApiKey and implement three methods:
| Method | Description |
|---|---|
| get_primary_key() | Return a unique identifier for this key |
| create_client() | Create and return the SDK client instance |
| test_usability(client) | Test if the key is usable; return bool |
Optional async methods on ApiKey:
| Method | Description |
|---|---|
| aconnect_client() | Async version of connect_client() |
| ais_usable() | Async version of is_usable() |
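One common way to enforce a three-method contract like this in Python is an abstract base class. Whether apipool-ng's ApiKey is built on `abc` is not stated here, so the sketch below is only an illustration of the pattern; `ApiKeySketch`, `Incomplete`, and `Complete` are hypothetical names.

```python
from abc import ABC, abstractmethod


class ApiKeySketch(ABC):
    """Hypothetical mirror of the three required methods."""

    @abstractmethod
    def get_primary_key(self):
        """Return a unique identifier for this key."""

    @abstractmethod
    def create_client(self):
        """Create and return the SDK client instance."""

    @abstractmethod
    def test_usability(self, client):
        """Return True if the key is usable."""


class Incomplete(ApiKeySketch):
    # Only one of the three methods is implemented.
    def get_primary_key(self):
        return "k"


class Complete(Incomplete):
    def create_client(self):
        return object()

    def test_usability(self, client):
        return client is not None


try:
    Incomplete()
    incomplete_ok = True
except TypeError:
    # abc refuses to instantiate a class with abstract methods left over.
    incomplete_ok = False

complete = Complete()
print(incomplete_ok, complete.test_usability(complete.create_client()))
```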
StatsCollector
Query usage statistics through manager.stats:
from apipool import StatusCollection
# Usage count per key in last hour
manager.stats.usage_count_stats_in_recent_n_seconds(3600)
# {"key1": 3, "key2": 5, "key3": 2}
# Count specific events
count = manager.stats.usage_count_in_recent_n_seconds(
n_seconds=3600,
status_id=StatusCollection.c9_ReachLimit.id,
)
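Since the stats collector sits on a relational database, queries like the two above boil down to a time-window filter plus a count. The sketch below reproduces that idea with the standard-library `sqlite3` module. The `event` table, its columns, and the status id 9 are assumptions invented for this example and do not describe apipool-ng's real schema.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event (apikey TEXT, status_id INTEGER, ts REAL)")

now = time.time()
rows = [
    ("key1", 1, now - 10),
    ("key1", 1, now - 20),
    ("key2", 1, now - 30),
    ("key2", 9, now - 40),    # hypothetical "reach limit" status id
    ("key1", 1, now - 7200),  # older than one hour, excluded below
]
conn.executemany("INSERT INTO event VALUES (?, ?, ?)", rows)


def usage_count_per_key(conn, n_seconds, at):
    """Count events per key within the last n_seconds before `at`."""
    cur = conn.execute(
        "SELECT apikey, COUNT(*) FROM event WHERE ts >= ? GROUP BY apikey",
        (at - n_seconds,),
    )
    return dict(cur.fetchall())


def usage_count(conn, n_seconds, status_id, at):
    """Count events with a given status within the last n_seconds."""
    cur = conn.execute(
        "SELECT COUNT(*) FROM event WHERE ts >= ? AND status_id = ?",
        (at - n_seconds, status_id),
    )
    return cur.fetchone()[0]


per_key = usage_count_per_key(conn, 3600, now)
limit_hits = usage_count(conn, 3600, 9, now)
print(per_key, limit_hits)
```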
API Reference
Exported Symbols
from apipool import (
# Core
ApiKey,
ApiKeyManager,
PoolExhaustedError,
BatchResult,
# Async chain proxy
AsyncDummyClient,
AsyncChainProxy,
AsyncApiCaller,
# Dynamic scaling
DynamicKeyManager,
AsyncDynamicKeyManager,
# Stats
StatusCollection,
StatsCollector,
# Configuration
PoolConfig,
# Server SDK mode — sync
connect,
login,
get_keys,
get_config,
# Server SDK mode — async
async_connect,
alogin,
aget_keys,
aget_config,
)
License
MIT License