Skip to main content

一个python工具包,包括自动配置系统,常用工具包,控制台输出系统,日志系统,命名系统,缓存系统,返回值封装类

Project description

PyNomad

重要说明:本框架大部分代码由 AI 生成,README.md 文档完全由 AI 生成,可能存在部分说明错误。

具体使用方式可以参见 pynomad 项目的测试用例,仓库地址:https://gitee.com/nomadicooer/pynomad.git

也可以参见 findata 项目实际项目使用示例:https://gitee.com/nomadicooer/findata.git

关于配置 Schema

自动配置生成了部分 schema,可以在 settings.toml 文件中引用:

#:schema https://gitee.com/nomadicooer/schema/raw/master/settings.schema.json

注意:schema 文件对 logging 配置支持不完善,使用 #:schema url 方式引用时可能遇到问题。建议安装 Even Better TOML 插件以获得更好的支持。

本项目开发主要使用 VSCode,由于个人时间原因,代码更新频率可能较低,可能存在一些 bug。推荐的使用方式是下载源码,然后直接引用到项目中,如遇到 bug 请根据实际需求自行修改。

项目管理方式

项目使用 poetry 管理项目。

  • poetry 安装项目的方式

    poetry add pynomadic
    
  • poetry 引用本地项目的方式

    [tool.poetry]
    packages = [{ include = "pynomad", from = "src" }]
    

    必要时在 dependencies 中也应该引用,避免首次引用安装不了:

    dependencies = [
        "pynomadic @ file:///${PROJECT_ROOT}/../pynomad",
        "cryptography (>=46.0.5,<47.0.0)",
    ]
    

    其中 ${PROJECT_ROOT} 需要替换为你的项目根目录路径。


Python 通用工具库,提供常用工具函数、装饰器和类型定义。内置智能配置管理多级缓存系统,简化应用开发流程。

安装

pip install pynomadic

快速开始

from pynomad import dt, environ, get_logger, Result, memcached

# 日期时间工具
dt.parse_date("2024-01-15")  # date(2024, 1, 15)

# 环境信息
environ.project_dir  # 项目根目录

# 日志
logger = get_logger(__name__)
logger.info("Hello, PyNomad!")

# 缓存装饰器
@memcached
def expensive_func(x: int) -> int:
    return x * x

# 结果处理
result = Result.success(data="OK")
logger.info(result.data)  # "OK"

目录


Common - 通用工具

DateTimeUtil - 日期时间工具

智能日期时间解析工具,支持多种格式自动识别。

from pynomad import dt

# 智能识别日期格式
dt.parse_date("2024-01-15")      # date(2024, 1, 15)
dt.parse_date("2024/01/15")      # date(2024, 1, 15)
dt.parse_date("2024年01月15日")   # date(2024, 1, 15)
dt.parse_date("20240115")        # date(2024, 1, 15)

# 指定格式解析
dt.parse_date("01/15/2024", "%m/%d/%Y")

# 智能识别日期时间
dt.parse_datetime("2024-01-15 10:30:45")
dt.parse_datetime("2024/01/15 10:30:45")

# 格式化输出
dt.format_date(date(2024, 1, 15), "%Y-%m-%d")     # "2024-01-15"
dt.format_datetime(datetime.now(), "%Y-%m-%d %H:%M:%S")

# 年份边界
dt.year_start("2024-06-15")       # date(2024, 1, 1)
dt.year_end("2024-06-15")         # date(2024, 12, 31)

# 时间戳转换
dt.datetime_to_timestamp(datetime(2024, 1, 15, 10, 30, 45))  # 1705297845.0
dt.timestamp_to_datetime(1705297845)  # datetime(2024, 1, 15, 10, 30, 45)

# 获取当前时间
dt.today()

Environment - 环境信息

获取项目环境信息的单例类。

from pynomad import environ, get_logger

logger = get_logger(__name__)

# 获取项目根目录
project_dir = environ.project_dir
logger.info(project_dir)  # Path 对象,项目绝对路径

# 获取项目名称
project_name = environ.project_name
logger.info(project_name)  # 项目名称

# 获取平台信息
environ.is_windows   # 是否 Windows
environ.is_linux     # 是否 Linux
environ.is_macos     # 是否 macOS

# 获取机器码(用于设备唯一标识)
machine_code = environ.machine_code      # hex 格式,64字符
machine_code_b64 = environ.machine_code_base64  # base64 格式
logger.info(machine_code)  # "a1b2c3d4..."
logger.info(machine_code_b64)  # "abc123..."

EnhancedDict - 增强字典

支持属性访问和敏感信息脱敏的字典类。

from pynomad import EnhancedDict, get_logger

logger = get_logger(__name__)

# 创建增强字典
ed = EnhancedDict({"name": "Alice", "age": 30, "address": {"city": "Beijing"}})

# 属性访问
logger.info(ed.name)       # "Alice"
logger.info(ed.address.city)  # "Beijing"

# 嵌套访问
logger.info(ed.get_nested("address.city"))  # "Beijing"

# 敏感信息脱敏
ed["password"] = "123456"
ed["phone"] = "13800138000"
logger.info(ed.mask_sensitive())  # 脱敏后的字典

# 类型检查
ed.check_type("age", int)  # True
ed.check_type("name", str)  # True

Exceptions - 异常类

统一的异常定义,与 Result 的错误分类对应。

from pynomad import ex, get_logger

logger = get_logger(__name__)

# 客户端异常
try:
    raise ex.ClientException("用户输入的参数无效")
except ex.ClientException as e:
    logger.info(str(e))  # "用户输入的参数无效"

# 验证异常(继承自 ClientException)
try:
    raise ex.ValidationException("必填字段 name 不能为空")
except ex.ValidationException as e:
    logger.info(str(e))

# 未找到异常(继承自 ClientException)
try:
    raise ex.NotFoundException("用户 ID 12345 不存在")
except ex.ClientException as e:  # 可以用父类捕获
    logger.info(str(e))

# 冲突异常
raise ex.ConflictException("邮箱 alice@example.com 已被注册")

# 权限异常
raise ex.PermissionException("您没有权限访问此资源")

# 网络异常
raise ex.NetworkException("无法连接到服务器")

# 请求异常
raise ex.RequestException("HTTP 请求失败,状态码: 500")

# 登录异常(继承自 RequestException)
raise ex.LoginException("用户名或密码错误")

# 超时异常(继承自 NetworkException)
raise ex.TimeoutException("操作超时,等待时间超过 30 秒")

# 限流异常(继承自 NetworkException)
raise ex.RateLimitException("请求过于频繁,请稍后重试")

# 服务端异常
raise ex.ServerException("服务器内部错误")

# 第三方服务异常
raise ex.ThirdPartyException("微信 API 调用失败")

# 缓存相关异常
raise ex.CacheMissException("缓存键 'user:123' 不存在")
raise ex.CacheExpiredException("缓存项 'user:123' 已过期")
raise ex.CacheValueException("缓存值格式不正确,预期为 JSON 字符串")
raise ex.CacheTypeNotInitializedException("缓存类型无法推导,请先设置一个值")
raise ex.CacheException("序列化失败: 无法序列化对象")

# 缓存刷新需求异常(支持携带新参数重新调用)
raise ex.NeedsRefreshException(
    "部分数据缺失",
    new_args=("2024-01-01", "2024-12-31"),
    new_kwargs={"force_refresh": True}
)

异常继承层次:

  • ClientExceptionValidationException, NotFoundException, ConflictException, PermissionException, CacheMissException, CacheExpiredException, CacheValueException, CacheTypeNotInitializedException
  • NetworkExceptionTimeoutException, RateLimitException
  • RequestExceptionLoginException
  • ServerExceptionCacheException

Decorator - 装饰器

常用装饰器集合。

from pynomad import singleton, retry, thread_safe, get_logger

logger = get_logger(__name__)

# 单例装饰器
@singleton
class DatabaseConnection:
    def __init__(self):
        logger.info("初始化数据库连接")

db1 = DatabaseConnection()
db2 = DatabaseConnection()
logger.info(db1 is db2)  # True

# 重试装饰器
@retry(max_attempts=3, delay=1.0)
def fetch_data():
    # 可能失败的操作
    pass

# 线程安全装饰器
@thread_safe
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

Network - 网络工具

网络连接检查工具。

from pynomad import check_network, stop_check_network, get_logger

logger = get_logger(__name__)

# 检查网络连接
result = check_network(host="www.google.com", port=80)
logger.info(result)  # True 或 False

# 停止后台网络检查线程
stop_check_network()

Types - 类型定义

通用类型常量。

from pynomad.common.types import Level, OutputTarget

# 日志级别
Level.TRACE    # 追踪
Level.DEBUG    # 调试
Level.INFO     # 信息
Level.WARN     # 警告
Level.ERROR    # 错误
Level.FATAL    # 致命

# 输出目标
OutputTarget.CONSOLE
OutputTarget.FILE
OutputTarget.BOTH

Naming - 命名规范

命名检查、风格检测和转换工具。

from pynomad import naming, get_logger

logger = get_logger(__name__)

# === 标识符和关键字检查 ===

# Python 标识符检查
naming.is_py_identifier("my_var")      # True
naming.is_py_identifier("123invalid")  # False

# 通用标识符检查
naming.is_identifier("valid-name")    # True

# Python 关键字检查
naming.is_py_keyword("class")        # True
naming.is_py_keyword("myvar")         # False

# 软关键字检查
naming.is_pysoftkeyword("match")     # True

# 其他语言关键字
naming.is_java_keyword("public")     # True
naming.is_ts_keyword("interface")    # True

# 通用关键字检查(所有语言)
naming.is_keyword("class")           # True
naming.is_keyword("myvar")           # False

# === 命名风格检测 ===

# 基本风格检测
naming.is_pascal("MyClass")         # True
naming.is_camel("myVariable")       # True
naming.is_snake("my_variable")      # True
naming.is_kebab("my-variable")      # True

# 短横线风格变体
naming.is_standard_kebab("my-variable")   # True
naming.is_upper_kebab("MY-VARIABLE")       # True
naming.is_pascal_kebab("My-Variable")      # True

# 蛇形风格变体
naming.is_standard_snake("my_variable")    # True
naming.is_upper_snake("MY_VARIABLE")       # True
naming.is_pascal_snake("My_Variable")      # True
naming.is_magic_snake("__magic__")         # True
naming.is_protected_snake("_protected")    # True
naming.is_private_snake("__private")      # True
naming.is_end_snake("end_")                # True

# 检测风格
naming.detect_style("MyClass")      # NameStyle.PASCAL
naming.detect_style("my_variable") # NameStyle.STANDARD_SNAKE
naming.detect_style("INVALID@NAME") # NameStyle.INVALID

# 检测可能的风格(返回多个匹配)
naming.detect_possible_styles("save")  # [NameStyle.STANDARD_SNAKE, NameStyle.STANDARD_KEBAB, ...]

# 无效命名检查
naming.is_invalid("INVALID@NAME")  # True

# === 命名转换 ===

# 转为帕斯卡命名
naming.pascalize("my_variable")    # "MyVariable"
naming.pascalize("my-variable")    # "MyVariable"
naming.pascalize("myVariable")     # "MyVariable"

# 转为驼峰命名
naming.camelize("MyClass")         # "myClass"
naming.camelize("my_variable")     # "myVariable"

# 转为短横线命名
naming.kebabize("MyClass")                     # "my-class"
naming.kebabize("my_variable")                 # "my-variable"
naming.kebabize("my_variable", to_style=naming.NameStyle.UPPER_KEBAB)  # "MY-VARIABLE"

# 转为蛇形命名
naming.snakeize("MyClass")                     # "my_class"
naming.snakeize("my-variable")                 # "my_variable"
naming.snakeize("MyClass", to_style=naming.NameStyle.UPPER_SNAKE)      # "MY_CLASS"
naming.snakeize("method", to_style=naming.NameStyle.MAGIC_SNAKE)       # "__method__"
naming.snakeize("protected", to_style=naming.NameStyle.PROTECTED_SNAKE)  # "_protected"
naming.snakeize("private", to_style=naming.NameStyle.PRIVATE_SNAKE)      # "__private"

# 从指定风格转换
naming.snakeize("MyVariable", from_style=naming.NameStyle.PASCAL)  # "my_variable"

# 异常处理
naming.snakeize("INVALID@NAME", exception=False, return_empty=True)  # ""

# === 字典转换 ===

config = {
    "UserName": "Alice",
    "UserProfile": {
        "EmailAddress": "alice@example.com",
        "PhoneNumber": "1234567890",
    },
    "UserSettings": ["Setting1", "Setting2"],
}

# 转换字典键为 snake_case
naming.snakeize_dict(config)  # {"user_name": "Alice", "user_profile": {...}, "user_settings": [...]}

Config - 配置管理

智能配置管理系统,支持环境切换和自动注入。

配置原理

环境配置读取原理

配置系统通过以下优先级加载配置:

  1. 环境后缀方式 ([dev.database]):

    • 读取 active 字段指定的环境
    • 只加载该环境下的配置项
    • 配置键为 [<环境>.<前缀>.<字段>]
  2. 环境嵌套方式 ([env.dev.database]):

    • 读取 active 字段指定的环境
    • [env.<环境>.<前缀>] 路径读取配置
    • 支持多个环境按顺序合并配置
  3. 默认配置方式 ([database]):

    • 不使用环境前缀,直接从根节点读取
    • 适用于不需要环境切换的场景

配置加载流程:

settings.toml → LoaderManager → 环境解析 → 配置字典 → 注入到配置类

自动配置原理

@inject 装饰器通过以下机制实现自动配置:

  1. 单例模式:

    • 每个配置类只创建一个实例
    • 多次调用构造函数返回同一对象
  2. 配置注入:

    • 根据 prefix 参数匹配配置键
    • 自动从配置字典读取并填充字段值
    • 支持类型转换(str → int/float/bool/Path)
  3. 嵌套配置:

    • 支持嵌套配置对象(属性也是 @inject 类)
    • 递归注入嵌套配置
  4. 配置缓存:

    • 使用 LRU 缓存配置字典
    • 避免重复读取文件

注入流程:

创建实例 → 检查单例 → 设置默认值 → 从配置字典注入值 → 返回实例

配置文件详解

默认配置文件名

配置系统支持以下默认配置文件名(按优先级从高到低):

  • pynomad / .pynomad
  • config / .config
  • setting / .setting
  • settings / .settings
  • <项目名> / .<项目名>

支持的文件格式

配置文件支持以下格式(按优先级从高到低):

  • .toml / .tom - TOML 格式(推荐)
  • .yaml / .yml - YAML 格式
  • .json - JSON 格式
  • .ini - INI 格式

默认解析路径

配置文件按以下路径顺序查找(优先级从高到低):

  1. 项目根目录

    • 项目根目录
    • config/ 子目录
    • .config/ 子目录
    • data/ 子目录
    • .data/ 子目录
    • .data/config/ 子目录
  2. 外部配置目录

    • ~/<项目名>/ - 用户目录
    • <localdata>/<项目名>/ - 本地数据目录
    • <config_home>/<项目名>/ - 配置目录
    • ~/.pynomad/ - pynomad 框架目录
    • <project_data>/pynomad/ - 项目数据目录
    • <config_home>/pynomad/ - 配置目录
  3. 内置资源

    • 项目包内的 data/config/ 目录
    • pynomad 框架包内的默认配置

配置加载优先级

不同加载器的优先级(数值越小优先级越高):

加载器类型 优先级 说明
命令行加载器 10 命令行参数(最高优先级)
环境变量加载器 19-20 环境变量
文件加载器 40-50 外部配置文件
资源加载器 sys.maxsize 项目内置资源(最低优先级)

配置合并规则

  • 相同层级:后加载的配置覆盖先加载的配置
  • 不同层级:深层配置不会被浅层配置覆盖
  • 多个环境:多个环境按顺序合并,后环境的配置覆盖先环境的配置

路径展开变量

Path 类型字段支持内置路径变量展开:

内置变量列表

变量名 说明 示例
{workspace} 项目根目录 {workspace}/cache
{projectdir} 项目根目录(同上) {projectdir}/data
{projectname} 项目名称 {projectname}.log
{homedir} 用户主目录 {homedir}/.config
{home} 用户主目录(同上) {home}/.cache
{localdata} 本地数据目录 {localdata}/app
{roamingdata} 漫游数据目录(仅 Windows) {roamingdata}/app
{config} 配置目录 {config}/myapp
{cache} 缓存目录 {cache}/temp
{state} 状态目录 {state}/app
{tmp} 临时目录 {tmp}/upload
{app} 应用安装目录 {app}/data
{env:VAR_NAME} 环境变量 {env:USER_HOME}/data

使用示例

from pynomad import inject, field

@inject(prefix="app")
class AppProperties:
    cache_dir: Path = field(default="{cache}/myapp")
    data_dir: Path = field(default="{workspace}/data")
    config_dir: Path = field(default="{config}/myapp")
[app]
cache_dir = "{cache}/myapp"
data_dir = "{workspace}/data"
config_dir = "{config}/myapp"

配置文件方式

默认配置方式

最简单的配置方式,直接在 settings.toml 中定义配置项,不需要环境配置:

[database]
host = "localhost"
port = 3306
name = "myapp"

[logging]
level.root = "INFO"
pretty = true

[app]
name = "MyApp"
version = "1.0.0"
from pynomad import inject, field, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)
    name: str = field()

db_config = DatabaseProperties()
logger.info(db_config.host)  # "localhost"
logger.info(db_config.port)  # 3306
logger.info(db_config.name)  # "myapp"

方式1:使用环境后缀

settings.toml 中定义多个环境:

environments = ["dev", "test", "prod"]
active = ["dev"]

[dev.database]
host = "localhost"
port = 3306
name = "myapp_dev"

[dev.logging]
level.root = "DEBUG"
pretty = true

[prod.database]
host = "prod.example.com"
port = 3306
name = "myapp_prod"

[prod.logging]
level.root = "INFO"
pretty = false

方式2:使用 [env.<环境名>] 嵌套

active = ["development"]

[default]
app_name = "MyApp"
version = "1.0.0"

[env.development.database]
host = "localhost"
port = 3306

[env.production.database]
host = "prod.example.com"
port = 3306

属性类使用

定义配置类并使用 @inject 装饰器自动注入配置:

from pynomad import inject, field, BuiltinProperties, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)
    name: str = field()

@inject(prefix="logging")
class LoggingProperties:
    level_root: str = field(default="INFO")
    pretty: bool = field(default=False)

# 使用配置
db_config = DatabaseProperties()
logger.info(db_config.host)  # "localhost" 或从配置文件读取
logger.info(db_config.port)  # 3306

field 参数:

# default: 默认值
field(default="localhost")

# default_factory: 默认值工厂(用于可变对象)
field(default_factory=list)

# converter: 自定义类型转换器
field(converter=lambda x: int(x))

# alias: 配置键别名(单个别名)
field(alias="db_host")

使用 field 别名示例

from pynomad import inject, field, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)
    # 配置文件使用 db_name 而不是 name
    name: str = field(alias="db_name", default="mydb")

# 配置文件中:
# [database]
# host = "localhost"
# port = 3306
# db_name = "myapp_db"  # 别名映射到 name 字段

@alias 装饰器

⚠️ 不建议使用: 建议在配置文件中直接使用与 TypedDict 字段名一致的键名,避免额外的别名映射。

为 TypedDict 定义字段别名映射(仅用于 TypedDict):

from typing import TypedDict
from pynomad import inject, field, alias, get_logger

logger = get_logger(__name__)

@alias(pynomad_logging="pynomad.logging")
class LevelTypeDict(TypedDict):
    root: str
    pynomad: str
    pynomad_logging: str

@inject(prefix="logging")
class LoggingProperties:
    level: LevelTypeDict = field()

# 配置文件中:
# [logging.level]
# root = "INFO"
# pynomad = "DEBUG"
# pynomad.logging = "TRACE"
# 
# 注意:配置文件使用 "pynomad.logging",但 TypedDict 字段名为 "pynomad_logging"

环境切换

⚠️ 不建议使用: 运行时切换环境可能导致配置不一致,建议在应用启动时确定环境。

from pynomad import set_active_environment, get_logger

logger = get_logger(__name__)

# 切换环境(不建议运行时切换)
set_active_environment("test")
set_active_environment("prod")

# 切换后,原有配置类引用自动更新
db_config = DatabaseProperties()
set_active_environment("prod")
logger.info(db_config.host)  # 自动更新为 production 环境配置

推荐做法: 通过配置文件的 active 字段指定环境,或者在应用启动时设置 active 环境变量。

自定义配置文件加载

⚠️ 不建议使用: 建议直接在 settings.toml 中定义所有配置,避免使用 add_file_loader 等方法。

from pynomad import add_file_loader, remove_loader, get_config
from pynomad import environ, get_logger

logger = get_logger(__name__)

# 添加自定义配置文件(不建议)
config_path = environ.project_dir / "config" / "custom.toml"
add_file_loader(config_path)

# 读取配置
config = get_config()
logger.info(config.get("custom.key"))

# 移除配置加载器
remove_loader(loader)

推荐做法: 直接在 settings.toml 中定义所有配置。

配置生成器

自动从配置文件生成配置类:

配置文件示例 (settings.toml):

[database]
host = "localhost"
port = 3306
name = "myapp"
username = "root"
password = "password"

[logging]
level.root = "INFO"
pretty = true

[app]
name = "MyApp"
version = "1.0.0"

生成配置类:

from pynomad import ConfigGenerator
from pathlib import Path

# 创建生成器
generator = ConfigGenerator(
    output_path=Path("config"),
    class_suffix="Properties",
    root_class_name="ProjectProperties",
)

# 生成配置类(从 settings.toml 自动读取)
generator.generate(class_type="dataclass")

⚠️ 注意: 配置生成器会从 settings.toml 自动读取配置,无需使用 add_file_loader

生成的配置类示例:

@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)

@inject(prefix="logging")
class LoggingProperties:
    level_root: str = field(default="INFO")
    pretty: bool = field(default=True)

@inject()
class ProjectProperties(BuiltinProperties):
    database: DatabaseProperties = field()
    logging: LoggingProperties = field()

单例模式

@inject 装饰的配置类是单例模式:

from pynomad import inject, field, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field()

# 多次实例化返回同一对象
db1 = DatabaseProperties()
db2 = DatabaseProperties()
logger.info(db1 is db2)  # True

清除缓存

⚠️ 不建议使用: 清除配置缓存可能导致配置不一致,仅用于测试环境。

from pynomad import clear_config_cache

# 清除配置缓存(仅用于测试)
clear_config_cache()

Result - 结果处理

统一的结果处理模式,优雅的错误处理。

基本使用

from pynomad import Result, ResultAny, get_logger

logger = get_logger(__name__)

# 成功结果
result = Result.success(data={"id": 1, "name": "Alice"})
logger.info(result.data)  # {"id": 1, "name": "Alice"}
logger.info(result.msg)   # "操作成功"

# 失败结果
result = Result.fail(msg="操作失败", code=400)
logger.info(result.msg)   # "操作失败"
logger.info(result.code)  # 400

# 检查结果
if result.is_success():
    logger.info("成功!")
    logger.info(result.data)
else:
    logger.info(f"失败: {result.msg}")

# 链式处理
result = Result.success(10)
result = result.map(lambda x: x * 2)  # Result.success(20)
result = result.and_then(lambda x: Result.success(x + 5))  # Result.success(25)

错误处理

from pynomad import Result, get_logger

logger = get_logger(__name__)

# 使用 or_else 提供默认值
result = Result.fail("失败")
default_value = result.or_else({"default": "value"})

# 使用 raise_on_error 抛出异常
result = Result.fail("操作失败", code=500)
try:
    data = result.raise_on_error()
except Exception as e:
    logger.info(f"捕获异常: {e}")

Result 装饰器

from pynomad.result.decorator import (
    result,
    ignore_result,
    client_result,
    network_result,
    server_result,
    third_party_result,
    json_result,
    dict_result,
)

# 基本结果装饰器
@result
def divide(a: int, b: int) -> float:
    if b == 0:
        raise ValueError("除数不能为零")
    return a / b

# Result[divide(10, 2)]  # Result.success(5.0)
# Result[divide(10, 0)]  # Result.fail("除数不能为零")

# 客户端错误装饰器
@client_result
def validate_user(user_id: int) -> dict:
    if user_id <= 0:
        raise ValueError("无效的用户ID")
    return {"id": user_id}

# 网络错误装饰器
@network_result
def fetch_data(url: str) -> str:
    response = requests.get(url)
    return response.text

# 服务端错误装饰器
@server_result
def process_request(data: dict) -> dict:
    # 处理服务端逻辑
    pass

# 第三方服务错误装饰器
@third_party_result
def call_external_api(api_key: str) -> dict:
    # 调用外部API
    pass

# JSON 解析装饰器
@json_result
def parse_json(json_str: str) -> dict:
    return json.loads(json_str)

# 字典转换装饰器
@dict_result
def to_dict(obj: Any) -> dict:
    return vars(obj)

# 忽略结果装饰器
@ignore_result
def log_operation(message: str) -> None:
    logger.info(message)
    # 无论是否抛出异常,都返回 Result.success(None)

ResultHandler - 结果处理器

ResultHandler 允许自定义 Result 的初始化行为,实现:

  • 统计错误信息(错误类型、消息、分类)
  • 自定义错误码生成规则
  • 添加错误码前缀
  • 错误监控和告警

基本概念

from pynomad.result.result import ResultHandler, HandleResult, DefaultHandler
from pynomad.result.result import Result, StatusCategory

# ResultHandler 是抽象基类
class MyHandler(ResultHandler):
    def handle(
        self,
        data: object = None,
        category: StatusCategory = StatusCategory.SUCCESS,
        exception: Exception | None = None,
    ) -> HandleResult:
        # 自定义处理逻辑
        return HandleResult(
            code=category.exception_code(exception),
            msg=category.exception_message(exception),
            data=data,
        )

# 注册自定义 Handler
Result.set_handler(MyHandler())

# 恢复默认 Handler
Result.set_handler(DefaultHandler())

统计错误信息

from collections import Counter
from pynomad.result.result import ResultHandler, HandleResult, Result, StatusCategory
from pynomad import get_logger

logger = get_logger(__name__)

class StatisticsHandler(ResultHandler):
    """统计错误信息的处理器"""

    def __init__(self) -> None:
        self.exception_counter: Counter[str] = Counter()
        self.message_counter: Counter[str] = Counter()
        self.category_counter: Counter[str] = Counter()

    def handle(
        self,
        data: object = None,
        category: StatusCategory = StatusCategory.SUCCESS,
        exception: Exception | None = None,
    ) -> HandleResult:
        # 统计异常类型
        exception_type = type(exception).__name__ if exception else "None"
        self.exception_counter[exception_type] += 1

        # 统计错误消息
        msg = category.exception_message(exception)
        self.message_counter[msg] += 1

        # 统计分类
        self.category_counter[category.name] += 1

        return HandleResult(
            code=category.exception_code(exception),
            msg=msg,
            data=data,
        )

    def get_exception_stats(self) -> dict:
        """获取异常类型统计"""
        return dict(self.exception_counter)

    def get_message_stats(self) -> dict:
        """获取消息统计"""
        return dict(self.message_counter)

    def get_category_stats(self) -> dict:
        """获取分类统计"""
        return dict(self.category_counter)

# 使用示例
stats_handler = StatisticsHandler()
Result.set_handler(stats_handler)

# 模拟一些操作
Result.success({"id": 1})
Result.success({"id": 2})
Result.client_error(ValueError("参数错误"))
Result.client_error(ValueError("参数错误"))
Result.client_error(TypeError("类型错误"))
Result.server_error(RuntimeError("运行时错误"))
Result.network_error(ConnectionError("连接失败"))

# 查看统计结果
logger.info("异常类型统计:", stats_handler.get_exception_stats())
# {'None': 2, 'ValueError': 2, 'TypeError': 1, 'RuntimeError': 1, 'ConnectionError': 1}

logger.info("消息统计:", stats_handler.get_message_stats())
# {'success': 2, '参数错误': 2, '类型错误': 1, '运行时错误': 1, '连接失败': 1}

logger.info("分类统计:", stats_handler.get_category_stats())
# {'SUCCESS': 2, 'CLIENT_ERROR': 3, 'SERVER_ERROR': 1, 'NETWORK_ERROR': 1}

自定义错误码前缀

from pynomad.result.result import ResultHandler, HandleResult, Result, StatusCategory

class PrefixCodeHandler(ResultHandler):
    """为错误码添加自定义前缀,用于区分不同服务或模块"""

    def __init__(self, prefix: str = "APP") -> None:
        self.prefix = prefix

    def handle(
        self,
        data: object = None,
        category: StatusCategory = StatusCategory.SUCCESS,
        exception: Exception | None = None,
    ) -> HandleResult:
        base_code = category.exception_code(exception)
        # 生成带前缀的错误码
        code = f"{self.prefix}_{base_code}"

        return HandleResult(
            code=code,
            msg=category.exception_message(exception),
            data=data,
        )

# 使用示例
prefix_handler = PrefixCodeHandler(prefix="MYAPP")
Result.set_handler(prefix_handler)

success_result = Result.success({"status": "ok"})
error_result = Result.client_error(ValueError("错误"))

logger.info("success code:", success_result.code)  # MYAPP_10000000
logger.info("error code:", error_result.code)      # MYAPP_20000000...

作用说明

ResultHandler 的主要作用:

  1. 统计和监控:自动统计各类错误的出现次数,用于错误监控、趋势分析和常见错误识别
  2. 自定义错误码:根据业务需求自定义错误码格式,例如添加服务前缀、模块标识等
  3. 错误告警:结合统计功能,当某类错误达到阈值时触发告警
  4. 日志增强:在处理 Result 时自动记录详细的错误信息
  5. 全局统一处理:所有 Result 创建都经过 Handler,便于统一处理

适用场景

  • 错误监控系统:需要统计错误类型、频率和分布
  • 多服务系统:不同服务使用不同的错误码前缀区分
  • 日志分析:需要详细的错误统计信息进行分析
  • 错误告警:根据错误统计实现自动化告警
  • 调试和测试:收集错误信息用于问题定位

Logger - 日志系统

灵活的日志管理系统。

基本使用

from pynomad import get_logger

# 获取日志记录器
logger = get_logger(__name__)

# 不同级别的日志
logger.trace("追踪信息")
logger.debug("调试信息")
logger.info("普通信息")
logger.warn("警告信息")
logger.error("错误信息")
logger.fatal("致命错误")

日志自动配置

日志系统支持通过 settings.toml 自动配置,无需手动加载。

Root 级别配置

settings.toml 中配置 root 日志级别:

[logging]
# root 日志级别:TRACE | DEBUG | INFO | WARN | ERROR | FATAL
# 生产环境(active=prod/product/production)默认为 INFO,开发环境默认为 TRACE
pretty = true
from pynomad import get_logger

logger = get_logger(__name__)

# root 级别会从配置文件自动读取
logger.info("普通信息")
logger.debug("调试信息")

指定名称的级别配置

支持通配符和层级继承配置不同模块的日志级别:

[logging]
pretty = true

[logging.level]
# root 级别(会应用到所有 logger)
root = "INFO"

# 精确匹配:特定模块
"pynomad.config" = "DEBUG"
"pynomad.cache" = "TRACE"

# 父级匹配:会应用到子模块
"pynomad" = "DEBUG"  # 会应用到 pynomad.config, pynomad.cache 等

# 通配符匹配:使用 * 和 ?
"test.*" = "WARN"     # test 模块下的所有子模块
"*_test" = "ERROR"    # 以 _test 结尾的模块
from pynomad import get_logger

# 不同模块会自动应用匹配的日志级别
logger_root = get_logger()           # 使用 root = "INFO"
logger_config = get_logger("pynomad.config")    # 使用 "pynomad.config" = "DEBUG"
logger_cache = get_logger("pynomad.cache")      # 使用 "pynomad.cache" = "TRACE"
logger_test = get_logger("test.utils")          # 使用 "test.*" = "WARN"

Pretty 日志配置

启用 pretty 模式,支持结构化日志输出:

[logging]
pretty = true

[logging.level]
root = "INFO"
from pynomad import get_logger
import json

logger = get_logger(__name__)

# pretty 模式会自动格式化字典和列表
logger.info({"name": "Alice", "age": 30})
# 输出格式化的 JSON 对象

logger.info([1, 2, 3, 4, 5])
# 输出格式化的列表

环境自动调整

日志系统会根据激活环境自动调整日志级别:

[env]
active = "prod"
  • 生产环境(active 包含 prod/product/production):

    • root 日志级别:INFO
    • 文件日志级别:INFO
  • 开发环境(其他环境):

    • root 日志级别:TRACE
    • 文件日志级别:INFO

彩色日志生成器

使用 FormatBuilder 构建自定义彩色日志格式。

基本使用

from pynomad.logsystem.builder import FormatBuilder
from pynomad.logsystem.formatters import ColorFormatter
from pynomad.common.ansi import Color, AnsiStyle

# 构建格式字符串
builder = FormatBuilder()

# 添加时间戳(洋红色)
builder.add_fore(Color.MAGENTA)
builder.add_asctime(width=9)
builder.reset_style()

# 添加日志名称(青色,左对齐,宽度 25,超长中间截断)
builder.add_fore(Color.CYAN.bright_fore)
builder.add_name(align="<", width=25, truncate=True)
builder.reset_style()

# 添加分隔符
builder.add_string(":")

# 添加行号(蓝色,右对齐,零填充,宽度 4)
builder.add_style(AnsiStyle.DIM)
builder.add_fore(Color.BLUE)
builder.add_lineno(width=4, align=">", fill="0")
builder.reset_style()

# 添加日志级别(使用动态级别颜色,居中对齐,宽度 8)
builder.add_string(" [")
builder.add_levelname(width=8, fill="*", align="^", use_level_color=True)
builder.add_string("] ")

# 添加消息(使用动态级别颜色)
builder.add_message(use_level_color=True)

# 构建格式字符串
_, _, colorfmt_dynamic = builder.build()

# 创建格式化器
formatter = ColorFormatter(
    fmt=colorfmt_dynamic,
    datefmt="%H:%M:%S"
)

动态级别颜色

使用 use_level_color=True 启用动态级别颜色,不同级别显示不同颜色:

builder = FormatBuilder()

# 级别字段使用动态颜色
builder.add_levelname(width=8, use_level_color=True)
builder.add_string(": ")
builder.add_message(use_level_color=True)

_, _, fmt = builder.build()
formatter = ColorFormatter(fmt)

默认级别颜色映射:

  • TRACE: 青色 + 暗淡样式
  • DEBUG: 白色
  • INFO: 绿色
  • WARN: 黄色
  • ERROR: 红色
  • FATAL: 亮红色 + 粗体 + 下划线

自定义颜色和样式

from pynomad.common.ansi import Color, AnsiStyle

builder = FormatBuilder()

# 前景色
builder.add_fore(Color.RED)              # 红色
builder.add_fore(Color.GREEN.bright_fore)  # 亮绿色

# 背景色
builder.add_back(Color.DEFAULT)         # 默认背景
builder.add_back(41)                    # 红色背景

# 样式
builder.add_style(AnsiStyle.BOLD)        # 粗体
builder.add_style(AnsiStyle.UNDERLINE)   # 下划线
builder.add_style(AnsiStyle.DIM)        # 暗淡

# 组合使用
builder.add_fore(Color.YELLOW)
builder.add_back(41)
builder.add_style(AnsiStyle.BOLD)
builder.add_string("警告消息")
builder.reset_style()  # 重置所有颜色和样式

格式化选项

builder = FormatBuilder()

# 对齐方式:"<" 左对齐, ">" 右对齐, "^" 居中, "=" 填充
builder.add_name(align="<", width=20)    # 左对齐
builder.add_levelname(align=">", width=8)  # 右对齐
builder.add_funcname(align="^", width=15)   # 居中

# 填充字符
builder.add_lineno(width=5, fill="0")    # 用 0 填充
builder.add_string(width=10, fill="-")   # 用 - 填充

# 超长截断(仅在宽度限制时有效)
builder.add_module(width=20, truncate=True)  # 超长中间截断

# 使用动态级别颜色
builder.add_levelname(use_level_color=True)
builder.add_message(use_level_color=True)

配置文件中使用格式构建器

[logging]
pretty = true

[logging.formatters]
# 自定义彩色格式
[logging.formatters.console_colored]
class = "pynomad.logsystem.formatters.ColorFormatter"
# 使用 fmtbuild_method 动态生成格式
fmtbuild_method = "myapp.logger:build_custom_format"
# show 模式:plain/static/dynamic
show = "dynamic"
# myapp/logger.py
from pynomad.logsystem.builder import FormatBuilder
from pynomad.common.ansi import Color, AnsiStyle
from pynomad.logsystem.logtypes import FormatStrings

def build_custom_format() -> FormatStrings:
    """自定义日志格式构建函数"""
    builder = FormatBuilder()
    
    builder.add_fore(Color.MAGENTA)
    builder.add_asctime(width=9)
    builder.reset_style()
    builder.add_string(" ")
    builder.add_fore(Color.CYAN.bright_fore)
    builder.add_name(align="<", width=25, truncate=True)
    builder.reset_style()
    builder.add_string(":")
    builder.add_fore(Color.BLUE)
    builder.add_lineno(width=4, align=">", fill="0")
    builder.reset_style()
    builder.add_string(" [")
    builder.add_levelname(width=8, fill=" ", align="^", use_level_color=True)
    builder.add_string("] ")
    builder.add_message(use_level_color=True)
    
    return builder.build()

完整示例

from pynomad import get_logger
from pynomad.logsystem.builder import FormatBuilder
from pynomad.logsystem.formatters import ColorFormatter
from pynomad.common.ansi import Color, AnsiStyle
import logging

# 构建自定义格式
def build_custom_format():
    builder = FormatBuilder()
    
    # 时间戳 - 洋红色
    builder.add_fore(Color.MAGENTA)
    builder.add_asctime(width=9)
    builder.reset_style()
    
    # 日志器名 - 青色
    builder.add_fore(Color.CYAN.bright_fore)
    builder.add_name(align="<", width=25, truncate=True)
    builder.reset_style()
    
    # 分隔符
    builder.add_string(":")
    
    # 行号 - 蓝色
    builder.add_fore(Color.BLUE)
    builder.add_lineno(width=4, align=">", fill="0")
    builder.reset_style()
    
    # 级别 - 动态颜色
    builder.add_string(" [")
    builder.add_levelname(width=8, align="^", use_level_color=True)
    builder.add_string("] ")
    
    # 消息 - 动态颜色
    builder.add_message(use_level_color=True)
    
    return builder.build()

# 创建格式化器
_, _, fmt = build_custom_format()
formatter = ColorFormatter(fmt, datefmt="%H:%M:%S")

# 应用到日志器
logger = get_logger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)

# 测试
logger.trace("追踪信息")  # 青色
logger.debug("调试信息")  # 白色
logger.info("普通信息")   # 绿色
logger.warn("警告信息")   # 黄色
logger.error("错误信息")  # 红色
logger.fatal("致命错误")  # 亮红色 + 粗体 + 下划线

Cache - 缓存系统

多级缓存系统,支持多种缓存存储方式。

缓存配置

缓存系统支持通过 settings.toml 自动配置,无需手动初始化。

配置文件示例

# ========== 缓存配置 ==========
# 环境配置
environments = ["dev", "test", "prod"]
active = ["dev"]

# 内存缓存配置
[cache.memory]
name = "memory_cache"
ttl = 3

# DataFrame 内存缓存配置
[cache.dfmemory]
name = "df_memory_cache"
ttl = 3

# Pickle 缓存配置
[cache.pickle]
name = "pickle_cache"
cache_dir = "{workspace}/data/cache/pickle"
enable_encryption = true
salt = "pynomad_pickle_cache_v1"
ttl = 3

# DataFrame Pickle 缓存配置
[cache.dfpickle]
cache_dir = "{workspace}/data/cache/df_pickle"
enable_encryption = true
salt = "pynomad_pickle_cache_v1"

# Redis 缓存配置
[cache.redis]
host = "localhost"
port = 6379
ttl = 3
db = 15

# DataFrame Redis 缓存配置
[cache.dfredis]
host = "localhost"
port = 6379
db = 15

# SQL 缓存配置(全局)
[cache.sql]
db_type = "sqlite"
db_url = "{workspace}/data/cache/database/cache.db"
ttl = 3600

# SQL 缓存配置(开发环境)
[dev.cache.sql]
db_type = "mysql"
db_name = "cache_db"
host = "localhost"
port = 3306
username = "root"
password = "your_password"

# SQL 缓存配置(测试环境)
[test.cache.sql]
db_type = "postgresql"
db_name = "cache_db"
host = "localhost"
port = 5432
username = "postgres"
password = "your_password"

# SQL 缓存配置(生产环境)
[prod.cache.sql]
db_type = "postgresql"
db_name = "cache_db"
host = "prod.example.com"
port = 5432
username = "postgres"
password = "prod_password"

Memory Cache - 内存缓存 (@memcached)

内存缓存是最快的缓存方式,数据存储在进程内存中。

基本使用

from pynomad import memcached, get_logger

logger = get_logger(__name__)

@memcached
def expensive_function(x: int) -> int:
    logger.info(f"计算 {x} 的平方")
    return x * x

# 第一次调用 - 缓存未命中
result1 = expensive_function(5)  # 输出: 计算 5 的平方

# 第二次调用 - 缓存命中
result2 = expensive_function(5)  # 无输出,直接返回缓存结果

装饰器参数

from pynomad import memcached

# 自定义缓存名称和 TTL
@memcached(name="custom_cache", ttl=60, maxsize=100)
def compute_factorial(n: int) -> int:
    return factorial(n)

参数说明

  • name: 缓存名称,用于区分不同缓存实例
  • ttl: 过期时间(秒),默认从配置文件读取
  • maxsize: 最大缓存条目数,默认从配置文件读取
  • keygenerator: 自定义键生成函数

配置说明

[cache.memory]
name = "memory_cache"      # 缓存名称
ttl = 3                    # 默认过期时间(秒)

Pickle Cache - Pickle 缓存 (@pickled)

Pickle 缓存将数据序列化到文件,支持数据持久化。

基本使用

from pynomad import pickled, get_logger

logger = get_logger(__name__)

@pickled
def complex_function(data: dict) -> list:
    logger.info(f"处理复杂数据: {data}")
    return [data.get("key", "default")]

# 缓存到文件
result1 = complex_function({"key": "value"})  # 执行计算
result2 = complex_function({"key": "value"})  # 从缓存读取

装饰器参数

from pynomad import pickled

@pickled(
    name="custom_pickle_cache",
    cache_dir="{workspace}/data/custom_cache",
    enable_encryption=False,
    ttl=3600
)
def process_data(data: dict) -> dict:
    return data

参数说明

  • name: 缓存名称
  • cache_dir: 缓存目录,支持路径变量展开
  • enable_encryption: 是否启用加密,默认为 true
  • ttl: 过期时间(秒)

配置说明

[cache.pickle]
name = "pickle_cache"
cache_dir = "{workspace}/data/cache/pickle"  # 缓存目录
enable_encryption = true                      # 是否启用加密
salt = "pynomad_pickle_cache_v1"           # 加密盐值
ttl = 3                                    # 默认过期时间(秒)

Redis Cache - Redis 缓存 (@rediscached)

Redis 缓存支持分布式缓存,适用于多进程或多服务器场景。

安装依赖

pip install redis

基本使用

from pynomad import rediscached, get_logger

logger = get_logger(__name__)

@rediscached
def fetch_from_api(endpoint: str) -> dict:
    logger.info(f"调用 API: {endpoint}")
    return {"data": "response", "endpoint": endpoint}

# 结果缓存到 Redis
result1 = fetch_from_api("/api/users")  # 执行 API 调用
result2 = fetch_from_api("/api/users")  # 从 Redis 缓存读取

装饰器参数

from pynomad import rediscached

@rediscached(
    name="api_cache",
    host="localhost",
    port=6379,
    db=1,
    ttl=300,
    maxsize=1000
)
def fetch_data(url: str) -> dict:
    return {"url": url}

参数说明

  • name: 缓存名称
  • host: Redis 服务器地址
  • port: Redis 端口
  • db: Redis 数据库编号
  • ttl: 过期时间(秒)
  • maxsize: 最大缓存条目数

配置说明

[cache.redis]
host = "localhost"  # Redis 服务器地址
port = 6379         # Redis 端口
ttl = 3             # 默认过期时间(秒)
db = 15             # Redis 数据库编号

SQL Cache - SQL DataFrame 缓存 (@sqlcached)

SQL 缓存专门用于 pandas DataFrame 对象的数据库缓存,支持多种数据库类型(SQLite、MySQL、PostgreSQL),适合大数据量、长期缓存的场景。

安装依赖

pip install sqlalchemy

基本使用

from pynomad import sqlcached, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@sqlcached(db_type="sqlite", db_url="sqlite:///data/cache.db", ttl=300)
def query_database(query: str) -> DataFrame:
    logger.info(f"执行查询: {query}")
    return DataFrame({"query": [query], "result": ["data"]})

# DataFrame 缓存到数据库
df1 = query_database("SELECT * FROM users")  # 执行查询
df2 = query_database("SELECT * FROM users")  # 从数据库缓存读取

装饰器参数

from pynomad import sqlcached
from pandas import DataFrame

@sqlcached(
    name="custom_sql_cache",
    db_type="mysql",
    db_url="mysql://user:pass@localhost/db",
    db_name="cache_db",
    host="localhost",
    port=3306,
    username="root",
    password="password",
    ttl=3600
)
def fetch_data(key: str) -> DataFrame:
    return DataFrame({"key": [key], "value": [100]})

参数说明

  • name: 缓存名称
  • db_type: 数据库类型(sqlite/mysql/postgresql)
  • db_url: 数据库连接 URL
  • db_name: 数据库名称
  • host: 数据库主机
  • port: 数据库端口
  • username: 用户名
  • password: 密码
  • ttl: 过期时间(秒)

配置说明

# 全局配置
[cache.sql]
db_type = "sqlite"
db_url = "{workspace}/data/cache/database/cache.db"
ttl = 3600

# 开发环境配置
[dev.cache.sql]
db_type = "mysql"
db_name = "cache_db"
host = "localhost"
port = 3306
username = "root"
password = "your_password"

# 测试环境配置
[test.cache.sql]
db_type = "postgresql"
db_name = "cache_db"
host = "localhost"
port = 5432
username = "postgres"
password = "your_password"

重要说明

  • sqlcachedDataFrame 专用 缓存装饰器,继承自 BaseDataFrameDecorator
  • 支持 DataFrameValueLoader 三阶段处理(GET、PUT、EXTRACT)
  • 返回值类型必须是 DataFrame
  • 适用于大数据量、长期缓存的 DataFrame 场景

DataFrame 缓存

DataFrame 缓存专门用于 pandas DataFrame 对象的缓存。

DataFrameValueLoader - 三阶段数据处理

DataFrame 缓存采用三阶段处理模式,通过 DataFrameValueLoader 协议实现灵活的数据处理。

三个阶段

  1. GET 阶段 (CacheStage.GET):从缓存提取数据,决定是否使用缓存
  2. PUT 阶段 (CacheStage.PUT):合并缓存数据和新数据
  3. EXTRACT 阶段 (CacheStage.EXTRACT):从合并数据中提取最终返回数据

DataFrameValueLoader 协议详解

DataFrameValueLoader 是一个 Protocol(协议),需要实现三个方法:

from pynomad.cache.dataframe.types import DataFrameValueLoader, CacheStage
from pandas import DataFrame
from pynomad.result.result import Result, ResultAny

class CustomDataFrameValueLoader(DataFrameValueLoader):
    """自定义 DataFrame 值加载器"""

    def get(
        self,
        cache_data: DataFrame,
        extra_params: dict[str, Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> Result[DataFrame | dict[str, Any]]:
        """GET 阶段:从缓存提取数据

        参数说明:
            cache_data: 从缓存获取的 DataFrame(如果有缓存)
            extra_params: 装饰器传递的额外参数(通常包含上次的 args 和 kwargs)
            args: 当前调用的位置参数(被装饰函数的 args)
            kwargs: 当前调用的关键字参数(被装饰函数的 kwargs)

        返回格式:
            - 使用缓存:Result.success(data=cache_data 或 dict)
            - 刷新数据:Result.client_error(exception=NeedsRefreshException(...))

        使用场景:
            - 检查缓存是否过期(根据时间戳、版本号等)
            - 验证参数是否匹配(与 extra_params 中的旧参数比较)
            - 根据业务逻辑决定是否使用缓存
        """
        # 示例1:检查缓存是否过期
        if self._is_expired(cache_data):
            return Result.client_error(
                exception=NeedsRefreshException(message="缓存已过期")
            )
        return Result.success(data=cache_data)

        # 示例2:比较参数是否匹配
        last_args = tuple(extra_params.get("args", []))
        last_kwargs = extra_params.get("kwargs", {})
        if last_args != args or last_kwargs != kwargs:
            return Result.client_error(
                exception=NeedsRefreshException(message="参数已变化"),
                data={"args": (), "kwargs": kwargs}  # 新参数(用于重新调用)
            )
        return Result.success(data=cache_data)

    def put(
        self,
        cache_data: DataFrame,
        new_data: DataFrame,
        extra_params: dict[str, Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> ResultAny[DataFrame]:
        """PUT 阶段:合并缓存数据和新数据

        参数说明:
            cache_data: 原缓存中的 DataFrame(可能为空 DataFrame)
            new_data: 新获取的 DataFrame(被装饰函数的返回值)
            extra_params: 装饰器传递的额外参数
            args: 当前调用的位置参数
            kwargs: 当前调用的关键字参数

        返回格式:
            - Result.success(data=DataFrame): 需要缓存到存储的 DataFrame

        使用场景:
            - 数据追加:旧数据 + 新数据
            - 数据去重:合并后删除重复行
            - 数据更新:根据某些字段更新旧数据
            - 完全覆盖:直接返回 new_data(不使用 cache_data)
        """
        # 示例1:数据追加
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        return Result.success(data=merged)

        # 示例2:数据去重
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        merged = merged.drop_duplicates()
        return Result.success(data=merged)

        # 示例3:根据列更新(如按 symbol 更新价格)
        cache_data.set_index('symbol', inplace=True)
        new_data.set_index('symbol', inplace=True)
        merged = cache_data.update(new_data).reset_index()
        return Result.success(data=merged)

        # 示例4:完全覆盖(不合并)
        return Result.success(data=new_data)

    def extract(
        self,
        merged_data: DataFrame,
        extra_params: dict[str, Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> ResultAny[DataFrame]:
        """EXTRACT 阶段:提取最终返回数据

        参数说明:
            merged_data: 合并后的 DataFrame(将被缓存)
            extra_params: 装饰器传递的额外参数
            args: 当前调用的位置参数
            kwargs: 当前调用的关键字参数

        返回格式:
            - Result.success(data=DataFrame): 返回给调用方的最终 DataFrame

        使用场景:
            - 返回部分数据(如最新 N 行、特定列)
            - 数据过滤(根据参数筛选)
            - 数据排序(按时间、价格等排序)
            - 数据分组汇总
        """
        # 示例1:返回最新 N 行
        return Result.success(data=merged_data.tail(10))

        # 示例2:根据参数过滤
        symbol = kwargs.get('symbol')
        if symbol:
            filtered = merged_data[merged_data['symbol'] == symbol]
            return Result.success(data=filtered)
        return Result.success(data=merged_data)

        # 示例3:按时间排序返回
        sorted_data = merged_data.sort_values('timestamp', ascending=False)
        return Result.success(data=sorted_data)

        # 示例4:返回所有数据(不做处理)
        return Result.success(data=merged_data)

参数格式说明

传入参数

参数 类型 说明
cache_data DataFrame 从缓存读取的 DataFrame(GET/PUT 阶段有此参数)
new_data DataFrame 新获取的 DataFrame(仅 PUT 阶段有此参数)
merged_data DataFrame 合并后的 DataFrame(仅 EXTRACT 阶段有此参数)
extra_params dict[str, Any] 额外参数,通常包含:{"args": [...], "kwargs": {...}}
args tuple[Any, ...] 被装饰函数的位置参数
kwargs dict[str, Any] 被装饰函数的关键字参数

返回值格式

  • GET 阶段Result[DataFrame | dict[str, Any]]
    • 成功:Result.success(data=cache_data)Result.success(data={"key": "value"})
    • 失败:Result.client_error(exception=NeedsRefreshException(...))
  • PUT 阶段ResultAny[DataFrame]
    • 成功:Result.success(data=merged_data)
  • EXTRACT 阶段ResultAny[DataFrame]
    • 成功:Result.success(data=extracted_data)

完整使用示例

示例1:增量更新缓存(追加新数据)

from pynomad import df_memcached
from pynomad.cache.dataframe.types import DataFrameValueLoader
from pandas import DataFrame
from pynomad.result.result import Result, ResultAny

class IncrementalDataLoader(DataFrameValueLoader):
    """增量更新加载器:追加新数据,返回最新数据"""

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 总是重新获取数据(实现增量更新)
        return Result.client_error(
            exception=NeedsRefreshException("需要获取新数据"),
            data={"args": (), "kwargs": kwargs}
        )

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 合并旧数据和新数据
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        # 去重
        merged = merged.drop_duplicates()
        return Result.success(data=merged)

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 返回最新 100 条数据
        return Result.success(data=merged_data.tail(100))

@df_memcached(valueloader=IncrementalDataLoader())
def fetch_stock_data(symbol: str) -> DataFrame:
    """获取股票数据"""
    # 模拟获取最新数据
    return DataFrame({"symbol": [symbol], "price": [100.0], "timestamp": [datetime.now()]})

示例2:智能缓存(过期检查 + 参数匹配)

from datetime import datetime, timedelta

class SmartDataLoader(DataFrameValueLoader):
    """智能加载器:检查过期时间和参数匹配"""

    CACHE_DURATION = timedelta(hours=1)  # 缓存1小时

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 检查缓存是否为空
        if cache_data.empty:
            return Result.client_error("缓存为空")

        # 检查时间戳列是否存在
        if 'timestamp' not in cache_data.columns:
            return Result.client_error("缓存缺少时间戳")

        # 检查缓存是否过期
        last_time = cache_data['timestamp'].max()
        if datetime.now() - last_time > self.CACHE_DURATION:
            return Result.client_error(
                exception=NeedsRefreshException("缓存已过期")
            )

        # 检查参数是否匹配
        last_args = tuple(extra_params.get("args", []))
        last_kwargs = extra_params.get("kwargs", {})
        if last_args != args or last_kwargs != kwargs:
            return Result.client_error(
                exception=NeedsRefreshException("参数已变化")
            )

        # 使用缓存
        return Result.success(data=cache_data)

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 直接覆盖缓存
        return Result.success(data=new_data)

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 直接返回所有数据
        return Result.success(data=merged_data)

示例3:数据过滤和分组

class FilteredDataLoader(DataFrameValueLoader):
    """过滤加载器:根据参数过滤返回数据"""

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 使用 DefaultDataFrameValueLoader 的逻辑
        last_args = tuple(extra_params.get("args", []))
        last_kwargs = extra_params.get("kwargs", {})
        if last_args == args and last_kwargs == kwargs:
            return Result.success(data=cache_data)
        return Result.client_error("参数已变化")

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 合并并去重
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        return Result.success(data=merged.drop_duplicates())

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 根据参数过滤
        symbol = kwargs.get('symbol')
        if symbol:
            return Result.success(data=merged_data[merged_data['symbol'] == symbol])

        # 根据 args 过滤
        if args and len(args) > 0:
            symbol = args[0]
            return Result.success(data=merged_data[merged_data['symbol'] == symbol])

        # 返回所有数据
        return Result.success(data=merged_data)

DefaultDataFrameValueLoader

默认的值加载器,提供简单的缓存逻辑:

  • GET:比较 extra_params 中的旧参数与当前参数
    • 相同:返回 Result.success(data=cache_data)
    • 不同:返回 Result.client_error(NeedsRefreshException(...))
  • PUT:直接返回 new_data(不合并,完全覆盖缓存)
  • EXTRACT:直接返回 merged_data(不做任何处理)

适用于不需要复杂合并逻辑、直接覆盖缓存的简单场景。

# 使用默认加载器
from pynomad import df_memcached

@df_memcached()  # 默认使用 DefaultDataFrameValueLoader
def get_data(symbol: str) -> DataFrame:
    return DataFrame({"symbol": [symbol], "price": [100.0]})

Key 生成规则 - 参数参与缓存键的最佳实践

在使用 DataFrame 缓存(特别是三阶段处理)时,合理设计缓存键是提高缓存命中率的关键。

基本概念

缓存键由以下几部分组成:

  • 模块名:函数所在模块
  • 类名:如果是类方法
  • 函数名:被装饰的函数名称
  • 参数:位置参数和关键字参数

参数参与原则

建议参与 Key 生成的参数

  • 唯一标识符:如股票代码、用户 ID、订单号等
  • 数据类型/分类:如复权类型、数据频率等
  • 影响数据内容的关键参数:直接决定返回数据的参数

不建议参与 Key 生成的参数

  • 过滤条件:如时间范围(start_date, end_date)、分页参数等
  • 排序参数:如 sort_by, order
  • 格式参数:如返回格式、字段选择等
  • 临时参数:如日志标记、调试参数等

示例:股票数据缓存

from pynomad import df_memcached
from pynomad.cache.decorator.keygenerator import ConfigableKeygenerator
from pandas import DataFrame

# ❌ 错误示例:所有参数都参与 Key 生成
@df_memcached()
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """
    问题:start_date 和 end_date 参与了 Key 生成
    - get_stock_data("AAPL", "2024-01-01", "2024-12-31", "qfq") 生成键 A
    - get_stock_data("AAPL", "2024-01-01", "2024-06-30", "qfq") 生成键 B
    结果:即使 AAPL 的完整数据已缓存,不同日期范围也会重新获取
    """
    # 实际实现会获取完整历史数据
    return DataFrame({"code": [code], "date": ["2024-01-01"], "price": [100.0]})

# ✅ 正确示例:使用 exclude_params 排除过滤条件
@df_memcached(
    keygenerator=ConfigableKeygenerator(
        include_module=True,
        include_function=True,
        include_args=True,
        include_kwargs=True,
        exclude_params={"start_date", "end_date"}  # 排除时间范围参数
    )
)
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """
    优点:只有 code 和 fq 参与 Key 生成
    - get_stock_data("AAPL", "2024-01-01", "2024-12-31", "qfq") 生成键 A
    - get_stock_data("AAPL", "2024-01-01", "2024-06-30", "qfq") 生成键 A(相同!)
    结果:首次调用获取完整数据并缓存,后续调用从缓存中过滤返回
    """
    # 获取完整历史数据
    full_data = fetch_full_history(code, fq)
    # 在 EXTRACT 阶段过滤日期范围
    return full_data

# ✅ 正确示例:使用 include_params 只包含必要参数
@df_memcached(
    keygenerator=ConfigableKeygenerator(
        include_params={"code", "fq"}  # 只包含 code 和 fq
    )
)
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """与上面效果相同,更明确"""
    return fetch_full_history(code, fq)

配合 ValueLoader 实现过滤

from pynomad.cache.dataframe.types import DataFrameValueLoader
from pandas import DataFrame
from pynomad.result.result import Result, ResultAny
from datetime import datetime

class StockDataLoader(DataFrameValueLoader):
    """股票数据加载器:缓存完整数据,按需过滤返回"""

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 简单返回缓存,不过滤(过滤在 extract 阶段)
        return Result.success(data=cache_data)

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 返回新数据(假设总是获取完整数据)
        return Result.success(data=new_data)

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 根据参数过滤返回数据
        start_date = kwargs.get('start_date')
        end_date = kwargs.get('end_date')

        filtered_data = merged_data

        # 应用日期范围过滤
        if start_date:
            filtered_data = filtered_data[filtered_data['date'] >= start_date]
        if end_date:
            filtered_data = filtered_data[filtered_data['date'] <= end_date]

        return Result.success(data=filtered_data)

# 使用自定义加载器
@df_memcached(
    keygenerator=ConfigableKeygenerator(
        exclude_params={"start_date", "end_date"}  # 排除过滤参数
    ),
    valueloader=StockDataLoader()
)
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """
    工作流程:
    1. 首次调用 get_stock_data("AAPL", "2024-01-01", "2024-03-31", "qfq")
       - 生成 Key:包含 code="AAPL", fq="qfq"
       - 获取完整历史数据并缓存

    2. 二次调用 get_stock_data("AAPL", "2024-04-01", "2024-06-30", "qfq")
       - 生成 Key:相同(因为 start_date, end_date 不参与)
       - 从缓存读取完整数据
       - 在 extract 阶段过滤返回 2024-04-01 到 2024-06-30 的数据

    3. 三次调用 get_stock_data("TSLA", "2024-01-01", "2024-12-31", "qfq")
       - 生成 Key:不同(code="TSLA")
       - 获取 TSLA 的完整数据并缓存
    """
    return fetch_full_history(code, fq)

常见场景的 Key 生成策略

场景 参与参数 不参与参数 原因
股票历史数据 code, fq start_date, end_date 日期范围是过滤条件,不影响数据获取
用户列表查询 role, status page, page_size 分页参数只影响返回数量
订单查询 user_id start_time, end_time 时间范围是过滤条件
统计数据 metric, period fields 字段选择只影响返回列
配置获取 config_key version 版本号可作为 metadata 存储

ConfigableKeygenerator 参数说明

ConfigableKeygenerator(
    include_module=True,      # 是否包含模块名
    include_class=True,       # 是否包含类名
    include_function=True,     # 是否包含函数名
    include_args=True,        # 是否包含位置参数
    include_kwargs=True,      # 是否包含关键字参数
    show_args_names=True,     # 是否显示位置参数名
    include_params=None,      # 只包含这些参数(白名单)
    exclude_params=None,      # 排除这些参数(黑名单)
    hash_params=False,        # 是否对参数进行哈希
    hash_key=False,          # 是否对整个键进行哈希
    hash_algorithm="md5",    # 哈希算法:md5/sha1/sha256/sha512
    hash_truncate=16,        # 哈希值截断长度
    separator=":",           # 分隔符
)

推荐的 Key 生成器配置

from pynomad.cache.decorator.keygenerator import (
    default_key_generator,           # 完整信息(模块+类+函数+参数)
    hashed_params_key_generator,     # 参数哈希(减少键长度)
    compact_key_generator,           # 紧凑模式(只函数名+哈希参数)
    module_function_key_generator,   # 不含参数(共享缓存)
)

选择建议:

  • 大多数场景:hashed_params_key_generator(平衡可读性和性能)
  • 需要调试:default_key_generator(完整信息)
  • 极高并发:compact_key_generator(最短键)
  • 全局共享:module_function_key_generator(无参数影响)

Memory DataFrame Cache - DataFrame 内存缓存 (@df_memcached)

from pynomad import df_memcached, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@df_memcached
def get_stock_data(symbol: str) -> DataFrame:
    logger.info(f"获取股票数据: {symbol}")
    return DataFrame({"symbol": [symbol], "price": [100.0]})

# 第一次调用 - 缓存未命中
df1 = get_stock_data("AAPL")  # 输出: 获取股票数据: AAPL

# 第二次调用 - 缓存命中
df2 = get_stock_data("AAPL")  # 无输出,从缓存读取

装饰器参数

@df_memcached(
    name="custom_df_cache",
    maxsize=100,
    ttl=60,
    valueloader=CustomDataFrameValueLoader()
)
def load_dataframe(symbol: str) -> DataFrame:
    return DataFrame({"symbol": [symbol]})

配置说明

[cache.dfmemory]
name = "df_memory_cache"  # 缓存名称
ttl = 3                     # 默认过期时间(秒)

Pickle DataFrame Cache - DataFrame Pickle 缓存 (@df_pickled)

from pynomad import df_pickled, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@df_pickled
def load_dataframe(path: str) -> DataFrame:
    logger.info(f"加载数据: {path}")
    return DataFrame({"path": [path], "data": [1, 2, 3]})

# 缓存到文件
df1 = load_dataframe("data.csv")  # 执行加载
df2 = load_dataframe("data.csv")  # 从缓存读取

装饰器参数

@df_pickled(
    name="custom_df_pickle",
    cache_dir="{workspace}/data/custom_df_cache",
    enable_encryption=False,
    ttl=3600
)
def process_dataframe(id: str) -> DataFrame:
    return DataFrame({"id": [id]})

配置说明

[cache.dfpickle]
cache_dir = "{workspace}/data/cache/df_pickle"  # 缓存目录
enable_encryption = true                          # 是否启用加密
salt = "pynomad_pickle_cache_v1"               # 加密盐值

Redis DataFrame Cache - DataFrame Redis 缓存 (@df_rediscached)

from pynomad import df_rediscached, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@df_rediscached
def fetch_market_data(symbol: str) -> DataFrame:
    logger.info(f"获取市场数据: {symbol}")
    return DataFrame({"symbol": [symbol], "price": [150.0]})

# DataFrame 缓存到 Redis
df1 = fetch_market_data("BTC")  # 执行获取
df2 = fetch_market_data("BTC")  # 从 Redis 缓存读取

装饰器参数

@df_rediscached(
    name="custom_df_redis",
    host="localhost",
    port=6379,
    db=14,
    ttl=120,
    maxsize=200
)
def fetch_dataframe(key: str) -> DataFrame:
    return DataFrame({"key": [key]})

配置说明

[cache.dfredis]
host = "localhost"  # Redis 服务器地址
port = 6379         # Redis 端口
db = 15             # Redis 数据库编号

Multi-Level Cache - 多级缓存 (@multi_cached)

多级缓存组合器支持组合多个缓存装饰器,实现多级缓存策略。

读穿透和写穿透

读穿透

  • 按顺序从 L1 -> L2 -> L3 查找缓存
  • 命中后回填到上层缓存

写穿透

  • 所有缓存都未命中时执行函数
  • 将结果写入所有层级

基本使用

from pynomad import multi_cached, memcached, pickled, get_logger

logger = get_logger(__name__)

@multi_cached(
    memcached("l1", ttl=10),   # L1: 内存缓存,10秒
    pickled("l2", ttl=60)       # L2: 文件缓存,60秒
)
def expensive_computation(n: int) -> int:
    logger.info(f"计算斐波那契数列第 {n} 项")
    # 复杂计算...
    return n

# 查找顺序:L1 -> L2 -> 执行函数 -> 写入 L1 和 L2
expensive_computation(10)

DataFrame 多级缓存

from pynomad import multi_cached, df_memcached, df_pickled, df_rediscached
from pandas import DataFrame

@multi_cached(
    df_memcached("df_l1", ttl=10),    # L1: DataFrame 内存缓存
    df_pickled("df_l2", ttl=60),       # L2: DataFrame 文件缓存
    df_rediscached("df_l3", ttl=300)   # L3: DataFrame Redis 缓存
)
def get_multi_level_df(symbol: str) -> DataFrame:
    # 获取 DataFrame 数据
    return DataFrame({"symbol": [symbol], "price": [100.0]})

混合缓存组合

可以混合使用通用缓存和 DataFrame 缓存:

@multi_cached(
    memcached("hybrid_l1", ttl=10),       # L1: 通用内存缓存
    df_rediscached("hybrid_l2", ttl=300)  # L2: DataFrame Redis 缓存
)
def get_data(id: str) -> DataFrame:
    return DataFrame({"id": [id]})

多级缓存管理

# 获取多级缓存实例
multi_cache = expensive_computation.__multi_cache__

# 清空所有层级缓存
multi_cache.clear()

# 清空指定层级缓存
multi_cache.clear_decorator(index=1)

# 获取统计信息
stats = multi_cache.get_stats()
print(f"L1 命中次数: {stats[0]['hits']}")
print(f"L2 命中次数: {stats[1]['hits']}")

# 删除指定键
multi_cache.evict(key="cache_key")

# 获取装饰器数量
print(f"缓存层级数量: {multi_cache.count}")

TTL 优先级规则

  1. 装饰器传入参数 > 配置文件默认值
  2. 已存在的缓存键不受 TTL 变更影响
    • 如果某个键已经缓存,之后修改装饰器参数或更新配置文件的 ttl,不会影响已存在的键的过期时间
    • 只有在缓存过期后被删除,下次调用时才会使用新的 ttl
  3. TTL 计算时机:在 put() 时设置,过期后自然失效

缓存装饰器总结

装饰器 类型 用途 持久化 适用场景
memcached 通用 内存 短期缓存、频繁访问
pickled 通用 文件 中期缓存、需要持久化
rediscached 通用 Redis 是(Redis持久化) 分布式缓存、多进程
sqlcached DataFrame 数据库 DataFrame 长期缓存、大容量
df_memcached DataFrame 内存 DataFrame 临时缓存
df_pickled DataFrame 文件 DataFrame 持久化缓存
df_rediscached DataFrame Redis 是(Redis持久化) DataFrame 分布式缓存
multi_cached 组合器 多级 取决于组合 多级缓存策略

最佳实践

  1. 根据数据特点选择缓存

    • 频繁访问、数据量小:内存缓存 (memcached, df_memcached)
    • 需要持久化:Pickle 缓存 (pickled)、DataFrame SQL 缓存 (sqlcached)
    • 分布式场景:Redis 缓存 (rediscached, df_rediscached)
    • DataFrame 数据:使用 df_ 系列缓存或 sqlcached
  2. DataFrame 缓存选择策略

    基础缓存器(推荐用于简单场景)

    • 如果返回类型是普通对象(dictliststr 等):使用基础缓存器
    • 如果返回类型是 DataFrame 但不需要复杂处理:使用基础缓存器
    • 如果返回类型是 Result[DataFrame] 但不需要复杂处理:使用基础缓存器

    DataFrame 缓存器(仅在需要复杂处理时使用)

    • 需要增量更新缓存(追加、去重、更新数据)
    • 需要自定义缓存命中逻辑(过期检查、参数匹配等)
    • 需要从合并数据中提取部分返回数据(过滤、排序、分组等)
    • 需要三阶段处理(GET、PUT、EXTRACT)来实现复杂业务逻辑

    选择示例

    # ✅ 推荐:简单场景使用基础缓存器
    @memcached
    def get_simple_data() -> DataFrame:
        return DataFrame({"key": [1, 2, 3]})
    
    @pickled
    def fetch_from_api() -> Result[DataFrame]:
        return Result.success(DataFrame({"data": [1, 2, 3]}))
    
    # ✅ 仅在需要复杂处理时使用 DataFrame 缓存器
    @df_memcached(valueloader=IncrementalDataLoader())
    def get_incremental_data(symbol: str) -> DataFrame:
        # 需要增量更新、去重等复杂处理
        return DataFrame({"symbol": [symbol], "price": [100.0]})
    
  3. 多级缓存组合策略

    推荐组合模式

    • DataFrame 作为第一级(L1):使用 df_ 系列缓存器
    • 其他级别(L2、L3):使用基础缓存器

    原因

    • df_ 系列缓存器提供 DataFrame 特定的三阶段处理
    • 基础缓存器提供通用的序列化/反序列化能力
    • 这种组合既利用了 DataFrame 专用处理,又保持了良好的性能

    组合示例

    # ✅ 推荐:DataFrame 在第一级,其他级别使用基础缓存器
    @multi_cached(
        df_memcached("df_l1", ttl=10),   # L1: DataFrame 内存缓存(处理三阶段逻辑)
        pickled("l2", ttl=60),            # L2: 基础文件缓存
        rediscached("l3", ttl=300)        # L3: 基础 Redis 缓存
    )
    def get_data(symbol: str) -> DataFrame:
        return DataFrame({"symbol": [symbol]})
    
    # ❌ 不推荐:所有级别都使用 DataFrame 缓存器
    @multi_cached(
        df_memcached("df_l1", ttl=10),
        df_pickled("df_l2", ttl=60),     # 冗余:L2 不需要 DataFrame 专用处理
        df_rediscached("df_l3", ttl=300)  # 冗余:L3 不需要 DataFrame 专用处理
    )
    def get_data(symbol: str) -> DataFrame:
        return DataFrame({"symbol": [symbol]})
    
  4. 合理设置 TTL

    • 变化频繁的数据:短 TTL(如 10-60 秒)
    • 相对稳定的数据:长 TTL(如 300-3600 秒)
  5. 使用多级缓存

    • L1 使用内存缓存,提供最快访问
    • L2 使用 Redis/Pickle,提供持久化
    • L3 使用 SQL,提供长期存储
  6. 自定义 ValueLoader

    • 需要增量更新:自定义 PUT 阶段
    • 需要过滤返回:自定义 EXTRACT 阶段
    • 需要复杂命中逻辑:自定义 GET 阶段

缓存配置管理

from pynomad import cp

# 查看缓存属性配置
memory_props = cp.MemoryCacheProperties()
print(memory_props)

redis_props = cp.RedisCacheProperties()
print(redis_props)

df_pickle_props = cp.DataFramePickleCacheProperties()
print(df_pickle_props)

变更日志

0.1.8 (2026-03-16)

Features

  • DataFrame 缓存装饰器参数属性支持默认值填充(args, only_args, kwargs, full_kwargs)
  • 添加 RefreshParams.get_full_kwargs 方法,支持参数到参数名的映射
  • 添加 create_context 方法封装 DataFrameLoaderContext 创建逻辑
  • 添加 _determ_should_wrap_by_explicit_return_type 方法,根据显式返回类型判断是否包装
  • DataFrame 缓存装饰器添加 trace 级别日志,记录缓存命中、GET/PUT/EXTRACT 阶段

Miscellaneous

  • 优化参数比较逻辑,确保未传递参数使用默认值填充

0.1.7 (2026-03-15)

Bugfixes

  • 修复 SQL cache decorator 的 extra_params 未正确传递到 decorator_extra_params 的问题

0.1.6 (2026-03-14)

Features

  • 添加 logs_dir 配置支持,允许自定义日志目录
  • dataframe 缓存支持双前缀
  • autoconfig 添加 autowarid 标志

Miscellaneous

  • 添加 towncrier 配置和项目元数据到 PyPI

License

MIT License


贡献指南

欢迎使用本框架。不欢迎 Issue。

作者现在在黑厂打黑工,996 都算福报,实际是 777 工作制(早 7 点干到晚 7 点,一周 7 天),时间和精力都非常有限,根本没精力处理问题。如果你在使用中发现 bug 或有问题,建议自行下载源码修改。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pynomadic-0.1.10.tar.gz (230.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pynomadic-0.1.10-py3-none-any.whl (265.2 kB view details)

Uploaded Python 3

File details

Details for the file pynomadic-0.1.10.tar.gz.

File metadata

  • Download URL: pynomadic-0.1.10.tar.gz
  • Upload date:
  • Size: 230.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.13.7 Windows/11

File hashes

Hashes for pynomadic-0.1.10.tar.gz
Algorithm Hash digest
SHA256 c7c60ff23841f4da897298f40ed261084e2a4a4d9d14a91d47b73bfabf6ad55e
MD5 5f0f3a417ef4573ece5bc442dcb59dc5
BLAKE2b-256 9dfdf9fb1a9b5b042384b23417aedf662eb3fa0afeedf92452de31c209a84123

See more details on using hashes here.

File details

Details for the file pynomadic-0.1.10-py3-none-any.whl.

File metadata

  • Download URL: pynomadic-0.1.10-py3-none-any.whl
  • Upload date:
  • Size: 265.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.13.7 Windows/11

File hashes

Hashes for pynomadic-0.1.10-py3-none-any.whl
Algorithm Hash digest
SHA256 adf868b13a3c40f0302d21f85df3b470e76a31f44e678c59693f754aaa0f5473
MD5 15db70c2fe1e0d7f37c5db18e51d0a88
BLAKE2b-256 489a9335521cb7d7b2f53bb242e967f9d4a3b0c843d235aa3d66bea8c79fb87d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page