Skip to main content

httpx的封装

Project description

HTTPXX

🚀 HTTPXX 是一个基于 httpx HTTP 客户端封装库,提供完全异步支持、自动重试、断路器、速率限制等高级特性。

✨ 特性

  • 完全异步支持 - 基于 asyncio,支持高并发请求
  • 🔄 自动重试机制 - 支持多种退避策略(指数、线性、固定)
  • 🔌 断路器模式 - 防止级联故障,服务降级保护
  • ⏱️ 速率限制 - 控制请求频率和并发数,防止过载
  • 💾 智能缓存 - 自动缓存响应,减少重复请求
  • 🪝 完整钩子系统 - 支持在请求生命周期各阶段自定义逻辑
  • 🔒 SSL/TLS 支持 - 无缝的证书处理和验证
  • 🌐 HTTP/2 & HTTP/3 - 支持最新 HTTP 协议
  • 🔗 代理支持 - 支持 HTTP、HTTPS、SOCKS5 代理
  • 📊 完善日志系统 - 详细的请求追踪和错误记录
  • 🛡️ 灵活错误处理 - 支持抛异常或返回错误数据两种模式

📦 安装

pip install httpxx

🚀 快速开始

基础使用

import asyncio
from httpxx import HTTPXClient, RequestConfig, HTTPMethod

async def main():
    # 创建客户端
    async with HTTPXClient(base_url="https://api.example.com") as client:
        # 发送 GET 请求
        config = RequestConfig(
            url="/users",
            method=HTTPMethod.GET
        )
        response = await client.request(config)
        
        if response.is_success():
            print(response.json_data)
        else:
            print(f"请求失败: {response.error_message}")

asyncio.run(main())

简化请求方法

async def main():
    async with HTTPXClient() as client:
        # GET 请求
        response = await client.get("https://api.example.com/users")
        
        # POST 请求
        response = await client.post(
            "https://api.example.com/users",
            json={"name": "John", "age": 30}
        )
        
        # PUT 请求
        response = await client.put(
            "https://api.example.com/users/1",
            json={"age": 31}
        )
        
        # DELETE 请求
        response = await client.delete("https://api.example.com/users/1")

📖 核心配置

1. 重试配置 (RetryConfig)

自动重试失败的请求,支持多种退避策略。

from httpxx import HTTPXClient, RetryConfig, RetryStrategy

retry_config = RetryConfig(
    max_retries=3,                    # 最大重试次数
    strategy=RetryStrategy.EXPONENTIAL,  # 重试策略:指数退避
    base_delay=1.0,                   # 基础延迟(秒)
    max_delay=60.0,                   # 最大延迟(秒)
    jitter=True,                      # 添加随机抖动
    retry_on_status_codes=[408, 429, 500, 502, 503, 504],  # 需要重试的状态码
)

async with HTTPXClient(retry_config=retry_config) as client:
    response = await client.get("https://api.example.com/data")

重试策略说明:

  • EXPONENTIAL - 指数退避(推荐):1s, 2s, 4s, 8s...
  • LINEAR - 线性增长:1s, 2s, 3s, 4s...
  • FIXED - 固定延迟:1s, 1s, 1s, 1s...
  • NONE - 不重试

2. 超时配置 (TimeoutConfig)

精确控制请求各阶段的超时时间。

from httpxx import HTTPXClient, TimeoutConfig

# 方式1:简单超时
timeout = TimeoutConfig(timeout=30.0)  # 全局30秒超时

# 方式2:详细超时配置
timeout = TimeoutConfig(
    connect=10.0,  # 连接超时
    read=30.0,     # 读取超时
    write=30.0,    # 写入超时
    pool=5.0       # 连接池超时
)

# 方式3:快捷方法
timeout = TimeoutConfig.from_timeout(30.0)
timeout = TimeoutConfig.from_detailed(connect=10, read=30, write=30, pool=5)

async with HTTPXClient(timeout=timeout) as client:
    response = await client.get("https://api.example.com/data")

3. 速率限制 (RateLimitConfig)

控制请求频率,防止 API 限流。

from httpxx import HTTPXClient, RateLimitConfig

rate_limit = RateLimitConfig(
    max_requests_per_second=10,      # 每秒最多10个请求
    max_concurrent_requests=5,       # 最多5个并发请求
    per_host_rate_limit=5            # 每个主机每秒最多5个请求
)

async with HTTPXClient(rate_limit_config=rate_limit) as client:
    # 自动控制请求速率
    tasks = [client.get(f"https://api.example.com/item/{i}") for i in range(100)]
    responses = await asyncio.gather(*tasks)

4. 断路器 (CircuitBreakerConfig)

防止级联故障,自动熔断不可用服务。

from httpxx import HTTPXClient, CircuitBreakerConfig

circuit_breaker = CircuitBreakerConfig(
    enabled=True,              # 启用断路器
    failure_threshold=5,       # 连续失败5次后打开断路器
    recovery_timeout=60.0,     # 60秒后尝试恢复
    expected_exception=Exception  # 触发断路器的异常类型
)

async with HTTPXClient(circuit_breaker_config=circuit_breaker) as client:
    try:
        response = await client.get("https://unstable-api.example.com/data")
    except CircuitBreakerOpenError:
        print("服务暂时不可用,断路器已打开")

断路器状态:

  • closed - 正常状态,允许请求通过
  • open - 熔断状态,直接拒绝请求
  • half-open - 半开状态,尝试恢复服务

5. 缓存配置 (CacheConfig)

自动缓存响应,减少重复请求。

from httpxx import HTTPXClient, CacheConfig

cache = CacheConfig(
    enabled=True,                     # 启用缓存
    ttl=300.0,                        # 缓存有效期(秒)
    max_cache_memory=10*1024*1024,    # 最大缓存内存 10MB
    cacheable_methods=["GET", "HEAD"],  # 可缓存的方法
    cacheable_status_codes=[200, 203, 204, 206, 300, 301, 404]  # 可缓存的状态码
)

async with HTTPXClient(cache_config=cache) as client:
    # 第一次请求,从服务器获取
    response1 = await client.get("https://api.example.com/data")
    
    # 第二次请求,从缓存返回
    response2 = await client.get("https://api.example.com/data")

6. 代理配置 (ProxyConfig)

支持 HTTP、HTTPS、SOCKS5 代理。

from httpxx import HTTPXClient, ProxyConfig, ProxyType

# 方式1:简单代理
async with HTTPXClient(proxies="http://proxy.example.com:8080") as client:
    response = await client.get("https://api.example.com/data")

# 方式2:详细代理配置
proxy = ProxyConfig(
    url="proxy.example.com:8080",
    username="user",
    password="pass",
    proxy_type=ProxyType.HTTP,
    verify_ssl=True,
    timeout=30.0
)

async with HTTPXClient(proxies=[proxy]) as client:
    response = await client.get("https://api.example.com/data")

# 方式3:按协议配置代理
proxies = {
    "http://": "http://proxy1.example.com:8080",
    "https://": "https://proxy2.example.com:8080"
}

async with HTTPXClient(proxies=proxies) as client:
    response = await client.get("https://api.example.com/data")

🪝 钩子系统

HTTPXX 提供完整的钩子系统,支持在请求生命周期的各个阶段执行自定义逻辑。

钩子类型

1. 修改型钩子(可修改数据)

  • before_request - 请求发送前,可修改 RequestConfig
  • after_response - 收到响应后,可修改 ResponseData
  • on_request_failure - 请求失败时,可修改失败的 ResponseData

2. 通知型钩子(仅通知)

  • response_from_cache - 使用缓存响应时
  • circuit_breaker_open - 断路器打开时
  • request_error - 请求发生错误时
  • request_failed - 请求最终失败时
  • request_retry - 请求重试时
  • http_error - HTTP 错误时

钩子使用示例

from httpxx import HTTPXClient, RequestConfig, ResponseData

async def add_auth_header(config: RequestConfig) -> RequestConfig:
    """在请求前添加认证头"""
    if config.headers is None:
        config.headers = {}
    config.headers["Authorization"] = "Bearer token123"
    return config

async def log_response(response: ResponseData) -> ResponseData:
    """记录响应信息"""
    print(f"收到响应: {response.status_code} - {response.url}")
    return response

async def handle_error(response: ResponseData, config: RequestConfig, error: Exception) -> ResponseData:
    """处理请求失败"""
    print(f"请求失败: {config.url} - {error}")
    return response

def on_retry(attempt: int, delay: float, url: str):
    """通知重试事件"""
    print(f"重试第 {attempt} 次,延迟 {delay}秒: {url}")

async def main():
    async with HTTPXClient() as client:
        # 注册钩子
        client.hooks.register("before_request", add_auth_header)
        client.hooks.register("after_response", log_response)
        client.hooks.register("on_request_failure", handle_error)
        client.hooks.register("request_retry", on_retry)
        
        response = await client.get("https://api.example.com/data")

🔧 高级功能

1. 并发请求

async def fetch_multiple():
    async with HTTPXClient() as client:
        urls = [
            "https://api.example.com/user/1",
            "https://api.example.com/user/2",
            "https://api.example.com/user/3"
        ]
        
        # 并发发送请求
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        
        for response in responses:
            if response.is_success():
                print(response.json_data)

2. 流式请求

async def download_file():
    async with HTTPXClient() as client:
        # 流式下载大文件
        async for chunk in client.stream_request(
            RequestConfig(
                url="https://example.com/large-file.zip",
                method=HTTPMethod.GET
            )
        ):
            # 处理数据块
            process_chunk(chunk)

3. 批量请求

async def batch_requests():
    async with HTTPXClient() as client:
        configs = [
            RequestConfig(url="https://api.example.com/endpoint1", method=HTTPMethod.GET),
            RequestConfig(url="https://api.example.com/endpoint2", method=HTTPMethod.POST, json={"key": "value"}),
            RequestConfig(url="https://api.example.com/endpoint3", method=HTTPMethod.PUT, json={"id": 1})
        ]
        
        responses = await client.batch_request(configs, max_concurrent=5)
        
        for response in responses:
            print(f"{response.url}: {response.status_code}")

4. 错误处理模式

# 模式1:抛异常模式(默认)
async with HTTPXClient(raise_on_error=True) as client:
    try:
        response = await client.get("https://api.example.com/data")
    except httpx.HTTPError as e:
        print(f"请求失败: {e}")

# 模式2:返回错误数据模式
async with HTTPXClient(raise_on_error=False) as client:
    response = await client.get("https://api.example.com/data")
    
    if response.has_error():
        error_info = response.get_error_info()
        print(f"错误类型: {error_info['type']}")
        print(f"错误信息: {error_info['message']}")
        print(f"堆栈跟踪: {error_info['traceback']}")
    else:
        print(response.json_data)

5. 自定义请求头和认证

# 默认请求头
async with HTTPXClient(
    headers={
        "User-Agent": "MyApp/1.0",
        "Accept": "application/json"
    }
) as client:
    response = await client.get("https://api.example.com/data")

# HTTP 基本认证
async with HTTPXClient(auth=("username", "password")) as client:
    response = await client.get("https://api.example.com/secure-data")

# Bearer Token 认证(通过钩子)
async def add_bearer_token(config: RequestConfig) -> RequestConfig:
    if config.headers is None:
        config.headers = {}
    config.headers["Authorization"] = f"Bearer {get_token()}"
    return config

async with HTTPXClient() as client:
    client.hooks.register("before_request", add_bearer_token)
    response = await client.get("https://api.example.com/protected")

6. 连接池优化

from httpxx import HTTPXClient

async with HTTPXClient(
    max_connections=100,           # 最大连接数
    max_keepalive_connections=20,  # 最大保活连接数
    pool_timeout=30.0,             # 连接池超时
    http2=True                     # 启用 HTTP/2
) as client:
    # 复用连接,提升性能
    tasks = [client.get(f"https://api.example.com/item/{i}") for i in range(1000)]
    responses = await asyncio.gather(*tasks)

📊 ResponseData API

ResponseData 对象封装了完整的响应信息:

response = await client.get("https://api.example.com/data")

# 响应属性
response.status_code      # HTTP 状态码
response.headers          # 响应头(字典)
response.content          # 响应体(字节)
response.text             # 响应体(字符串)
response.json_data        # 响应体(JSON 对象)
response.url              # 请求 URL
response.elapsed          # 请求耗时(秒)
response.history          # 重定向历史

# 错误信息
response.error            # 异常对象
response.error_message    # 错误消息
response.error_type       # 错误类型
response.error_traceback  # 堆栈跟踪

# 状态检查方法
response.is_success()      # 是否成功 (2xx)
response.has_error()       # 是否有错误
response.is_redirect()     # 是否重定向 (3xx)
response.is_client_error() # 是否客户端错误 (4xx)
response.is_server_error() # 是否服务器错误 (5xx)

# 错误处理
response.raise_for_error()  # 如果有错误则抛出异常
error_info = response.get_error_info()  # 获取详细错误信息

🎯 最佳实践

1. 使用上下文管理器

# ✅ 推荐:自动管理资源
async with HTTPXClient() as client:
    response = await client.get("https://api.example.com/data")

# ❌ 不推荐:需手动关闭
client = HTTPXClient()
response = await client.get("https://api.example.com/data")
await client.close()  # 容易忘记

2. 合理配置重试

# ✅ 推荐:针对不同场景配置
retry_config = RetryConfig(
    max_retries=3,
    strategy=RetryStrategy.EXPONENTIAL,
    retry_on_status_codes=[429, 500, 502, 503, 504]  # 只重试特定错误
)

# ❌ 不推荐:盲目重试所有错误
retry_config = RetryConfig(max_retries=10)  # 可能重试 4xx 错误

3. 启用缓存减少请求

# ✅ 推荐:缓存不常变化的数据
cache_config = CacheConfig(
    enabled=True,
    ttl=300,  # 5分钟缓存
    cacheable_methods=["GET"]
)

4. 使用速率限制保护 API

# ✅ 推荐:遵守 API 限制
rate_limit = RateLimitConfig(
    max_requests_per_second=10,  # API 限制
    max_concurrent_requests=5
)

5. 监控和日志

import logging

# 启用详细日志
logging.basicConfig(level=logging.DEBUG)

# 自定义钩子记录请求
async def log_request(config: RequestConfig) -> RequestConfig:
    logger.info(f"发送请求: {config.method.value} {config.url}")
    return config

async def log_response(response: ResponseData) -> ResponseData:
    logger.info(f"收到响应: {response.status_code} - 耗时 {response.elapsed}s")
    return response

client.hooks.register("before_request", log_request)
client.hooks.register("after_response", log_response)

🔍 完整示例

import asyncio
import logging
from httpxx import (
    HTTPXClient,
    RequestConfig,
    HTTPMethod,
    RetryConfig,
    RetryStrategy,
    TimeoutConfig,
    RateLimitConfig,
    CacheConfig,
)

# 配置日志
logging.basicConfig(level=logging.INFO)

async def main():
    # 配置客户端
    client = HTTPXClient(
        base_url="https://api.example.com",
        timeout=TimeoutConfig.from_timeout(30),
        retry_config=RetryConfig(
            max_retries=3,
            strategy=RetryStrategy.EXPONENTIAL,
            base_delay=1.0
        ),
        rate_limit_config=RateLimitConfig(
            max_requests_per_second=10,
            max_concurrent_requests=5
        ),
        cache_config=CacheConfig(
            enabled=True,
            ttl=300
        ),
        headers={"User-Agent": "MyApp/1.0"},
        raise_on_error=False  # 不抛异常,返回错误数据
    )
    
    # 注册钩子
    async def add_auth(config: RequestConfig) -> RequestConfig:
        if config.headers is None:
            config.headers = {}
        config.headers["Authorization"] = "Bearer YOUR_TOKEN"
        return config
    
    client.hooks.register("before_request", add_auth)
    
    async with client:
        # 发送请求
        response = await client.get("/users")
        
        if response.is_success():
            users = response.json_data
            print(f"获取到 {len(users)} 个用户")
        else:
            error_info = response.get_error_info()
            print(f"请求失败: {error_info['message']}")
        
        # 批量请求
        tasks = [client.get(f"/users/{i}") for i in range(1, 11)]
        responses = await asyncio.gather(*tasks)
        
        successful = [r for r in responses if r.is_success()]
        print(f"成功获取 {len(successful)} 个用户详情")

if __name__ == "__main__":
    asyncio.run(main())

📝 异常类型

  • HTTPXWrapperException - 基础异常类
  • CircuitBreakerOpenError - 断路器打开异常
  • RateLimitExceededError - 速率限制超出异常
  • CacheError - 缓存错误异常

📄 许可证

本项目采用 MIT 许可证。详见 LICENSE 文件。

🔗 相关链接

⚠️ 注意事项

  1. Python 版本要求:需要 Python 3.8+
  2. HTTP/3 支持:需要安装 httpx[http3]
  3. 异步环境:所有方法都是异步的,需要在 async 函数中使用
  4. 内存管理:启用缓存时注意 max_cache_memory 配置
  5. 敏感信息:日志会自动过滤敏感请求头(如 Authorization、Cookie)

💡 提示

  • 使用 raise_on_error=False 可以避免异常,获取完整的错误信息
  • 钩子系统支持修改请求和响应,非常适合添加认证、日志等
  • 断路器可以保护服务免受雪崩效应影响
  • 缓存只对 JSON 响应生效,确保 Content-Type 正确
  • 速率限制器在并发场景下自动控制请求频率

Happy Coding! 🎉

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

httpxx-0.1.2.tar.gz (26.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

httpxx-0.1.2-py3-none-any.whl (24.3 kB view details)

Uploaded Python 3

File details

Details for the file httpxx-0.1.2.tar.gz.

File metadata

  • Download URL: httpxx-0.1.2.tar.gz
  • Upload date:
  • Size: 26.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for httpxx-0.1.2.tar.gz
Algorithm Hash digest
SHA256 663a187f9bddb146df523e496c1b97ae62c12781f431cd245ad3da07ecdd8cf1
MD5 988a655f70a7e3b5a19a0aa4af0cce99
BLAKE2b-256 d517bd974febfdecdcd7157cf83a402eadaad42617976e24f680b9be10f17154

See more details on using hashes here.

File details

Details for the file httpxx-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: httpxx-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 24.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for httpxx-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 6d0d39c7d5fd298464a8c7f443797f1d261c38eeeeaac12060985b105d12f10b
MD5 d78b0cb1136bf32d045e0620d079b1de
BLAKE2b-256 dae4b367e8dd236cbde921c0834717cae3c6018144740befdf6313e0a141e664

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page