Skip to main content

fintech module for x17 echosystem

Project description

lib-x17-fintech

专业的金融数据处理库 - 提供统一的存储抽象、数据获取、模式管理和序列化能力。

Python License Tests Coverage

概述

lib-x17-fintech 是一个功能完整的金融数据处理库,为 X17 生态系统提供核心数据基础设施。通过模块化设计,提供从数据获取到存储的完整解决方案。

核心特性

  • 🔌 存储抽象 - 统一的接口支持本地文件系统和 AWS S3
  • 📊 数据获取 - 完整的 Job 系统,支持 Tushare、Baostock 等数据源
  • 📐 模式管理 - 灵活的表结构定义和验证
  • 💾 序列化 - 多格式支持(Parquet、CSV、JSON、Pickle)
  • 高性能 - 智能缓存、重试和分页机制
  • 🛡️ 类型安全 - 完整的类型提示和协议接口

快速开始

安装

# 基础安装
pip install lib-x17-fintech

# 开发环境(包含测试工具)
pip install lib-x17-fintech[dev]

# 从源码安装
git clone <repository-url>
cd lib-x17-fintech
pip install -e .

基本用法

1. 获取金融数据

from xfintech.data.source.tushare import Session, Stock

# 创建会话
session = Session(credential="your_tushare_token")

# 获取股票列表
job = Stock(session=session)
result = job.run()

print(result.data)  # pandas DataFrame

2. 保存数据到存储

from xfintech.connect import MacOSConnect, Artifact, ConnectRef
from xfintech.serde import PandasSerialiser, DataFormat

# 创建存储连接
connect = MacOSConnect()

# 保存数据
artifact = Artifact(
    ref=ConnectRef("~/data/stocks.parquet"),
    data=result.data
)
artifact.write(connect, PandasSerialiser, DataFormat.PARQUET)

3. 定义数据模式

from xfintech.fabric import TableInfo, ColumnInfo, ColumnKind

# 定义表结构
schema = TableInfo(
    name="stock_daily",
    columns=[
        ColumnInfo(name="ts_code", kind=ColumnKind.STRING),
        ColumnInfo(name="trade_date", kind=ColumnKind.DATE),
        ColumnInfo(name="open", kind=ColumnKind.FLOAT),
        ColumnInfo(name="close", kind=ColumnKind.FLOAT),
    ]
)

# 验证数据
schema.validate(dataframe)

4. 完整数据管道

from xfintech.data.source.tushare import Session, Dayline
from xfintech.data.common import Cache, Retry
from xfintech.connect import S3Connect, Artifact, ConnectRef
from xfintech.serde import PandasSerialiser, DataFormat

# 1. 获取数据(带缓存和重试)
session = Session(credential="token")
job = Dayline(
    session=session,
    params={"ts_code": "000001.SZ", "start_date": "20260101"},
    cache=Cache(),
    retry=Retry(retry=3, wait=1.0, rate=2.0)
)
result = job.run()

# 2. 保存到 S3
connect = S3Connect(bucket="my-bucket")
artifact = Artifact(
    ref=ConnectRef("data/dayline/000001.parquet"),
    data=result.data,
    meta={"job": job.name, "duration": job.metric.duration}
)
artifact.write(connect, PandasSerialiser, DataFormat.PARQUET)

print(f"✅ 保存 {len(result.data)} 行数据,耗时 {job.metric.duration:.2f} 秒")

模块架构

xfintech/
├── connect/        # 存储连接抽象
│   ├── MacOSConnect    - 本地文件系统
│   ├── S3Connect       - AWS S3
│   └── Artifact        - 数据工件
│
├── data/           # 数据获取基础设施
│   ├── common/         - 通用工具(Cache, Retry, Metric)
│   ├── job/            - Job 系统和注册中心
│   ├── relay/          - 中继客户端
│   └── source/         - 数据源(Tushare, Baostock)
│
├── fabric/         # 数据模式管理
│   ├── TableInfo       - 表结构定义
│   ├── ColumnInfo      - 列信息
│   └── ColumnKind      - 数据类型枚举
│
└── serde/          # 序列化/反序列化
    ├── PythonSerialiser    - Python 对象
    ├── PandasSerialiser    - DataFrame
    └── DataFormat          - 格式枚举

模块详解

xfintech.connect - 存储连接

统一的存储抽象层,支持多种后端。

主要功能:

  • MacOSConnect - 本地文件系统操作
  • S3Connect - AWS S3 对象存储
  • Artifact - 数据工件管理
  • ConnectRef - 路径引用

快速示例:

from xfintech.connect import MacOSConnect, Artifact, ConnectRef

connect = MacOSConnect()
artifact = Artifact(ref=ConnectRef("~/data/file.parquet"), data=df)
artifact.write(connect, serialiser, format)

📖 完整文档


xfintech.data - 数据获取

完整的数据获取基础设施,Job 系统管理数据请求生命周期。

主要功能:

  • Job - 作业基类
  • TushareJob - Tushare 数据源
  • Cache - 智能缓存
  • Retry - 指数退避重试
  • Metric - 性能监控

快速示例:

from xfintech.data.source.tushare import Session, Stock
from xfintech.data.common import Cache, Retry

session = Session(credential="token")
job = Stock(
    session=session,
    cache=Cache(),
    retry=Retry(retry=3, wait=1.0)
)
result = job.run()

支持的数据源:

  • ✅ Tushare (28+ 股票数据类)
  • ✅ Baostock
  • 🔄 更多数据源开发中...

📖 完整文档


xfintech.fabric - 数据模式

灵活的表结构定义和验证系统。

主要功能:

  • TableInfo - 表结构定义
  • ColumnInfo - 列信息管理
  • ColumnKind - 7 种数据类型
  • 数据验证和文档生成

快速示例:

from xfintech.fabric import TableInfo, ColumnInfo, ColumnKind

schema = TableInfo(
    name="stock_daily",
    columns=[
        ColumnInfo(name="ts_code", kind=ColumnKind.STRING),
        ColumnInfo(name="close", kind=ColumnKind.FLOAT),
    ]
)

# 验证数据
schema.validate(df)

📖 完整文档


xfintech.serde - 序列化

多格式序列化和反序列化支持。

主要功能:

  • PythonSerialiser - Python 对象(Pickle)
  • PandasSerialiser - DataFrame(Parquet/CSV/JSON)
  • DataFormat - 格式枚举
  • 自动类型检测

快速示例:

from xfintech.serde import PandasSerialiser, DataFormat

# 序列化
serialised = PandasSerialiser.serialise(df, DataFormat.PARQUET)

# 反序列化
df = PandasSerialiser.deserialise(serialised, DataFormat.PARQUET)

支持格式:

格式 扩展名 适用场景
Parquet .parquet 大数据集(推荐)
CSV .csv 文本导入/导出
JSON .json API 交互
Pickle .pkl Python 对象

📖 完整文档

使用场景

场景 1: 构建数据仓库

from xfintech.data.source.tushare import Session, Dayline
from xfintech.connect import S3Connect, Artifact, ConnectRef
from xfintech.serde import PandasSerialiser, DataFormat
from datetime import datetime, timedelta

def build_data_warehouse():
    """构建股票日线数据仓库"""
    session = Session(credential="token")
    connect = S3Connect(bucket="datawarehouse")
    
    # 获取过去 30 天的数据
    end_date = datetime.now()
    start_date = end_date - timedelta(days=30)
    
    symbols = ["000001.SZ", "600000.SH", "000002.SZ"]
    
    for symbol in symbols:
        job = Dayline(
            session=session,
            params={
                "ts_code": symbol,
                "start_date": start_date.strftime("%Y%m%d"),
                "end_date": end_date.strftime("%Y%m%d")
            }
        )
        result = job.run()
        
        # 保存到 S3
        path = f"stocks/daily/{symbol}/{end_date.strftime('%Y%m%d')}.parquet"
        artifact = Artifact(ref=ConnectRef(path), data=result.data)
        artifact.write(connect, PandasSerialiser, DataFormat.PARQUET)
        
        print(f"✅ {symbol}: {len(result.data)} 行")

场景 2: 数据质量监控

from xfintech.data.source.tushare import Session, Stock
from xfintech.fabric import TableInfo, ColumnInfo, ColumnKind

def monitor_data_quality():
    """监控数据质量"""
    # 1. 定义期望的模式
    expected_schema = TableInfo(
        name="stock_basic",
        columns=[
            ColumnInfo(name="ts_code", kind=ColumnKind.STRING),
            ColumnInfo(name="name", kind=ColumnKind.STRING),
            ColumnInfo(name="list_date", kind=ColumnKind.DATE),
        ]
    )
    
    # 2. 获取数据
    session = Session(credential="token")
    job = Stock(session=session)
    result = job.run()
    
    # 3. 验证
    try:
        expected_schema.validate(result.data)
        print("✅ 数据质量检查通过")
    except Exception as e:
        print(f"❌ 数据质量问题: {e}")

场景 3: 增量数据更新

from xfintech.data.source.tushare import Session, TradeDate, Dayline
from xfintech.connect import MacOSConnect, Artifact, ConnectRef
from xfintech.serde import PandasSerialiser, DataFormat
import pandas as pd

def incremental_update():
    """增量更新每日数据"""
    session = Session(credential="token")
    connect = MacOSConnect()
    
    # 1. 获取最新交易日
    trade_date_job = TradeDate(
        session=session,
        params={"exchange": "SSE"}
    )
    trade_dates = trade_date_job.run().data
    latest_date = trade_dates['cal_date'].max()
    
    # 2. 检查是否已存在
    ref = ConnectRef(f"~/data/daily/{latest_date}.parquet")
    if connect.exists(ref):
        print(f"📌 {latest_date} 数据已存在,跳过")
        return
    
    # 3. 获取并保存新数据
    job = Dayline(
        session=session,
        params={"trade_date": latest_date}
    )
    result = job.run()
    
    artifact = Artifact(ref=ref, data=result.data)
    artifact.write(connect, PandasSerialiser, DataFormat.PARQUET)
    
    print(f"✅ 更新 {latest_date}: {len(result.data)} 行")

测试

# 运行所有测试
pytest

# 带覆盖率报告
pytest --cov=xfintech --cov-report=html

# 运行特定模块测试
pytest xfintech/connect/
pytest xfintech/data/
pytest xfintech/fabric/
pytest xfintech/serde/

# 性能基准测试
pytest --benchmark-only

测试统计:

  • ✅ 2,467 测试全部通过
  • 📊 98% 代码覆盖率
  • ⚡ 所有测试 < 5 秒

开发

项目结构

lib-x17-fintech/
├── xfintech/              # 源代码   ├── connect/           # 存储模块   ├── data/              # 数据模块   ├── fabric/            # 模式模块   └── serde/             # 序列化模块
├── tests/                 # 测试(镜像源码结构)
├── example/               # 示例代码
├── pyproject.toml         # 项目配置
├── pytest.ini             # 测试配置
├── environment.yml        # Conda 环境
└── makefile               # 构建脚本

环境设置

# 使用 Conda
conda env create -f environment.yml
conda activate xfintech

# 或使用 pip
python -m venv venv
source venv/bin/activate  # Linux/Mac
pip install -e .[dev]

代码规范

# 格式化和 Lint
ruff check xfintech/
ruff format xfintech/

# 类型检查
mypy xfintech/

构建和发布

# 构建包
python -m build

# 发布到 PyPI
twine upload dist/*

配置

环境变量

# Tushare 配置
export TUSHARE_TOKEN="your_token_here"

# AWS 配置(用于 S3Connect)
export AWS_ACCESS_KEY_ID="your_key"
export AWS_SECRET_ACCESS_KEY="your_secret"
export AWS_DEFAULT_REGION="us-east-1"

# 缓存目录
export XFINTECH_CACHE_DIR="/custom/cache/path"

配置文件

创建 ~/.xfintech/config.toml:

[tushare]
token = "your_tushare_token"

[cache]
path = "/data/cache"
ttl = 86400  # 24 hours

[s3]
bucket = "my-data-bucket"
region = "us-east-1"

最佳实践

✅ 推荐做法

# 1. 始终使用缓存
from xfintech.data.common import Cache
job = SomeJob(session=session, cache=Cache())

# 2. 配置重试策略
from xfintech.data.common import Retry
job = SomeJob(session=session, retry=Retry(retry=3, wait=1.0, rate=2.0))

# 3. 使用上下文管理器
with connect:
    artifact.write(connect, serialiser, format)

# 4. 验证数据模式
schema.validate(dataframe)

# 5. 监控性能
result = job.run()
print(f"耗时: {job.metric.duration} 秒")

❌ 避免的做法

# 1. 不使用缓存(重复 API 调用)
job = SomeJob(session=session)  # 缺少 cache

# 2. 硬编码路径
path = "/Users/john/data/file.csv"  # 使用 ConnectRef

# 3. 忽略错误
result = job.run()  # 不检查 job.metric.errors

# 4. 不验证数据
artifact.write(connect, serialiser, format)  # 先验证

常见问题

Q: 如何选择存储后端?

A:

  • 本地开发/小数据集 → MacOSConnect
  • 生产环境/大数据集/团队协作 → S3Connect

Q: 哪种序列化格式最适合?

A:

  • 性能优先(大数据)→ Parquet
  • 人类可读 → CSV
  • API 交互 → JSON
  • Python 对象 → Pickle

Q: 如何提高数据获取性能?

A:

  1. 启用缓存: Cache()
  2. 调大分页: Paginate(pagesize=5000)
  3. 并发获取: 使用 ThreadPoolExecutor
  4. 使用中继: Session(mode="relay")

Q: 测试失败怎么办?

A:

# 查看详细错误
pytest -v

# 只运行失败的测试
pytest --lf

# 查看标准输出
pytest -s

贡献

欢迎贡献!请遵循以下流程:

  1. Fork 项目
  2. 创建特性分支 (git checkout -b feature/amazing)
  3. 提交更改 (git commit -m 'Add amazing feature')
  4. 推送到分支 (git push origin feature/amazing)
  5. 创建 Pull Request

许可证

MIT License - 详见 LICENSE 文件

作者

Xing Xing - x.xing.work@gmail.com

相关项目

  • lib-x17-cloudcdk - AWS CDK 基础设施
  • lib-x17-cloudsdk - 云服务 SDK
  • lib-x17-cloudmeta - 云元数据管理

更新日志

v2.0.0 (2026-01-09)

  • ✨ 完整的四模块架构(connect, data, fabric, serde)
  • 📚 完善的文档和示例
  • ✅ 2,467 测试,98% 覆盖率
  • 🚀 性能优化和缓存机制

让金融数据处理变得简单、高效、可靠! 🚀

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lib_x17_fintech-2.1.8.tar.gz (299.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lib_x17_fintech-2.1.8-py3-none-any.whl (394.1 kB view details)

Uploaded Python 3

File details

Details for the file lib_x17_fintech-2.1.8.tar.gz.

File metadata

  • Download URL: lib_x17_fintech-2.1.8.tar.gz
  • Upload date:
  • Size: 299.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for lib_x17_fintech-2.1.8.tar.gz
Algorithm Hash digest
SHA256 47fc5d566e798d439c661e80958d0b8c65cb3aa6000718844d8ae1a375413717
MD5 1763da1bc8c0dca717affd73f209459d
BLAKE2b-256 ee9490420309df4a24e49a99a9090fde04486ce19bdf912577eb2777fd4e4f65

See more details on using hashes here.

File details

Details for the file lib_x17_fintech-2.1.8-py3-none-any.whl.

File metadata

File hashes

Hashes for lib_x17_fintech-2.1.8-py3-none-any.whl
Algorithm Hash digest
SHA256 09ff22c509599b6b3bb6a4a046f9a222383791b3a7d090aca6d6833889c59df7
MD5 974fdaa01071a8a622a39114b8cc39ed
BLAKE2b-256 106f112eca228957dace0d13fd15432194fe7216e05f4c1220fcb7767ad89c52

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page