Умный и мощный Python клиент для работы с REST API с синхронным и асинхронным интерфейсом, встроенной валидацией, пагинацией, повторными попытками и middleware системой
Project description
API Smart Kit
Умный и мощный Python клиент для работы с REST API
Простота использования, гибкость настройки и готовность к production
🚀 Особенности
✨ Основные возможности
- Синхронный и асинхронный API - выбор под ваши нужды
- Полная типизация - автодополнение и проверка типов
- Встроенная валидация - на основе Pydantic моделей
- Автоматическая пагинация - поддержка различных стратегий
- Повторные попытки - умный retry с экспоненциальной задержкой
- Middleware система - расширяемая архитектура
- Потоковая обработка - работа с большими данными
- Массовые операции - параллельное выполнение запросов
🛡️ Production-ready
- Circuit Breaker - защита от сбоев внешних API
- Метрики и мониторинг - встроенный сбор статистики
- Безопасность - фильтрация чувствительных данных в логах
- Тестируемость - легко мокать и тестировать
- Полное покрытие тестами - уверенность в стабильности
📦 Установка
pip install apismartkit
🎯 Быстрый старт
Простой пример
from apismartkit import APIClient, Endpoint, HttpMethod, ResponseType
from pydantic import BaseModel
# Определяем модели данных
class User(BaseModel):
id: int
name: str
email: str
class CreateUserRequest(BaseModel):
name: str
email: str
# Создаем endpoint'ы
get_user_endpoint = Endpoint(
method=HttpMethod.GET,
path="/users/{user_id}",
response_schema=User,
response_type=ResponseType.SINGLE_OBJECT
)
create_user_endpoint = Endpoint(
method=HttpMethod.POST,
path="/users",
request_schema=CreateUserRequest,
response_schema=User,
response_type=ResponseType.SINGLE_OBJECT
)
# Используем клиент
with APIClient(base_url="https://api.example.com") as client:
# Создаем пользователя
new_user = client.build_request(create_user_endpoint)\
.with_data({"name": "John Doe", "email": "john@example.com"})\
.execute()\
.as_object()
print(f"Создан пользователь: {new_user.name}")
# Получаем пользователя
user = client.build_request(get_user_endpoint)\
.with_path_vars(user_id=new_user.id)\
.execute()\
.as_object()
print(f"Получен пользователь: {user.name}")
Асинхронный пример
import asyncio
from http import HTTPMethod
from apismartkit import AsyncAPIClient, Endpoint
async def main():
async with AsyncAPIClient(base_url="https://api.example.com") as client:
endpoint = Endpoint(method=HTTPMethod.GET, path="/users/{id}")
request = client.build_request(endpoint)\
.with_path_vars(id=1)\
.build()
response = await client.execute(request)
user = response.as_object()
print(f"Пользователь: {user}")
# Запуск
asyncio.run(main())
🔧 Основное использование
Конфигурация клиента
from apismartkit import APIClient, ClientConfig
# Простая конфигурация
client = APIClient(base_url="https://api.example.com")
# Расширенная конфигурация
config = ClientConfig(
base_url="https://api.example.com",
timeout=30.0,
max_retries=3,
raise_for_status=True,
default_headers={"User-Agent": "MyApp/1.0"},
max_concurrent=10
)
client = APIClient(config)
Создание endpoint'ов
from http import HTTPMethod
from apismartkit import Endpoint, ResponseType
from pydantic import BaseModel
class Product(BaseModel):
id: int
name: str
price: float
in_stock: bool
class ProductList(BaseModel):
products: list[Product]
total: int
# Простой endpoint
get_product_endpoint = Endpoint(
method=HTTPMethod.GET,
path="/products/{product_id}",
response_schema=Product,
response_type=ResponseType.SINGLE_OBJECT
)
# Endpoint с пагинацией
list_products_endpoint = Endpoint(
method=HTTPMethod.GET,
path="/products",
response_schema=Product,
response_type=ResponseType.PAGINATED_LIST,
pagination_keys={"items": "products", "meta": "meta"}
)
# Endpoint с параметрами
search_products_endpoint = Endpoint(
method=HTTPMethod.GET,
path="/products/search",
params_schema=ProductSearchParams, # Pydantic модель
response_schema=ProductList,
response_type=ResponseType.SINGLE_OBJECT
)
Выполнение запросов
# Fluid interface через RequestBuilder
response = client.build_request(get_product_endpoint)\
.with_path_vars(product_id=123)\
.with_params({"include": "category"})\
.with_headers({"X-API-Key": "your-key"})\
.with_timeout(10.0)\
.execute()
product = response.as_object()
# Прямое создание Request
from apismartkit import Request
request = Request(
endpoint=get_product_endpoint,
path_vars={"product_id": 123},
params={"include": "category"},
headers={"X-API-Key": "your-key"}
)
response = client.execute(request)
product = response.as_object()
🚀 Расширенные возможности
Пагинация
# Автоматическая пагинация через генератор
for page in client.paginate(list_products_request):
products = page.as_list()
print(f"Получено {len(products)} продуктов")
# Обрабатываем продукты...
# Асинхронная пагинация
async for page in async_client.paginate(list_products_request):
products = page.as_list()
print(f"Получено {len(products)} продуктов")
# Ручное управление пагинацией
request = list_products_request.with_pagination(
strategy="page",
page_size=50
)
paginator = request.paginator
while paginator.has_more:
response = client.execute(request)
products = response.as_list()
# Обработка...
# Переход к следующей странице
request = response.as_paginated().paginator.get_next_page()
Массовые запросы
# Создаем несколько запросов
requests = [
client.build_request(get_product_endpoint)
.with_path_vars(product_id=i)
.build()
for i in range(1, 101)
]
# Выполняем параллельно с ограничением concurrency
responses = await async_client.execute_bulk(
requests,
max_concurrent=10
)
for response in responses:
if response.success:
product = response.as_object()
print(f"Успех: {product.name}")
Потоковая обработка
# Стриминг больших файлов
async for chunk in async_client.stream(download_request):
# Обрабатываем чанки данных
file.write(chunk)
# Endpoint для стриминга
download_endpoint = Endpoint(
method=HTTPMethod.GET,
path="/files/{file_id}/download",
response_type=ResponseType.STREAM
)
🔌 Middleware
Встроенные middleware
from apismartkit.middlewares import (
LoggingMiddleware,
RetryMiddleware,
CacheMiddleware,
MetricsMiddleware,
CircuitBreakerMiddleware
)
# Настройка middleware
middlewares = [
LoggingMiddleware(
sensitive_headers={"Authorization", "X-API-Key"},
sensitive_params={"password", "token"}
),
RetryMiddleware(
max_retries=3,
retry_status_codes=[429, 500, 502, 503, 504]
),
CacheMiddleware(default_ttl=300), # 5 минут
MetricsMiddleware(),
CircuitBreakerMiddleware(
failure_threshold=5,
recovery_timeout=60
)
]
client = APIClient(
base_url="https://api.example.com",
middlewares=middlewares
)
Кастомные middleware
from apismartkit.middlewares import BaseMiddleware
from apismartkit import Request, Response
class AuthMiddleware(BaseMiddleware):
def __init__(self, api_key: str):
self.api_key = api_key
async def on_request(self, request: Request) -> Request:
# Добавляем API ключ к каждому запросу
request.headers["X-API-Key"] = self.api_key
return request
async def on_response(self, response: Response) -> Response:
# Логируем статус ответа
if response.status_code == 401:
print("Ошибка аутентификации!")
return response
# Использование кастомного middleware
client = APIClient(
base_url="https://api.example.com",
middlewares=[AuthMiddleware("your-api-key")]
)
🛡️ Обработка ошибок
Встроенные исключения
from apismartkit.exceptions import (
APIClientException,
RequestValidationError,
ResponseValidationError,
CircuitBreakerOpen
)
try:
response = client.execute(request)
data = response.as_object()
except RequestValidationError as e:
print(f"Ошибка валидации запроса: {e}")
print(f"Поле с ошибкой: {e.error.details.get('field')}")
except ResponseValidationError as e:
print(f"Ошибка валидации ответа: {e}")
except CircuitBreakerOpen as e:
print(f"Circuit breaker открыт: {e}")
print(f"Попробуйте позже")
except APIClientException as e:
print(f"Ошибка API: {e.message}")
print(f"Request ID: {e.request_id}")
Кастомная обработка ошибок
from apismartkit import Response
response = client.execute(request)
if not response.success:
print(f"Запрос завершился с ошибкой: {response.status_code}")
# Получаем сырые данные для отладки
raw_data = response.as_raw()
print(f"Ответ сервера: {raw_data}")
# Можно обработать специфичные статус коды
if response.status_code == 429:
print("Слишком много запросов, попробуйте позже")
elif response.status_code == 403:
print("Доступ запрещен")
else:
data = response.as_object()
💡 Примеры использования
Работа с JSONPlaceholder API
from apismartkit import APIClient, Endpoint, HttpMethod, ResponseType
from pydantic import BaseModel
class Post(BaseModel):
userId: int
id: int
title: str
body: str
class User(BaseModel):
id: int
name: str
email: str
# Создаем endpoint'ы
get_posts_endpoint = Endpoint(
method=HttpMethod.GET,
path="/posts",
response_schema=Post,
response_type=ResponseType.PLAIN_LIST
)
get_user_endpoint = Endpoint(
method=HttpMethod.GET,
path="/users/{user_id}",
response_schema=User,
response_type=ResponseType.SINGLE_OBJECT
)
# Используем клиент
with APIClient(base_url="https://jsonplaceholder.typicode.com") as client:
# Получаем все посты
posts = client.build_request(get_posts_endpoint)\
.execute()\
.as_list()
print(f"Получено {len(posts)} постов")
# Получаем информацию о пользователе
user = client.build_request(get_user_endpoint)\
.with_path_vars(user_id=1)\
.execute()\
.as_object()
print(f"Пользователь: {user.name}")
Интеграция с внешним API
from typing import List
from apismartkit import APIClient, Endpoint, HttpMethod, ResponseType
from pydantic import BaseModel, Field
class GitHubRepo(BaseModel):
id: int
name: str
full_name: str
html_url: str
description: str | None
stargazers_count: int = Field(alias="stars")
forks_count: int = Field(alias="forks")
class GitHubUser(BaseModel):
login: str
id: int
avatar_url: str
html_url: str
public_repos: int
# Endpoint'ы для GitHub API
get_user_endpoint = Endpoint(
method=HttpMethod.GET,
path="/users/{username}",
response_schema=GitHubUser,
response_type=ResponseType.SINGLE_OBJECT
)
get_user_repos_endpoint = Endpoint(
method=HttpMethod.GET,
path="/users/{username}/repos",
response_schema=GitHubRepo,
response_type=ResponseType.PLAIN_LIST
)
class GitHubClient:
def __init__(self, token: str | None = None):
headers = {}
if token:
headers["Authorization"] = f"token {token}"
self.client = APIClient(
base_url="https://api.github.com",
default_headers=headers
)
def get_user(self, username: str) -> GitHubUser:
return self.client.build_request(get_user_endpoint)\
.with_path_vars(username=username)\
.execute()\
.as_object()
def get_user_repos(self, username: str) -> List[GitHubRepo]:
return self.client.build_request(get_user_repos_endpoint)\
.with_path_vars(username=username)\
.execute()\
.as_list()
# Использование
github = GitHubClient()
user = github.get_user("octocat")
repos = github.get_user_repos("octocat")
print(f"Пользователь: {user.login}")
print(f"Количество репозиториев: {len(repos)}")
Асинхронный клиент для высоконагруженных приложений
import asyncio
from typing import List
from apismartkit import AsyncAPIClient, Endpoint, HttpMethod
class NewsAPI:
def __init__(self, api_key: str):
self.client = AsyncAPIClient(
base_url="https://newsapi.org/v2",
default_headers={"X-Api-Key": api_key}
)
self.top_headlines_endpoint = Endpoint(
method=HttpMethod.GET,
path="/top-headlines",
response_type=ResponseType.SINGLE_OBJECT
)
async def get_top_headlines(self, country: str = "us") -> List[dict]:
request = self.client.build_request(self.top_headlines_endpoint)\
.with_params({"country": country})\
.build()
response = await self.client.execute(request)
data = response.as_object()
return data.get("articles", [])
async def get_multiple_countries(self, countries: List[str]) -> dict:
tasks = [
self.get_top_headlines(country)
for country in countries
]
results = {}
for country, headlines in zip(countries, await asyncio.gather(*tasks)):
results[country] = headlines
return results
# Использование
async def main():
news_api = NewsAPI("your-api-key")
# Получаем новости для нескольких стран параллельно
headlines = await news_api.get_multiple_countries(["us", "gb", "ru"])
for country, articles in headlines.items():
print(f"Новости {country}: {len(articles)} статей")
asyncio.run(main())
📄 Лицензия
Этот проект распространяется под лицензией MIT.
🐛 Сообщение об ошибках
Если вы нашли ошибку, пожалуйста, создайте issue в GitHub Issues с подробным описанием проблемы.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file apismartkit-0.0.1.tar.gz.
File metadata
- Download URL: apismartkit-0.0.1.tar.gz
- Upload date:
- Size: 23.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.0 CPython/3.13.0 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a690e818797fea787990e49c2999079ff88efe84dd022392fe95b7fb4f1c6528
|
|
| MD5 |
20fac3d98994acd21708d5e71b03d40d
|
|
| BLAKE2b-256 |
85b9805c26031363249d9bea9f11a378cf8186cb83e3295459e3883fc3e3ec5e
|
File details
Details for the file apismartkit-0.0.1-py3-none-any.whl.
File metadata
- Download URL: apismartkit-0.0.1-py3-none-any.whl
- Upload date:
- Size: 24.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.0 CPython/3.13.0 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cb9fc8d692b257c5c779636c439d21be0edaa7c47a20b054d3374f5eb80200d9
|
|
| MD5 |
ae3864d101fca11f3955d9e102191e3d
|
|
| BLAKE2b-256 |
baefa8e4eb76d2e43aa8aa14ec6ba191de685c2f2a621263288d4fb2776ce057
|