Skip to main content

华策AIGC Auth Client - 提供 Token 验证、用户信息获取、权限检查、旧系统接入等功能

Project description

AIGC Auth Python SDK

PyPI version Python Version

Python 后端服务接入华策 AIGC 鉴权中心的 SDK 工具包。

安装

pip install huace-aigc-auth-client

环境变量配置

在项目根目录创建 .env 文件:

# 必填:应用 ID 和密钥(在鉴权中心创建应用后获取)
AIGC_AUTH_APP_ID=your_app_id
AIGC_AUTH_APP_SECRET=your_app_secret
# 鉴权服务地址(默认为生产环境)
AIGC_AUTH_BASE_URL=your-auth-api-url-prefix

如需通过 Nginx 代理:

location /aigc-auth/ {
    proxy_pass https://aigc-auth.huacemedia.com/aigc-auth/;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
}

快速开始

1. 初始化客户端

from huace_aigc_auth_client import AigcAuthClient

# 方式一:从环境变量读取配置
client = AigcAuthClient()

# 方式二:直接传入参数
client = AigcAuthClient(
    app_id="your_app_id",
    app_secret="your_app_secret",
    base_url="your-auth-api-url-prefix"
)

2. 验证 Token

result = client.verify_token(token)

if result.valid:
    print(f"用户 ID: {result.user_id}")
    print(f"用户名: {result.username}")
    print(f"过期时间: {result.expires_at}")
else:
    print("Token 无效")

3. 获取用户信息

from huace_aigc_auth_client import AigcAuthClient, AigcAuthError

client = AigcAuthClient()

try:
    user = client.get_user_info(token)
    
    print(f"用户名: {user.username}")
    print(f"昵称: {user.nickname}")
    print(f"邮箱: {user.email}")
    print(f"角色: {user.roles}")
    print(f"权限: {user.permissions}")
    
    # 检查角色(在 Auth 里面配置)
    if user.has_role("admin") or user.is_admin:
        print("是管理员")
    
    # 检查权限(在 Auth 里面配置)
    if user.has_permission("user:write"):
        print("有用户写权限")
        
except AigcAuthError as e:
    print(f"错误: {e.message}")

4. 批量检查权限

results = client.check_permissions(token, ["user:read", "user:write", "admin:access"])

for permission, has_permission in results.items():
    print(f"{permission}: {'✓' if has_permission else '✗'}")

5. 批量获取用户信息

# 按用户名批量查询
result = client.batch_get_users(["zhangsan", "lisi"])
for user in result["users"]:
    print(f"{user['username']}: {user['nickname']}")
# result["not_found"] 包含未找到的用户名

# 按用户ID批量查询(未找到的返回"未知用户"占位)
result = client.batch_get_users_by_ids([1, 2, 999])
for user in result["users"]:
    print(f"{user['id']}: {user['username']}")

FastAPI 集成

方式一:使用中间件(推荐)

from fastapi import FastAPI, Request, HTTPException
from huace_aigc_auth_client import AigcAuthClient, AuthMiddleware

app = FastAPI()

# 初始化客户端和中间件
client = AigcAuthClient()
auth_middleware = AuthMiddleware(
    client,
    exclude_paths=["/health", "/docs", "/openapi.json"],
    exclude_prefixes=["/public/"]
)

# 注册中间件
@app.middleware("http")
async def auth(request: Request, call_next):
    return await auth_middleware.fastapi_middleware(request, call_next)

# 在路由中获取用户信息
@app.get("/me")
async def get_current_user(request: Request):
    user = request.state.user_info
    return {
        "username": user.username,
        "roles": user.roles
    }

@app.get("/admin")
async def admin_only(request: Request):
    user = request.state.user_info
    if not user.has_role("admin"):
        raise HTTPException(status_code=403, detail="需要管理员权限")
    return {"message": "欢迎管理员"}

方式二:使用依赖注入

from fastapi import FastAPI, Depends, HTTPException
from huace_aigc_auth_client import AigcAuthClient, create_fastapi_auth_dependency, UserInfo

app = FastAPI()
client = AigcAuthClient()

# 创建认证依赖
get_current_user = create_fastapi_auth_dependency(client)

@app.get("/me")
async def get_me(user: UserInfo = Depends(get_current_user)):
    return {"username": user.username}

# 创建权限检查依赖
def require_permission(permission: str):
    async def check(user: UserInfo = Depends(get_current_user)):
        if not user.has_permission(permission):
            raise HTTPException(status_code=403, detail="权限不足")
        return user
    return check

@app.get("/users")
async def list_users(user: UserInfo = Depends(require_permission("user:read"))):
    return {"users": [...]}

方式三:使用装饰器

from fastapi import FastAPI, Request
from huace_aigc_auth_client import AigcAuthClient, require_auth, UserInfo

app = FastAPI()
client = AigcAuthClient()

@app.get("/protected")
@require_auth(client)
async def protected_route(request: Request, user_info: UserInfo):
    return {"user": user_info.username}

@app.get("/admin")
@require_auth(client, permissions=["admin:access"])
async def admin_route(request: Request, user_info: UserInfo):
    return {"admin": True}

# 只需要任意一个权限
@app.get("/editor")
@require_auth(client, permissions=["article:write", "article:edit"], any_permission=True)
async def editor_route(request: Request, user_info: UserInfo):
    return {"editor": True}

Flask 集成

from flask import Flask, g, jsonify
from functools import wraps
from huace_aigc_auth_client import AigcAuthClient, AuthMiddleware

app = Flask(__name__)

# 初始化客户端和中间件
client = AigcAuthClient()
auth_middleware = AuthMiddleware(
    client,
    exclude_paths=["/health", "/login"],
    exclude_prefixes=["/public/"]
)

# 注册 before_request
@app.before_request
def before_request():
    return auth_middleware.flask_before_request()

# 在路由中获取用户信息
@app.route("/me")
def get_me():
    user = g.user_info
    return jsonify({
        "username": user.username,
        "roles": user.roles
    })

# 权限检查装饰器
def require_permission(permission):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user = g.user_info
            if not user.has_permission(permission):
                return jsonify({"error": "权限不足"}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route("/admin")
@require_permission("admin:access")
def admin_only():
    return jsonify({"message": "欢迎管理员"})

上下文管理

SDK 提供了上下文管理功能,可以在任意位置(Service、Utility 等)获取当前请求的用户信息,无需层层传递参数。 支持 Flask 和 FastAPI,兼容同步和异步环境。

注意:使用此功能前必须先注册 AuthMiddleware

from huace_aigc_auth_client import get_current_user

def do_something_logic():
    # 获取当前用户信息(返回字典,非 UserInfo 对象)
    user = get_current_user()
    
    if user:
        print(f"当前操作用户ID: {user['id']}")
        print(f"当前操作用户名: {user['username']}")
        
        # 也可以获取 app_id 等信息(如果未过滤)
        if 'app_id' in user:
            print(f"应用ID: {user['app_id']}")
    else:
        print("无用户上下文")

API 参考

UserInfo 对象

属性 类型 说明
id int 用户 ID
username str 用户名
nickname str 昵称
email str 邮箱
phone str 手机号
avatar str 头像 URL
roles List[str] 角色代码列表
permissions List[str] 权限代码列表
department str 部门
company str 公司
方法 说明
has_role(role) 检查是否拥有指定角色
has_permission(permission) 检查是否拥有指定权限
has_any_permission(permissions) 检查是否拥有任意一个权限
has_all_permissions(permissions) 检查是否拥有所有权限

异常处理

from huace_aigc_auth_client import AigcAuthClient, AigcAuthError

client = AigcAuthClient()

try:
    user = client.get_user_info(token)
except AigcAuthError as e:
    print(f"错误码: {e.code}")
    print(f"错误信息: {e.message}")
错误码 说明
401 未授权(Token 无效或已过期)
403 禁止访问(用户被禁用或权限不足)
404 用户不存在
-1 网络请求失败

旧系统接入(用户同步)

如果你的系统已有用户表,可通过 SDK 提供的「旧系统适配器」实现低成本接入,无需修改历史代码和表结构

接入原理

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   aigc-auth     │────▶│   SDK 同步层     │────▶│   旧系统         │
│   (鉴权中心)     │     │   (字段映射)     │     │   (用户表)       │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │                       │                       │
        │   1. 初始化同步        │                       │
        │◀──────────────────────────────────────────────│
        │   (批量同步旧用户到 auth)                       │
        │                       │                       │
        │   2. 增量同步          │                       │
        │──────────────────────▶│──────────────────────▶│
        │   (auth 新用户自动同步到旧系统)                  │
        │                       │                       │
        │   3. Webhook 推送      │                       │
        │──────────────────────────────────────────────▶│
        │   (用户变更主动通知)                            │

快速开始

1. 配置环境变量

# .env 文件
AIGC_AUTH_APP_ID=your_app_id
AIGC_AUTH_APP_SECRET=your_app_secret
AIGC_AUTH_BASE_URL=your-auth-api-url-prefix

# 同步配置
AIGC_AUTH_SYNC_ENABLED=true
AIGC_AUTH_SYNC_PASSWORD=通用密码
AIGC_AUTH_WEBHOOK_URL=https://your-domain.com/api/v1/webhook/auth
AIGC_AUTH_WEBHOOK_SECRET=your_secret

2. 创建字段映射

from huace_aigc_auth_client import (
    FieldMapping,
    SyncConfig,
    PasswordMode,
    create_sync_config
)

# 定义字段映射
field_mappings = [
    FieldMapping(
        auth_field="username",
        legacy_field="username",
        required=True
    ),
    FieldMapping(
        auth_field="email",
        legacy_field="email"
    ),
    FieldMapping(
        auth_field="nickname",
        legacy_field="nickname"
    ),
    # 状态字段转换:auth 的 status 映射到旧系统的 is_active
    FieldMapping(
        auth_field="status",
        legacy_field="is_active",
        transform_to_legacy=lambda s: s == "active",
        transform_to_auth=lambda a: "active" if a else "disabled"
    ),
    # 角色字段转换:auth 的 roles 列表映射到旧系统的 role 字符串
    FieldMapping(
        auth_field="roles",
        legacy_field="role",
        transform_to_legacy=lambda r: r[0] if r else "viewer",
        transform_to_auth=lambda r: [r] if r else ["viewer"]
    ),
]

# 创建同步配置
sync_config = create_sync_config(
    field_mappings=field_mappings,
    password_mode=PasswordMode.UNIFIED,
    unified_password="通用密码",
    webhook_url="https://your-domain.com/api/v1/webhook/auth",
)

3. 实现旧系统适配器

from huace_aigc_auth_client import LegacySystemAdapter, LegacyUserData

class MyLegacyAdapter(LegacySystemAdapter):
    """实现与旧系统用户表的交互"""
    
    def __init__(self, db, sync_config, auth_client=None):
        super().__init__(sync_config, auth_client)
        self.db = db
    
    def get_user_by_unique_field(self, username: str):
        """通过用户名获取旧系统用户"""
        user = self.db.query(User).filter(User.username == username).first()
        if not user:
            return None
        return LegacyUserData({
            "id": user.id,
            "username": user.username,
            "email": user.email,
            # ... 其他字段
        })

    async def get_user_by_username_async(self, username: str) -> Optional[LegacyUserData]:
        """异步通过用户名获取旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if not user:
            return None
        return LegacyUserData({
            "id": user.id,
            "username": user.username,
            "email": user.email,
            # ... 其他字段
        })
    
    async def _create_user_async(self, user_data: Dict[str, Any]) -> Optional[Any]:
        """在旧系统创建用户"""
        user = User(
            username=user_data["username"],
            email=user_data.get("email"),
            hashed_password=hash_password(user_data["password"]),
            # ... 其他字段
        )
        self.db.add(user)
        await self.db.commit()
        return user.id
    
    async def _update_user_async(self, username: str, user_data: Dict[str, Any]) -> bool:
        """异步更新旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if user:
            for key, value in user_data.items():
                setattr(user, key, value)
            await self.db.commit()
            return True
        return False

    async def _delete_user_async(self, username: str) -> bool:
        """异步删除旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if user:
            await self.db.delete(user) 
            # 或者软删除:user.is_active = False
            await self.db.commit()
            return True
        return False
    
    async def get_all_users_async(self) -> List[LegacyUserData]:
        """异步获取所有用户(用于初始化同步)"""
        result = await self.db.execute(select(User))
        users = result.scalars().all()
        return [LegacyUserData({"username": u.username, ...}) for u in users]

4. 集成到登录流程

from huace_aigc_auth_client import AigcAuthClient, UserSyncService

client = AigcAuthClient()
adapter = MyLegacyAdapter(db, sync_config, client)
sync_service = UserSyncService(client, adapter)

# 在获取用户信息后调用同步
@app.middleware("http")
async def auth_middleware(request, call_next):
    response = await auth.fastapi_middleware(request, call_next)
    
    # 登录成功后,同步用户到旧系统
    if hasattr(request.state, "user_info"):
        user_info = request.state.user_info
        await sync_service.sync_on_login_async(user_info)
    
    return response

5. 添加 Webhook 接收端点

推荐方式:使用 SDK 提供的通用 Webhook 路由

from fastapi import APIRouter
from huace_aigc_auth_client import register_webhook_router

api_router = APIRouter()
client = AigcAuthClient()
adapter = MyLegacyAdapter(db, sync_config, client)

# 定义 webhook 处理函数
async def handle_auth_webhook(request_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    独立的 webhook 处理函数(用于 SDK 集成)
    
    这个函数不需要 db 参数,会在内部创建数据库会话。
    适用于通过 SDK 的 register_webhook_router 注册。
    """
    from app.db.session import get_db
    
    async for db in get_db():
        try:
            event = request_data.get("event")
            data = request_data.get("data", {})
            logger.info(f"Received webhook event: {event} with data: {data}")
            
            # 调用适配器的 handle_webhook 方法
            return await adapter.handle_webhook(event, data)
        except Exception as e:
            logger.exception(f"Failed to handle webhook: {e}")
            raise

# 注册 webhook 路由(自动处理签名验证)
register_webhook_router(
    api_router,
    handler=handle_auth_webhook,
    prefix="/webhook",  # 可选,默认 "/webhook"
    secret_env_key="aigc-auth-webhook-secret",  # 可选,在 auth 后台配置
    tags=["Webhook"]  # 可选,默认 ["Webhook"]
)

# webhook 端点将自动创建在: /webhook/auth

Webhook 签名验证说明

SDK 的 register_webhook_router 会自动处理签名验证:

  • 从请求头 X-Webhook-Signature 获取签名
  • 从环境变量读取密钥(默认 AIGC_AUTH_WEBHOOK_SECRET
  • 使用 HMAC-SHA256 算法验证签名
  • 签名不匹配时返回 401 错误

6. 执行初始化同步

# 一次性脚本:将旧系统用户同步到 aigc-auth
async def init_sync():
    adapter = MyLegacyAdapter(db, sync_config, client)
    users = adapter.get_all_users()
    
    for user in users:
        auth_data = adapter.transform_legacy_to_auth(user)
        
        # 获取密码(支持元组返回格式)
        password_result = adapter.get_password_for_sync(user)
        if isinstance(password_result, tuple):
            password, is_hashed = password_result
        else:
            password, is_hashed = password_result, False
        
        # 根据是否已加密选择不同的字段
        if is_hashed:
            auth_data["password_hashed"] = password  # 直接传递已加密密码
        else:
            auth_data["password"] = password  # 传递明文密码,服务端会加密
        
        client.sync_user_to_auth(auth_data)
    
    print(f"同步完成:{len(users)} 个用户")

# 运行
asyncio.run(init_sync())

密码处理策略

模式 说明 适用场景
UNIFIED 统一初始密码 新系统接入,安全性高
CUSTOM_MAPPING 自定义映射函数 需要保留原密码时
CUSTOM_MAPPING + password_is_hashed=True 直接同步已加密密码 推荐,两系统使用相同加密方式 (bcrypt)
from huace_aigc_auth_client import PasswordMode, create_sync_config

# 统一密码模式
sync_config = create_sync_config(
    password_mode=PasswordMode.UNIFIED,
    unified_password="Abc@123456"
)

# 自定义映射模式(返回明文密码)
def password_mapper(user_data):
    return user_data.get("password", "default")

sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=password_mapper
)

# 直接同步已加密密码(推荐,适用于两系统使用相同的 bcrypt 加密)
def hashed_password_mapper(user_data):
    return user_data.get("hashed_password", "")

sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=hashed_password_mapper,
    password_is_hashed=True  # 标记返回的是已加密密码
)

注意:当 password_is_hashed=True 时,SDK 会使用 password_hashed 字段传递到服务端, 服务端会直接使用该哈希值而不再重新加密。

同步方向配置

from huace_aigc_auth_client import SyncDirection, create_sync_config

# 仅从 auth 同步到旧系统(默认)
sync_config = create_sync_config(
    direction=SyncDirection.AUTH_TO_LEGACY
)

# 仅从旧系统同步到 auth(初始化用)
sync_config = create_sync_config(
    direction=SyncDirection.LEGACY_TO_AUTH
)

# 双向同步(需谨慎使用)
sync_config = create_sync_config(
    direction=SyncDirection.BIDIRECTIONAL
)

高级功能

API 统计收集

SDK 提供了接口统计收集功能,自动收集和上报 API 调用数据到鉴权中心。

1. 初始化统计收集器

from huace_aigc_auth_client import init_api_stats_collector

# 仅仅示例,SDK 接入鉴权会默认启用数据收集功能(可以手动传参关闭)
# 在应用启动时初始化
init_api_stats_collector(
    api_url='http://auth.example.com/api/sdk',
    app_id='your-app-id',
    app_secret='your-app-secret',
    batch_size=10,          # 批量提交大小
    flush_interval=5.0,     # 刷新间隔(秒)
    enabled=True            # 是否启用
)

2. 在中间件中自动收集

FastAPI 示例:

from fastapi import FastAPI, Request
from huace_aigc_auth_client import collect_api_stat
import time

app = FastAPI()


# 仅仅示例,SDK 内默认会启用数据收集未 exclude 的接口(可以手动传参关闭)
@app.middleware("http")
async def monitor_middleware(request: Request, call_next):
    start_time = time.time()

    try:
        response = await call_next(request)
        response_time = time.time() - start_time

        # 从请求上下文获取 token
        token = request.state.user_info.token if hasattr(request.state, 'user_info') else None

        # 收集统计(会自动过滤 3xx 重定向请求)
        collect_api_stat(
            api_path=request.url.path,
            api_method=request.method,
            status_code=response.status_code,
            response_time=response_time,
            token=token,
            request_params={
                'headers': dict(request.headers),
                'query_params': dict(request.query_params)
            },
            response_headers=dict(response.headers)  # 上游 API 响应头
        )

        return response
    except Exception as e:
        response_time = time.time() - start_time
        collect_api_stat(
            api_path=request.url.path,
            api_method=request.method,
            status_code=500,
            response_time=response_time,
            token=token,
            error_message=str(e)
        )
        raise

@app.on_event("shutdown")
async def shutdown_event():
    from huace_aigc_auth_client import stop_api_stats_collector
    stop_api_stats_collector()

特性说明:

  • 异步队列收集,不阻塞主流程
  • 批量提交,减少网络开销
  • 自动过滤 3xx 重定向请求
  • 按 token 分组提交
  • 静默失败,不影响主业务

3. response_headers 说明

collect_api_statresponse_headers 参数用于传入上游 API 的响应头,鉴权中心会根据这些响应头自动创建计费规则:

响应头 说明 示例
X-Match-Dynamic 是否动态计费,值为 1 时创建 dynamic 规则 1
X-Match-Cost 单次调用费用(元),配合 X-Match-Dynamic=1 使用 0.0035
X-Match-Path 建议的规范化路径(可选,不传则自动识别) /api/chat/completions
X-Match-Rule 建议的规则名称(可选,不传则自动生成) GPT-4o 对话

规则创建逻辑:

  • 如果 X-Match-Dynamic=1 → 创建动态计费规则(费用来自每次上报的 cost)
  • 如果 X-Match-Dynamic 不存在或不为 1 → 创建静态计费规则(费用为 0)
  • 如果已有规则匹配该路径 → 直接使用已有规则,不重复创建

4. X-Match-Username 说明

对于未登录用户的异步任务上报场景,可以在 request_params.headers 中添加 X-Match-Username,鉴权中心会根据此 header 关联用户:

collect_api_stat(
    api_path='/api/task/run',
    api_method='POST',
    status_code=200,
    response_time=1.5,
    token=token,
    request_params={
        'headers': {
            'X-Match-Username': 'zhangsan',  # 用于未登录用户的身份标识
        }
    }
)

使用 auth_request / AuthSession 发起的请求会自动从 request_context 中提取 username 并注入 X-Match-Username,无需手动添加。


认证请求封装

SDK 提供了 auth_request 函数,封装标准的 HTTP 请求,自动添加认证信息并上报统计。

1. 基本使用

from huace_aigc_auth_client import auth_request, set_request_context

# 设置请求上下文(通常在中间件中完成)
set_request_context(
    app_id='your-app-id',
    app_secret='your-app-secret',
    token='user-access-token',
    ip_address='192.168.1.1',
    user_agent='Mozilla/5.0...',
    trace_id='trace-123'
)

# 发起请求(会自动添加认证信息)
response = auth_request('GET', 'https://api.example.com/data')
print(response.json())

2. 带参数的请求

# GET 请求带查询参数
response = auth_request(
    'GET',
    'https://api.example.com/users',
    params={'page': 1, 'size': 10}
)

# POST 请求带 JSON 数据
response = auth_request(
    'POST',
    'https://api.example.com/users',
    json={'name': 'John', 'email': 'john@example.com'}
)

# POST 请求带表单数据
response = auth_request(
    'POST',
    'https://api.example.com/upload',
    data={'field1': 'value1', 'field2': 'value2'}
)

3. 异步请求(httpx)

需要安装 httpx:pip install httpx

from huace_aigc_auth_client import async_auth_httpx_request, AsyncAuthClient
import asyncio

# 方式1:使用函数
async def example1():
    response = await async_auth_httpx_request('GET', 'https://api.example.com/data')
    print(response.json())

# 方式2:使用 AsyncAuthClient(推荐用于多个请求)
async def example2():
    async with AsyncAuthClient(timeout=10.0) as client:
        # POST 请求
        response = await client.post(
            'https://api.example.com/users',
            json={'name': 'John'}
        )
        
        # GET 请求
        response = await client.get(
            'https://api.example.com/users',
            params={'page': 1}
        )

asyncio.run(example1())

自动添加的请求头:

  • X-App-ID: 应用ID
  • X-App-Secret: 应用密钥
  • Authorization: Bearer token
  • x-real-ip: 客户端真实IP
  • user-agent: User Agent
  • X-Trace-ID: 追踪ID

用户上下文管理

SDK 提供了完整的用户上下文管理功能,支持同步和异步环境。

1. 设置和获取用户信息

from huace_aigc_auth_client import (
    set_current_user, 
    get_current_user,
    get_current_user_id,
    get_current_username,
    is_current_user_admin
)

# 在中间件中设置
set_current_user({
    'user_id': 123,
    'username': 'john',
    'app_id': 1,
    'app_code': 'myapp',
    'token': 'xxx',
    'roles': ['admin'],
    'permissions': ['user:read', 'user:write'],
    'is_admin': True
})

# 在业务代码中获取
def some_business_logic():
    user = get_current_user()
    user_id = get_current_user_id()
    username = get_current_username()
    
    if user:
        print(f"当前用户: {username} ({user_id})")
        
    if is_current_user_admin():
        print("当前用户是管理员")

2. 请求上下文管理

from huace_aigc_auth_client import (
    set_request_context,
    get_request_context,
    get_client_ip,
    get_user_agent,
    get_trace_id
)

# 设置请求上下文
set_request_context(
    ip_address='192.168.1.1',
    user_agent='Mozilla/5.0...',
    trace_id='trace-123',
    app_id='your-app-id',
    app_secret='your-app-secret',
    token='user-token'
)

# 获取请求上下文信息
ctx = get_request_context()
ip = get_client_ip()
ua = get_user_agent()
trace = get_trace_id()

3. 在 FastAPI 中集成

from fastapi import FastAPI, Request, Depends
from huace_aigc_auth_client import (
    set_current_user,
    set_request_context,
    get_current_user,
    clear_current_user,
    clear_request_context
)

app = FastAPI()

@app.middleware("http")
async def user_context_middleware(request: Request, call_next):
    # 从认证中获取用户信息
    user_info = request.state.user_info if hasattr(request.state, 'user_info') else None
    
    if user_info:
        # 设置用户上下文
        set_current_user({
            'user_id': user_info.id,
            'username': user_info.username,
            'app_id': getattr(user_info, 'app_id', None),
            'is_admin': user_info.is_admin
        })
        
        # 设置请求上下文
        set_request_context(
            ip_address=request.client.host,
            user_agent=request.headers.get('user-agent'),
            trace_id=request.headers.get('x-trace-id'),
            token=request.headers.get('authorization', '').replace('Bearer ', '')
        )
    
    try:
        response = await call_next(request)
        return response
    finally:
        clear_current_user()
        clear_request_context()

# 在任意地方使用
def my_service_function():
    user = get_current_user()
    if user:
        print(f"执行操作的用户: {user['username']}")

4. 装饰器中使用

from functools import wraps
from huace_aigc_auth_client import is_current_user_admin

def require_admin(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        if not is_current_user_admin():
            raise HTTPException(status_code=403, detail="需要管理员权限")
        return await func(*args, **kwargs)
    return wrapper

@require_admin
async def admin_endpoint():
    return {"message": "管理员操作"}

特性说明:

  • 使用 threading.local()contextvars.ContextVar 实现
  • 同时支持同步和异步环境
  • 线程安全和异步任务隔离
  • 无需层层传递参数

导出清单

from huace_aigc_auth_client import (
    # 核心类
    AigcAuthClient,
    require_auth,
    AuthMiddleware,
    UserInfo,
    TokenVerifyResult,
    AccessKeyVerifyResult,
    AigcAuthError,
    ServiceRegistrationResult,
    create_fastapi_auth_dependency,
    
    # 旧系统接入
    LegacySystemAdapter,
    LegacyUserData,
    SyncConfig,
    SyncDirection,
    PasswordMode,
    FieldMapping,
    UserSyncService,
    WebhookSender,
    SyncResult,
    create_sync_config,
    create_default_field_mappings,
    
    # Webhook 接收
    register_webhook_router,
    verify_webhook_signature,
    
    # API 统计收集
    ApiStatsCollector,
    init_api_stats_collector,
    get_api_stats_collector,
    stop_api_stats_collector,
    collect_api_stat,
    
    # 认证请求封装
    auth_request,
    async_auth_httpx_request,
    AsyncAuthClient,
    
    # 用户上下文管理
    set_current_user,
    get_current_user,
    get_current_user_id,
    get_current_username,
    get_current_app_id,
    get_current_app_code,
    is_current_user_admin,
    clear_current_user,
    set_request_context,
    get_request_context,
    get_client_ip,
    get_user_agent,
    get_request_id,
    get_trace_id,
    get_request_token,
    clear_request_context,
)

---5 (2026-01-15)

新增功能

  1. Webhook 接收功能
    • 新增 register_webhook_router() 函数,快速注册 webhook 接收路由
    • 自动处理签名验证、请求解析和错误处理
    • 支持自定义前缀、密钥环境变量和标签
    • 新增 verify_webhook_signature() 工具函数

使用示例

from fastapi import APIRouter
from huace_aigc_auth_client import register_webhook_router

api_router = APIRouter()

async def my_handler(data: dict) -> dict:
    event = data.get("event")
    # 处理逻辑...
    return {"status": "ok"}

# 注册 webhook 路由
register_webhook_router(api_router, my_handler)

API 变更日志

v1.1.50 (2026-06-02)

新增功能

  1. 服务注册中心(Service Registry)

    • 新增 register_service() —— 服务启动时注册,自动检测本机所有非回环 IP(ifconfig / ip addr 兜底),也支持手动指定
    • 新增 service_heartbeat() —— 上报心跳并 Pull 拉取最新全局配置(返回 {config, config_version}
    • 新增 unregister_service() —— 服务关闭时注销实例
    • 新增 start_service() 一站式 API:自动生成 instance_id、自动检测 IP、调用注册、启动后台心跳守护线程(默认 60s 间隔,可配 heartbeat_interval
    • 新增 stop_service() / stop_all_services() —— 停止心跳线程并注销实例
    • 新增 auto_register_from_env() —— 一键读取环境变量并完成注册 + 心跳
    • 新增 get_service_config(name, environment) —— 从心跳缓存中获取最新配置,无需额外 API 调用
    • 心跳响应可通过 on_config_change(config, version) 回调监听配置变更
    • 后端超 180 秒未心跳的实例会被标记为 offline
  2. 环境变量驱动 + 自动注册

    • AigcAuthClient.__init__ 新增 auto_register_service 参数
    • 当环境变量 AIGC_SERVICE_NAME 存在时,构造客户端即自动注册并启动心跳,业务代码无需任何额外调用
    • 所有注册参数(端口、IP、标签、类型、环境、心跳间隔等)均可通过环境变量配置,遵循"代码只有一份"原则
  3. 稳定的实例 ID(重启可复用)

    • 默认 instance_id = "{hostname}:{port}",进程重启后保持一致 → 复用已有实例记录,不会产生孤儿
    • 同主机多实例:通过不同端口自然区分
    • 容器/集群场景:可显式设置 AIGC_SERVICE_INSTANCE_ID 覆盖
  4. 新增导出

    • ServiceRegistrationResult 数据类(含 service_id / instance_id / config / config_version

服务唯一性模型

服务注册采用两层模型,平台编辑服务信息不会与 SDK 上报冲突:

层级 唯一键 谁可以改哪些字段
Service(逻辑服务) app_id + name + environment 平台可改:昵称、描述、类型、标签、配置、状态
ServiceInstance(进程实例) service_id + instance_id SDK 上报:IP、端口、心跳、metadata

SDK 注册是幂等的(get_or_create_service):服务已存在时不会覆盖平台已编辑的字段,仅创建/更新本实例自己的实例记录。

环境变量

# === 鉴权(必需)===
AIGC_AUTH_APP_ID=your_app_id
AIGC_AUTH_APP_SECRET=your_app_secret
AIGC_AUTH_BASE_URL=https://auth.example.com

# === 服务注册(设置 AIGC_SERVICE_NAME 即启用自动注册)===
AIGC_SERVICE_NAME=my-service          # 必需;不设则不自动注册
AIGC_SERVICE_TYPE=website             # website / plugin / mcp / other(默认 other)
AIGC_SERVICE_ENV=pro                  # test / pre / pro(默认 pro)
AIGC_SERVICE_PORT=8000                # 服务对外端口
AIGC_SERVICE_IPS=10.0.0.1,10.0.0.2    # 逗号分隔,不设则自动检测
AIGC_SERVICE_TAGS=core,user-facing    # 逗号分隔
AIGC_SERVICE_DESCRIPTION=用户中心主服务
AIGC_SERVICE_INSTANCE_ID=my-host:8000 # 不设则从 hostname:port 派生(推荐)
AIGC_SERVICE_HEARTBEAT_INTERVAL=60    # 默认 60 秒

使用示例

from huace_aigc_auth_client import AigcAuthClient

# ===== 方式 1:环境变量自动注册(推荐)=====
# 只要在环境变量里设置好 AIGC_SERVICE_NAME 等参数,下面这一行就完成了
# 注册 + 后台心跳 + 配置 Pull,业务代码完全无感
client = AigcAuthClient()

# 应用退出时(可选,进程退出也会被服务端心跳超时检测到)
client.stop_all_services()

# 获取服务最新配置(从心跳缓存读取,无需额外请求)
# 不传参数时自动从环境变量 AIGC_SERVICE_NAME / AIGC_SERVICE_ENV 匹配
config = client.get_service_config()
db_url = config.get("DATABASE_URL")
log_level = config.get("LOG_LEVEL", "INFO")

# 多服务场景下显式指定
config = client.get_service_config("my-service", "pro")


# ===== 方式 2:显式调用 start_service(仍可用)=====
client = AigcAuthClient(auto_register_service=False)  # 禁用构造期自动注册

result = client.start_service(
    name="my-service",
    service_type="website",
    environment="pro",
    port=8000,
    tags=["core", "user-facing"],
    description="用户中心主服务",
    heartbeat_interval=60,
    on_config_change=lambda cfg, ver: print(f"配置更新 v{ver}:", cfg),
)
print(f"已注册 service_id={result.service_id} instance_id={result.instance_id}")

client.stop_service(name="my-service", environment="pro")


# ===== 方式 3:手动模式(很少用)=====
# 自行调用 register_service + service_heartbeat + unregister_service

Docker / 容器场景

在容器内 get_local_ips() 拿到的可能是 172.x.x.x 容器内网 IP,外网无法访问。两种应对:

  1. 环境变量注入宿主机 IPAIGC_SERVICE_IPS=203.0.113.10
  2. 显式传入参数start_service(ips=["203.0.113.10"], ...)

容器/k8s 场景如需稳定的跨重启实例标识,建议设置 AIGC_SERVICE_INSTANCE_ID(如 Pod 名 + 端口)。

v1.1.49 (2026-05-25)

新增功能

  1. 获取当前应用用户列表
    • 新增 list_users(keyword, page, page_size) 方法
    • 返回当前应用的有效用户(含全局用户),支持按用户名/昵称模糊搜索和分页
    • 仅返回 status = "active" 且未删除的用户

使用示例

# 获取第一页用户
result = client.list_users()
print(f"总数: {result['total']}")
for user in result["items"]:
    print(f"{user['id']}: {user['username']} ({user['nickname']})")

# 搜索用户
result = client.list_users(keyword="张", page=1, page_size=10)

# 分页获取
result = client.list_users(page=2, page_size=50)

v1.1.47 (2026-05-18)

新增功能

  1. response_headers 上游 API 响应头采集

    • collect_api_stat() / collector.collect() 新增 response_headers 参数
    • 鉴权中心根据 X-Match-DynamicX-Match-CostX-Match-PathX-Match-Rule 响应头自动创建计费规则
    • auth_request / AuthSession / AsyncAuthClient 自动从上游 response 提取响应头
  2. X-Match-Username 用户标识

    • request_params.headers 支持传入 X-Match-Username,用于未登录用户的异步任务上报场景
    • auth_request / AuthSession 自动从 request_context.username 注入此 header
    • 或者通过 request_context 上下文环境 set username,也能动态注入用户信息
  3. 自动创建 dynamic 计费规则

    • X-Match-Dynamic=1 且无匹配规则时,自动创建动态计费规则
    • 费用优先级:cost 字段 > X-Match-Cost 响应头

使用示例

手动示例: PS: 前提是全局开启了上报,会创建全局采集器,否则需要自行创建。

# 手动初始化全局采集器,如果初始化网关默认采集器构建,关闭才需要手动创建
try:
    from ..api_stats_collector import init_api_stats_collector, get_api_stats_collector
    
    # 检查是否已经初始化
    if get_api_stats_collector() is not None:
        return
    
    init_api_stats_collector(
        api_url=f"{self.client.base_url}/sdk",
        app_id=self.client.app_id,
        app_secret=self.client.app_secret,
        batch_size=10,
        flush_interval=5.0,
        enabled=True
    )
except Exception as e:
    logger.warning(f"初始化统计收集器失败: {e}")

# 方式一:手动上报,直接传入 response_headers
collect_api_stat(
    api_path='/api/chat/completions',
    api_method='POST',
    status_code=200,
    response_time=1.2,
    token=token,
    response_headers={
        'X-Match-Dynamic': '1',
        'X-Match-Cost': '0.0035',
        'X-Match-Path': '/api/chat/completions',
        'X-Match-Rule': 'GPT-4o 对话'
    }
)

# 方式二:通过 auth_request 自动采集(推荐)
response = auth_request('POST', 'https://api.openai.com/v1/chat/completions', json={...})
# response.headers 会被自动提取并上报

自动示例:

使用 AOP 全局拦截,注入 response header 和 request header 相关自定义动态匹配规则的参数。
注入和识别方式都是完全个性化的,自行处理。

v1.1.42 (2026-04-28)

新增功能

  1. 批量获取用户信息(按用户名)

    • 新增 batch_get_users(usernames) 方法
    • 通过用户名列表批量查询用户,返回属于当前应用或全局用户(app_idnull)的用户信息
    • 未找到的用户名会汇总到 not_found 列表
  2. 批量获取用户信息(按用户ID)

    • 新增 batch_get_users_by_ids(user_ids) 方法
    • 通过用户 ID 列表批量查询用户,按请求顺序返回结果
    • 未找到的用户返回 "未知用户" 占位信息

使用示例

# 按用户名批量查询
result = client.batch_get_users(["zhangsan", "lisi", "wangwu"])
for user in result["users"]:
    print(f"{user['username']}: {user['nickname']}")
print(f"未找到: {result['not_found']}")

# 按用户ID批量查询
result = client.batch_get_users_by_ids([1, 2, 3, 999])
for user in result["users"]:
    if user["username"] == "未知用户":
        print(f"用户 {user['id']} 不存在")
    else:
        print(f"{user['id']}: {user['username']}")

v1.1.0 (2026-01-13)

新增功能

  1. 支持直接同步已加密密码

    • SyncConfig 新增 password_is_hashed 参数
    • SyncUserRequest 新增 password_hashed 字段
    • 当两个系统使用相同的密码加密方式(bcrypt)时,可直接同步已加密密码
  2. _request 方法支持自定义 headers

    • sync_user_to_auth(user_data, headers=None) 支持传入自定义请求头
    • batch_sync_users_to_auth(users, headers=None) 支持传入自定义请求头
  3. get_password_for_sync 返回格式变更

    • 旧版:返回 str(密码字符串)
    • 新版:返回 tuple[str, bool](密码字符串, 是否已加密)

使用示例

# 直接同步已加密密码
sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=lambda user: user.get("hashed_password"),
    password_is_hashed=True,  # 新增参数
)

# SDK 调用
client.sync_user_to_auth({
    "username": "test",
    "password_hashed": "$2b$12$xxxxx...",  # 直接传递 bcrypt 哈希值
    "email": "test@example.com"
})

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

huace_aigc_auth_client-1.1.53.tar.gz (73.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

huace_aigc_auth_client-1.1.53-py3-none-any.whl (56.0 kB view details)

Uploaded Python 3

File details

Details for the file huace_aigc_auth_client-1.1.53.tar.gz.

File metadata

  • Download URL: huace_aigc_auth_client-1.1.53.tar.gz
  • Upload date:
  • Size: 73.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.2

File hashes

Hashes for huace_aigc_auth_client-1.1.53.tar.gz
Algorithm Hash digest
SHA256 29a9befd74c4ce219507fb5cb32da351ed6128a9a059a48b3049eaf7c25aa46d
MD5 c6961501dfcbd8ab2e045f6b70573b4d
BLAKE2b-256 1601b4246f389395c66837542fffd6a2a606397000246eab28a7121d5129e93d

See more details on using hashes here.

File details

Details for the file huace_aigc_auth_client-1.1.53-py3-none-any.whl.

File metadata

File hashes

Hashes for huace_aigc_auth_client-1.1.53-py3-none-any.whl
Algorithm Hash digest
SHA256 0a4e38dc61c5f2b07ed3a62ecfbed06e3742841edee83e49cf47597613c9e8b2
MD5 f6ef3d2c0dfaf271271ba8936021e590
BLAKE2b-256 35d5826a418f9b88b88756c4bd367e0d8ed2003575a182bc24c22d4e0fec1618

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page