Skip to main content

A configuration-driven stock data storage framework

Project description

Stock Storage Frame

一个配置驱动的股票数据存储框架,专注于后端数据处理流程。

特性

  • 配置驱动完全通过YAML配置文件定义数据处理流程
  • 模块化设计:采集器、处理器、存储器分离,易于扩展
  • 灵活的数据处理:支持自定义Python脚本进行数据转换
  • 多存储支持:支持SQLite、MySQL、PostgreSQL、CSV等多种存储后端
  • 简单易用:无需编写复杂代码,通过配置即可完成数据流程

架构

框架采用分层架构设计,各层职责清晰,便于扩展和维护。整体架构如下:

graph TB
    subgraph "配置层"
        A1[主配置文件 config.yaml]
        A2[Workflow配置文件]
        A3[环境变量配置]
    end
    
    subgraph "调度层"
        B1[定时调度器]
        B2[条件检查器]
        B3[任务执行器]
    end
    
    subgraph "采集层"
        C1[akshare采集器]
        C2[tushare采集器]
        C3[自定义脚本采集器]
    end
    
    subgraph "处理层"
        D1[数据清洗]
        D2[数据转换]
        D3[自定义脚本处理]
    end
    
    subgraph "存储层"
        E1[SQLite存储]
        E2[MySQL存储]
        E3[CSV存储]
        E4[PostgreSQL存储]
    end
    
    subgraph "数据层"
        F1[原始数据源]
        F2[处理后的数据]
        F3[持久化存储]
    end
    
    A1 --> B1
    A2 --> B1
    A3 --> B1
    B1 --> B2
    B2 --> B3
    B3 --> C1
    B3 --> C2
    B3 --> C3
    C1 --> D1
    C2 --> D1
    C3 --> D1
    D1 --> D2
    D2 --> D3
    D3 --> E1
    D3 --> E2
    D3 --> E3
    D3 --> E4
    F1 --> C1
    F1 --> C2
    F1 --> C3
    E1 --> F3
    E2 --> F3
    E3 --> F3
    E4 --> F3
    D3 --> F2

架构说明

  1. 配置层

    • 主配置文件 (config.yaml):定义全局的采集器、处理器和存储器配置。
    • Workflow配置文件:定义具体的数据处理流程,包括调度计划、采集器选择、处理器配置和存储目标。
    • 环境变量配置:通过 .env 文件管理敏感信息,支持 ${ENV_VAR} 变量替换。
  2. 调度层

    • 定时调度器:根据 Workflow 中的 schedule 字段(cron表达式)自动触发任务执行。
    • 条件检查器:执行条件脚本,验证是否满足执行条件(如是否为交易日)。
    • 任务执行器:协调整个数据处理流程,按顺序调用采集层、处理层和存储层。
  3. 采集层

    • akshare采集器:通过 akshare 库获取股票数据,支持多种数据接口。
    • tushare采集器:通过 tushare 库获取专业金融数据,需要 token 认证。
    • 自定义脚本采集器:执行用户自定义的 Python 脚本,实现灵活的数据采集逻辑。
  4. 处理层

    • 数据清洗:对原始数据进行去重、缺失值处理、格式标准化等操作。
    • 数据转换:计算技术指标、数据聚合、字段映射等转换操作。
    • 自定义脚本处理:通过用户自定义的 Python 脚本实现复杂的数据处理逻辑。
  5. 存储层

    • SQLite存储:轻量级本地数据库,适合单机部署和小规模数据。
    • MySQL存储:关系型数据库,支持多用户并发访问和大规模数据存储。
    • CSV存储:文件存储格式,便于数据导出和外部工具处理。
    • PostgreSQL存储:高级关系型数据库,支持复杂查询和地理空间数据。
  6. 数据层

    • 原始数据源:外部数据源,如股票交易所、金融数据 API 等。
    • 处理后的数据:经过清洗和转换的标准化数据,准备存储。
    • 持久化存储:最终存储位置,数据可供后续分析和应用使用。

数据处理流程

  1. 调度层根据配置触发任务执行。
  2. 采集层从原始数据源获取数据。
  3. 处理层对数据进行清洗、转换和自定义处理。
  4. 存储层将处理后的数据持久化到指定存储后端。
  5. 整个流程完全由配置文件驱动,无需编写复杂代码。

安装

从源码安装

## 从源码安装
git clone https://gitee.com/panhuachao/stock-storage-frame.git
cd stock-storage-frame
python3 -m venv venv
source venv/bin/activate

## 国内源
pip install -i https://mirrors.aliyun.com/pypi/simple -r requirements.txt
## 官方源
pip install -i https://pypi.org/simple/ -r requirements.txt

从PyPI安装

# 使用pip安装
pip install stock-storage-frame
# 如使用postgresql数据库,pg数据库并不默认安装,需要额外安装
pip install stock-storage-frame[postgresql]

快速开始

1. 创建配置文件

创建 config.yaml

app:
  name: "stock-data-pipeline"
  version: "1.0.0"
  log_level: "INFO"
  log_dir: "./logs"

# 数据采集器配置
collectors:
  akshare1:
    type: "akshare"
    # method: "stock_zh_a_hist"  # 默认方法,可在workflow中设置
    config:
      timeout: 30
      retry_times: 3
  
  tushare1:
    type: "tushare"
    config:
      token: "${TUSHARE_TOKEN}" # 环境变量,.env文件中定义,参考.env.example,下述相同
      timeout: 30

  gainiancollector:
    type: "custom"
    # script: "./scripts/collector_demo.py" #默认配置脚本,可在workflow中设置
    config:
      host: "${MYSQL_HOST}" # 环境变量,.env文件中定义,参考.env.example,下述相同
      port: "${MYSQL_PORT}" # 环境变量,.env文件中定义,参考.env.example,下述相同
      database: "${MYSQL_DB}" # 环境变量,.env文件中定义,参考.env.example,下述相同
      username: "${MYSQL_USER}" # 环境变量,.env文件中定义,参考.env.example,下述相同
      password: "${MYSQL_PASSWORD}" # 环境变量,.env文件中定义,参考.env.example,下述相同

  customcollector:
    type: "custom"
    # script: "./scripts/collector_demo.py" #默认配置脚本,可在workflow中设置
    config:
      api_url: "http://api_url"
      api_key: "http_api_key"

# 数据存储配置,根据自身需要进行配置
storages:
  sqlite1:
    type: "sqlite"
    config:
      database: "./data/stock_data.db"
  
  csv1:
    type: "csv"
    config:
      directory: "./data/csv"
  
  mysql1:
    type: "mysql"
    config:
      host: "${MYSQL_HOST}"
      port: "${MYSQL_PORT}"
      database: "${MYSQL_DB}"
      username: "${MYSQL_USER}"
      password: "${MYSQL_PASSWORD}"
  
  postgresql1:
    type: "postgresql"
    config:
      host: "${POSTGRES_HOST}"
      port: "${POSTGRES_PORT}"
      database: "${POSTGRES_DB}"
      username: "${POSTGRES_USER}"
      password: "${POSTGRES_PASSWORD}"
      schema: "${POSTGRES_SCHEMA}"

2. 创建Workflow配置

创建 workflows/daily_stock_data.yaml

# workflow: stock_index_data.yaml
name: "stock_index_data"
description: "新浪财经-当天中国股票指数数据"
schedule: "40 20 * * *"  # 每天20:40执行,命令--schedule开启定时任务

# 条件配置(可选),如判定是否是交易日,只有定时任务才判断
condition:
  script: "./scripts/condition_trade_day.py"

# 数据采集配置 - 使用不同的akshare方法
collector:
  name: "akshare1" #对应config.yaml中定义的akshare1名称,用于支持不同yaml不同数据源
  type: "akshare"
  method: "stock_zh_index_spot_sina"  # 股票指数日线数据

# 数据处理配置(可选)
processor:
  # 使用Python脚本处理数据
  script: "./scripts/process_add_date.py"

# 数据存储配置
storage:
  name: "sqlite1" #对应config.yaml中定义的sqlite1名称,用于支持不同yaml使用相同类型但位置不同的数据存储
  type: "sqlite"
  config:
    table_name: "stock_index_data"

3. 创建自定义处理脚本

创建 scripts/process_add_date.py

import pandas as pd
from datetime import datetime

def process(df: pd.DataFrame) -> pd.DataFrame:
    """自定义数据处理逻辑"""
    # 计算技术指标,根据需要自身添加
    # df['ma5'] = df['close'].rolling(5).mean()
    # df['ma10'] = df['close'].rolling(10).mean()
    
    # 添加自定义字段,如日期,akshare中基本的数据都需要增加日期
    df['日期'] = datetime.now().strftime("%Y-%m-%d") # 添加日期
    
    return df

4. 执行Workflow

a. 库使用(通过pip安装)

如通过pip安装,则可以使用命令stock-storage-frame,如下:

# 执行单个workflow
stock-storage-frame --workflow workflows/stock_daily.yaml

# 对特定workflow开启定时任务
stock-storage-frame --workflow workflows/stock_daily.yaml --schedule

# 执行特定目录下所有workflow
stock-storage-frame --workflows-dir workflows

# 对目录下所有workflow执行定时任务
stock-storage-frame --workflows-dir workflows --schedule

# 执行所有workflow(默认目录)
stock-storage-frame --all

# 测试所有组件
stock-storage-frame --test

# 验证workflow配置
stock-storage-frame --validate workflows/stock_daily.yaml

# 查看调度器状态
stock-storage-frame --scheduler-status

b. 源码使用(从源码项目)

如直接下载本项目源码进行使用,则可以使用命令python -m src.stock_storage.main:

# 执行单个workflow
python -m src.stock_storage.main --workflow workflows/stock_daily.yaml

# 对特定workflow开启定时任务
python -m src.stock_storage.main --workflow workflows/stock_daily.yaml --schedule

# 执行特定目录下所有workflow
python -m src.stock_storage.main --workflows-dir workflows

# 对目录下所有workflow执行定时任务
python -m src.stock_storage.main --workflows-dir workflows --schedule

# 执行所有workflow(默认目录)
python -m src.stock_storage.main --all

# 测试所有组件
python -m src.stock_storage.main --test

# 验证workflow配置
python -m src.stock_storage.main --validate workflows/stock_daily.yaml

# 查看调度器状态
python -m src.stock_storage.main --scheduler-status

5. 定时器调度

框架提供了内置的定时器调度功能,可以根据workflow配置中的schedule字段自动执行任务。

# 启动调度器(后台运行)- 调度所有有schedule的workflow
python -m src.stock_storage.main --schedule

# 对特定workflow开启定时任务
python -m src.stock_storage.main --workflow workflows/stock_daily.yaml --schedule

# 对目录下所有workflow执行定时任务
python -m src.stock_storage.main --workflows-dir workflows --schedule

# 查看调度器状态
python -m src.stock_storage.main --scheduler-status

# 使用自定义配置和workflow目录
python -m src.stock_storage.main --schedule --config custom_config.yaml --workflows-dir custom_workflows

调度器功能:

  • 自动加载所有包含schedule字段的workflow配置
  • 根据cron表达式计算下一次执行时间
  • 支持优雅关闭(Ctrl+C)
  • 实时日志记录执行结果
  • 支持多workflow并发调度
  • 支持调度特定workflow或整个目录

项目结构

stock-storage-frame/
├── README.md
├── pyproject.toml
├── config.yaml                    # 主配置文件
├── workflows/                     # workflow配置目录
│   ├── daily_stock_data.yaml
│   ├── weekly_report.yaml
│   └── realtime_data.yaml
├── scripts/                       # 自定义处理脚本
│   ├── process_daily_data.py
│   └── calculate_indicators.py
├── src/
│   └── stock_storage/
│       ├── __init__.py
│       ├── main.py                # 主程序入口
│       ├── engine.py              # Workflow引擎
│       ├── models.py              # 数据模型
│       ├── factories.py           # 组件工厂
│       ├── collectors/            # 采集器实现
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── akshare.py
│       │   └── tushare.py
│       ├── processors/            # 处理器实现
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── pandas.py
│       │   └── custom.py
│       └── storages/              # 存储器实现
│           ├── __init__.py
│           ├── base.py
│           ├── sqlite.py
│           ├── mysql.py
│           └── csv.py
└── data/                          # 数据存储目录
    ├── stock_data.db              # SQLite数据库
    └── csv/                       # CSV文件

配置说明

主配置文件 (config.yaml)

主配置文件定义了全局的采集器、处理器和存储器配置。支持环境变量替换 ${ENV_VAR}

Workflow配置文件

每个workflow配置文件定义了一个完整的数据处理流程,包括:

  • name: workflow名称
  • description: 描述信息
  • schedule: 执行计划(cron表达式)
  • collector: 数据采集配置
  • processor: 数据处理配置
  • storage: 数据存储配置

模板变量

workflow配置支持以下模板变量:

  • { today:YYYYMMDD }}或{{ today:YYYY-MM-DD }}: 今天日期
  • {{ yesterday:YYYYMMDD }}或{{ yesterday:YYYY-MM-DD }}: 昨天日期
  • {{ now }}: 当前时间

扩展开发

添加新的采集器

  1. src/stock_storage/collectors/ 目录下创建新的采集器类
  2. 继承 BaseCollector 类并实现必要的方法
  3. factories.py 中注册新的采集器

添加新的存储器

  1. src/stock_storage/storages/ 目录下创建新的存储器类
  2. 继承 BaseStorage 类并实现必要的方法
  3. factories.py 中注册新的存储器

添加新的处理器

  1. src/stock_storage/processors/ 目录下创建新的处理器类
  2. 继承 BaseProcessor 类并实现必要的方法
  3. factories.py 中注册新的处理器

许可证

MIT License

贡献

欢迎提交Issue和Pull Request!

联系方式

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

stock_storage_frame-1.0.2.tar.gz (231.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

stock_storage_frame-1.0.2-py3-none-any.whl (52.8 kB view details)

Uploaded Python 3

File details

Details for the file stock_storage_frame-1.0.2.tar.gz.

File metadata

  • Download URL: stock_storage_frame-1.0.2.tar.gz
  • Upload date:
  • Size: 231.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.13

File hashes

Hashes for stock_storage_frame-1.0.2.tar.gz
Algorithm Hash digest
SHA256 da25fe5e96c730b5a05a6a83a040b9df9391ce1b40ea8cdb3b1340cc750d4986
MD5 99e4e72b3e746bb0313000420cca6ce5
BLAKE2b-256 0e0501dd1f5bd876c071f49579abf676b3bd17d8b5629aae1f6059b334ed2a6f

See more details on using hashes here.

File details

Details for the file stock_storage_frame-1.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for stock_storage_frame-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 58b0ffcf01115c6e289891babb02192e60f6001167db7f636731efaa341117f5
MD5 3f429a037386b66077f5e4c62b28a245
BLAKE2b-256 e2d51fcf9eb0c0e165136fa4bfff981d1fe530fc52ee3c2e13e7cfe862a7fcaf

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page