Skip to main content

华策AIGC Auth Client - 提供 Token 验证、用户信息获取、权限检查、旧系统接入等功能

Project description

AIGC Auth Python SDK

PyPI version Python Version

Python 后端服务接入华策 AIGC 鉴权中心的 SDK 工具包。

安装

pip install huace-aigc-auth-client

环境变量配置

在项目根目录创建 .env 文件:

# 必填:应用 ID 和密钥(在鉴权中心创建应用后获取)
AIGC_AUTH_APP_ID=your_app_id
AIGC_AUTH_APP_SECRET=your_app_secret
# 鉴权服务地址(默认为生产环境)
AIGC_AUTH_BASE_URL=your-auth-api-url-prefix

如需通过 Nginx 代理:

location /aigc-auth/ {
    proxy_pass https://aigc-auth.huacemedia.com/aigc-auth/;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
}

快速开始

1. 初始化客户端

from huace_aigc_auth_client import AigcAuthClient

# 方式一:从环境变量读取配置
client = AigcAuthClient()

# 方式二:直接传入参数
client = AigcAuthClient(
    app_id="your_app_id",
    app_secret="your_app_secret",
    base_url="your-auth-api-url-prefix"
)

2. 验证 Token

result = client.verify_token(token)

if result.valid:
    print(f"用户 ID: {result.user_id}")
    print(f"用户名: {result.username}")
    print(f"过期时间: {result.expires_at}")
else:
    print("Token 无效")

3. 获取用户信息

from huace_aigc_auth_client import AigcAuthClient, AigcAuthError

client = AigcAuthClient()

try:
    user = client.get_user_info(token)
    
    print(f"用户名: {user.username}")
    print(f"昵称: {user.nickname}")
    print(f"邮箱: {user.email}")
    print(f"角色: {user.roles}")
    print(f"权限: {user.permissions}")
    
    # 检查角色(在 Auth 里面配置)
    if user.has_role("admin") or user.is_admin:
        print("是管理员")
    
    # 检查权限(在 Auth 里面配置)
    if user.has_permission("user:write"):
        print("有用户写权限")
        
except AigcAuthError as e:
    print(f"错误: {e.message}")

4. 批量检查权限

results = client.check_permissions(token, ["user:read", "user:write", "admin:access"])

for permission, has_permission in results.items():
    print(f"{permission}: {'✓' if has_permission else '✗'}")

FastAPI 集成

方式一:使用中间件(推荐)

from fastapi import FastAPI, Request, HTTPException
from huace_aigc_auth_client import AigcAuthClient, AuthMiddleware

app = FastAPI()

# 初始化客户端和中间件
client = AigcAuthClient()
auth_middleware = AuthMiddleware(
    client,
    exclude_paths=["/health", "/docs", "/openapi.json"],
    exclude_prefixes=["/public/"]
)

# 注册中间件
@app.middleware("http")
async def auth(request: Request, call_next):
    return await auth_middleware.fastapi_middleware(request, call_next)

# 在路由中获取用户信息
@app.get("/me")
async def get_current_user(request: Request):
    user = request.state.user_info
    return {
        "username": user.username,
        "roles": user.roles
    }

@app.get("/admin")
async def admin_only(request: Request):
    user = request.state.user_info
    if not user.has_role("admin"):
        raise HTTPException(status_code=403, detail="需要管理员权限")
    return {"message": "欢迎管理员"}

方式二:使用依赖注入

from fastapi import FastAPI, Depends, HTTPException
from huace_aigc_auth_client import AigcAuthClient, create_fastapi_auth_dependency, UserInfo

app = FastAPI()
client = AigcAuthClient()

# 创建认证依赖
get_current_user = create_fastapi_auth_dependency(client)

@app.get("/me")
async def get_me(user: UserInfo = Depends(get_current_user)):
    return {"username": user.username}

# 创建权限检查依赖
def require_permission(permission: str):
    async def check(user: UserInfo = Depends(get_current_user)):
        if not user.has_permission(permission):
            raise HTTPException(status_code=403, detail="权限不足")
        return user
    return check

@app.get("/users")
async def list_users(user: UserInfo = Depends(require_permission("user:read"))):
    return {"users": [...]}

方式三:使用装饰器

from fastapi import FastAPI, Request
from huace_aigc_auth_client import AigcAuthClient, require_auth, UserInfo

app = FastAPI()
client = AigcAuthClient()

@app.get("/protected")
@require_auth(client)
async def protected_route(request: Request, user_info: UserInfo):
    return {"user": user_info.username}

@app.get("/admin")
@require_auth(client, permissions=["admin:access"])
async def admin_route(request: Request, user_info: UserInfo):
    return {"admin": True}

# 只需要任意一个权限
@app.get("/editor")
@require_auth(client, permissions=["article:write", "article:edit"], any_permission=True)
async def editor_route(request: Request, user_info: UserInfo):
    return {"editor": True}

Flask 集成

from flask import Flask, g, jsonify
from functools import wraps
from huace_aigc_auth_client import AigcAuthClient, AuthMiddleware

app = Flask(__name__)

# 初始化客户端和中间件
client = AigcAuthClient()
auth_middleware = AuthMiddleware(
    client,
    exclude_paths=["/health", "/login"],
    exclude_prefixes=["/public/"]
)

# 注册 before_request
@app.before_request
def before_request():
    return auth_middleware.flask_before_request()

# 在路由中获取用户信息
@app.route("/me")
def get_me():
    user = g.user_info
    return jsonify({
        "username": user.username,
        "roles": user.roles
    })

# 权限检查装饰器
def require_permission(permission):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user = g.user_info
            if not user.has_permission(permission):
                return jsonify({"error": "权限不足"}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route("/admin")
@require_permission("admin:access")
def admin_only():
    return jsonify({"message": "欢迎管理员"})

上下文管理

SDK 提供了上下文管理功能,可以在任意位置(Service、Utility 等)获取当前请求的用户信息,无需层层传递参数。 支持 Flask 和 FastAPI,兼容同步和异步环境。

注意:使用此功能前必须先注册 AuthMiddleware

from huace_aigc_auth_client import get_current_user

def do_something_logic():
    # 获取当前用户信息(返回字典,非 UserInfo 对象)
    user = get_current_user()
    
    if user:
        print(f"当前操作用户ID: {user['id']}")
        print(f"当前操作用户名: {user['username']}")
        
        # 也可以获取 app_id 等信息(如果未过滤)
        if 'app_id' in user:
            print(f"应用ID: {user['app_id']}")
    else:
        print("无用户上下文")

API 参考

UserInfo 对象

属性 类型 说明
id int 用户 ID
username str 用户名
nickname str 昵称
email str 邮箱
phone str 手机号
avatar str 头像 URL
roles List[str] 角色代码列表
permissions List[str] 权限代码列表
department str 部门
company str 公司
方法 说明
has_role(role) 检查是否拥有指定角色
has_permission(permission) 检查是否拥有指定权限
has_any_permission(permissions) 检查是否拥有任意一个权限
has_all_permissions(permissions) 检查是否拥有所有权限

异常处理

from huace_aigc_auth_client import AigcAuthClient, AigcAuthError

client = AigcAuthClient()

try:
    user = client.get_user_info(token)
except AigcAuthError as e:
    print(f"错误码: {e.code}")
    print(f"错误信息: {e.message}")
错误码 说明
401 未授权(Token 无效或已过期)
403 禁止访问(用户被禁用或权限不足)
404 用户不存在
-1 网络请求失败

旧系统接入(用户同步)

如果你的系统已有用户表,可通过 SDK 提供的「旧系统适配器」实现低成本接入,无需修改历史代码和表结构

接入原理

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   aigc-auth     │────▶│   SDK 同步层     │────▶│   旧系统         │
│   (鉴权中心)     │     │   (字段映射)     │     │   (用户表)       │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │                       │                       │
        │   1. 初始化同步        │                       │
        │◀──────────────────────────────────────────────│
        │   (批量同步旧用户到 auth)                       │
        │                       │                       │
        │   2. 增量同步          │                       │
        │──────────────────────▶│──────────────────────▶│
        │   (auth 新用户自动同步到旧系统)                  │
        │                       │                       │
        │   3. Webhook 推送      │                       │
        │──────────────────────────────────────────────▶│
        │   (用户变更主动通知)                            │

快速开始

1. 配置环境变量

# .env 文件
AIGC_AUTH_APP_ID=your_app_id
AIGC_AUTH_APP_SECRET=your_app_secret
AIGC_AUTH_BASE_URL=your-auth-api-url-prefix

# 同步配置
AIGC_AUTH_SYNC_ENABLED=true
AIGC_AUTH_SYNC_PASSWORD=通用密码
AIGC_AUTH_WEBHOOK_URL=https://your-domain.com/api/v1/webhook/auth
AIGC_AUTH_WEBHOOK_SECRET=your_secret

2. 创建字段映射

from huace_aigc_auth_client import (
    FieldMapping,
    SyncConfig,
    PasswordMode,
    create_sync_config
)

# 定义字段映射
field_mappings = [
    FieldMapping(
        auth_field="username",
        legacy_field="username",
        required=True
    ),
    FieldMapping(
        auth_field="email",
        legacy_field="email"
    ),
    FieldMapping(
        auth_field="nickname",
        legacy_field="nickname"
    ),
    # 状态字段转换:auth 的 status 映射到旧系统的 is_active
    FieldMapping(
        auth_field="status",
        legacy_field="is_active",
        transform_to_legacy=lambda s: s == "active",
        transform_to_auth=lambda a: "active" if a else "disabled"
    ),
    # 角色字段转换:auth 的 roles 列表映射到旧系统的 role 字符串
    FieldMapping(
        auth_field="roles",
        legacy_field="role",
        transform_to_legacy=lambda r: r[0] if r else "viewer",
        transform_to_auth=lambda r: [r] if r else ["viewer"]
    ),
]

# 创建同步配置
sync_config = create_sync_config(
    field_mappings=field_mappings,
    password_mode=PasswordMode.UNIFIED,
    unified_password="通用密码",
    webhook_url="https://your-domain.com/api/v1/webhook/auth",
)

3. 实现旧系统适配器

from huace_aigc_auth_client import LegacySystemAdapter, LegacyUserData

class MyLegacyAdapter(LegacySystemAdapter):
    """实现与旧系统用户表的交互"""
    
    def __init__(self, db, sync_config, auth_client=None):
        super().__init__(sync_config, auth_client)
        self.db = db
    
    def get_user_by_unique_field(self, username: str):
        """通过用户名获取旧系统用户"""
        user = self.db.query(User).filter(User.username == username).first()
        if not user:
            return None
        return LegacyUserData({
            "id": user.id,
            "username": user.username,
            "email": user.email,
            # ... 其他字段
        })

    async def get_user_by_username_async(self, username: str) -> Optional[LegacyUserData]:
        """异步通过用户名获取旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if not user:
            return None
        return LegacyUserData({
            "id": user.id,
            "username": user.username,
            "email": user.email,
            # ... 其他字段
        })
    
    async def _create_user_async(self, user_data: Dict[str, Any]) -> Optional[Any]:
        """在旧系统创建用户"""
        user = User(
            username=user_data["username"],
            email=user_data.get("email"),
            hashed_password=hash_password(user_data["password"]),
            # ... 其他字段
        )
        self.db.add(user)
        await self.db.commit()
        return user.id
    
    async def _update_user_async(self, username: str, user_data: Dict[str, Any]) -> bool:
        """异步更新旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if user:
            for key, value in user_data.items():
                setattr(user, key, value)
            await self.db.commit()
            return True
        return False

    async def _delete_user_async(self, username: str) -> bool:
        """异步删除旧系统用户"""
        user = await self.db.execute(
            select(User).where(User.username == username)
        )
        user = user.scalars().first()
        if user:
            await self.db.delete(user) 
            # 或者软删除:user.is_active = False
            await self.db.commit()
            return True
        return False
    
    async def get_all_users_async(self) -> List[LegacyUserData]:
        """异步获取所有用户(用于初始化同步)"""
        result = await self.db.execute(select(User))
        users = result.scalars().all()
        return [LegacyUserData({"username": u.username, ...}) for u in users]

4. 集成到登录流程

from huace_aigc_auth_client import AigcAuthClient, UserSyncService

client = AigcAuthClient()
adapter = MyLegacyAdapter(db, sync_config, client)
sync_service = UserSyncService(client, adapter)

# 在获取用户信息后调用同步
@app.middleware("http")
async def auth_middleware(request, call_next):
    response = await auth.fastapi_middleware(request, call_next)
    
    # 登录成功后,同步用户到旧系统
    if hasattr(request.state, "user_info"):
        user_info = request.state.user_info
        await sync_service.sync_on_login_async(user_info)
    
    return response

5. 添加 Webhook 接收端点

推荐方式:使用 SDK 提供的通用 Webhook 路由

from fastapi import APIRouter
from huace_aigc_auth_client import register_webhook_router

api_router = APIRouter()
client = AigcAuthClient()
adapter = MyLegacyAdapter(db, sync_config, client)

# 定义 webhook 处理函数
async def handle_auth_webhook(request_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    独立的 webhook 处理函数(用于 SDK 集成)
    
    这个函数不需要 db 参数,会在内部创建数据库会话。
    适用于通过 SDK 的 register_webhook_router 注册。
    """
    from app.db.session import get_db
    
    async for db in get_db():
        try:
            event = request_data.get("event")
            data = request_data.get("data", {})
            logger.info(f"Received webhook event: {event} with data: {data}")
            
            # 调用适配器的 handle_webhook 方法
            return await adapter.handle_webhook(event, data)
        except Exception as e:
            logger.exception(f"Failed to handle webhook: {e}")
            raise

# 注册 webhook 路由(自动处理签名验证)
register_webhook_router(
    api_router,
    handler=handle_auth_webhook,
    prefix="/webhook",  # 可选,默认 "/webhook"
    secret_env_key="aigc-auth-webhook-secret",  # 可选,在 auth 后台配置
    tags=["Webhook"]  # 可选,默认 ["Webhook"]
)

# webhook 端点将自动创建在: /webhook/auth

Webhook 签名验证说明

SDK 的 register_webhook_router 会自动处理签名验证:

  • 从请求头 X-Webhook-Signature 获取签名
  • 从环境变量读取密钥(默认 AIGC_AUTH_WEBHOOK_SECRET
  • 使用 HMAC-SHA256 算法验证签名
  • 签名不匹配时返回 401 错误

6. 执行初始化同步

# 一次性脚本:将旧系统用户同步到 aigc-auth
async def init_sync():
    adapter = MyLegacyAdapter(db, sync_config, client)
    users = adapter.get_all_users()
    
    for user in users:
        auth_data = adapter.transform_legacy_to_auth(user)
        
        # 获取密码(支持元组返回格式)
        password_result = adapter.get_password_for_sync(user)
        if isinstance(password_result, tuple):
            password, is_hashed = password_result
        else:
            password, is_hashed = password_result, False
        
        # 根据是否已加密选择不同的字段
        if is_hashed:
            auth_data["password_hashed"] = password  # 直接传递已加密密码
        else:
            auth_data["password"] = password  # 传递明文密码,服务端会加密
        
        client.sync_user_to_auth(auth_data)
    
    print(f"同步完成:{len(users)} 个用户")

# 运行
asyncio.run(init_sync())

密码处理策略

模式 说明 适用场景
UNIFIED 统一初始密码 新系统接入,安全性高
CUSTOM_MAPPING 自定义映射函数 需要保留原密码时
CUSTOM_MAPPING + password_is_hashed=True 直接同步已加密密码 推荐,两系统使用相同加密方式 (bcrypt)
from huace_aigc_auth_client import PasswordMode, create_sync_config

# 统一密码模式
sync_config = create_sync_config(
    password_mode=PasswordMode.UNIFIED,
    unified_password="Abc@123456"
)

# 自定义映射模式(返回明文密码)
def password_mapper(user_data):
    return user_data.get("password", "default")

sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=password_mapper
)

# 直接同步已加密密码(推荐,适用于两系统使用相同的 bcrypt 加密)
def hashed_password_mapper(user_data):
    return user_data.get("hashed_password", "")

sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=hashed_password_mapper,
    password_is_hashed=True  # 标记返回的是已加密密码
)

注意:当 password_is_hashed=True 时,SDK 会使用 password_hashed 字段传递到服务端, 服务端会直接使用该哈希值而不再重新加密。

同步方向配置

from huace_aigc_auth_client import SyncDirection, create_sync_config

# 仅从 auth 同步到旧系统(默认)
sync_config = create_sync_config(
    direction=SyncDirection.AUTH_TO_LEGACY
)

# 仅从旧系统同步到 auth(初始化用)
sync_config = create_sync_config(
    direction=SyncDirection.LEGACY_TO_AUTH
)

# 双向同步(需谨慎使用)
sync_config = create_sync_config(
    direction=SyncDirection.BIDIRECTIONAL
)

导出清单

from huace_aigc_auth_client import (
    # 核心类
    AigcAuthClient,
    require_auth,
    AuthMiddleware,
    UserInfo,
    TokenVerifyResult,
    AigcAuthError,
    create_fastapi_auth_dependency,
    
    # 旧系统接入
    LegacySystemAdapter,
    LegacyUserData,
    SyncConfig,
    SyncDirection,
    PasswordMode,
    FieldMapping,
    UserSyncService,
    WebhookSender,
    SyncResult,
    create_sync_config,
    create_default_field_mappings,
    
    # Webhook 接收
    register_webhook_router,
    verify_webhook_signature,
)

---5 (2026-01-15)

新增功能

  1. Webhook 接收功能
    • 新增 register_webhook_router() 函数,快速注册 webhook 接收路由
    • 自动处理签名验证、请求解析和错误处理
    • 支持自定义前缀、密钥环境变量和标签
    • 新增 verify_webhook_signature() 工具函数

使用示例

from fastapi import APIRouter
from huace_aigc_auth_client import register_webhook_router

api_router = APIRouter()

async def my_handler(data: dict) -> dict:
    event = data.get("event")
    # 处理逻辑...
    return {"status": "ok"}

# 注册 webhook 路由
register_webhook_router(api_router, my_handler)

v1.1.

API 变更日志

v1.1.0 (2026-01-13)

新增功能

  1. 支持直接同步已加密密码

    • SyncConfig 新增 password_is_hashed 参数
    • SyncUserRequest 新增 password_hashed 字段
    • 当两个系统使用相同的密码加密方式(bcrypt)时,可直接同步已加密密码
  2. _request 方法支持自定义 headers

    • sync_user_to_auth(user_data, headers=None) 支持传入自定义请求头
    • batch_sync_users_to_auth(users, headers=None) 支持传入自定义请求头
  3. get_password_for_sync 返回格式变更

    • 旧版:返回 str(密码字符串)
    • 新版:返回 tuple[str, bool](密码字符串, 是否已加密)

使用示例

# 直接同步已加密密码
sync_config = create_sync_config(
    password_mode=PasswordMode.CUSTOM_MAPPING,
    password_mapper=lambda user: user.get("hashed_password"),
    password_is_hashed=True,  # 新增参数
)

# SDK 调用
client.sync_user_to_auth({
    "username": "test",
    "password_hashed": "$2b$12$xxxxx...",  # 直接传递 bcrypt 哈希值
    "email": "test@example.com"
})

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

huace_aigc_auth_client-1.1.30.tar.gz (39.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

huace_aigc_auth_client-1.1.30-py3-none-any.whl (38.3 kB view details)

Uploaded Python 3

File details

Details for the file huace_aigc_auth_client-1.1.30.tar.gz.

File metadata

  • Download URL: huace_aigc_auth_client-1.1.30.tar.gz
  • Upload date:
  • Size: 39.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.7

File hashes

Hashes for huace_aigc_auth_client-1.1.30.tar.gz
Algorithm Hash digest
SHA256 af4bb3961471a1055866b89fefd21de37b1d6fbfe6816c009d1f9b7b1ecce9ce
MD5 9f5c58322251e357fbd8183bcf9472a6
BLAKE2b-256 cd88c76acd7d0da8c6436ca239351766c613ad61ec5edb9083d5a0335df1d147

See more details on using hashes here.

File details

Details for the file huace_aigc_auth_client-1.1.30-py3-none-any.whl.

File metadata

File hashes

Hashes for huace_aigc_auth_client-1.1.30-py3-none-any.whl
Algorithm Hash digest
SHA256 ae6bbfbf931888f6a32d203f844539231271e3daabdb099d711efeea67391ab3
MD5 1486578463b433595e853dee90258bd6
BLAKE2b-256 04ec78f1786114abe892f55533fc0dd5965a35d1e719dd76c1c688d9c246c04e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page