A general-purpose pipeline SDK for file writing, packing, and OSS upload

FUploader

A general-purpose pipeline SDK for "consume messages → write local files → pack into archives → upload to OSS".

Core flow

 MessageSource          MetaWriter                  MetaPacker
 ────────────         ──────────────            ─────────────────
 RabbitMQ ──► parse message ──► write to slot ──► seal ──► pack .tar.zst ──► upload to OSS ──► clean up local files
                           │
                           ▼
                     Redis (StateStore)
                     · slot counts / seal threshold
                     · pending-pack queue
                     · distributed lock

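Architecture

Three abstract interfaces

The SDK decouples external dependencies behind abstract interfaces, so a different middleware can be swapped in:

Interface      Responsibility                        Built-in implementation
MessageSource  Message consumption, ACK/NACK         RabbitMQSource (aio-pika)
StateStore     Slot state, queues, distributed lock  RedisStateStore (redis-py async)
ObjectStore    File upload, existence check          OssUploader (COS / Alibaba Cloud OSS / S3)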

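Four injected strategies

Business logic is injected as Callables; no subclassing is required:

Strategy            Signature              Purpose
MessageParser       (bytes) -> dict        Parses the raw message bytes into structured data
FileNameGenerator   (dict) -> str          Generates the output file name from the data
Packer              (Path) -> Path | None  Packs a directory into an archive file
RemoteKeyGenerator  (str) -> str           Generates the remote OSS key from the archive file name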
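
As a rough illustration of these four signatures, the sketch below defines one plain function per strategy. All four function names are hypothetical, and the packer uses the standard-library tarfile module with gzip instead of the SDK's built-in tar_zstd_packer, purely so the example has no third-party dependency.

```python
import json
import tarfile
from pathlib import Path
from typing import Optional


def parse_message(raw_body: bytes) -> dict:
    """MessageParser: decode raw bytes into a dict."""
    return json.loads(raw_body.decode("utf-8"))


def make_file_name(data: dict) -> str:
    """FileNameGenerator: derive the file name from the parsed data."""
    return f"item_{data.get('id', 'unknown')}.json"


def pack_tar_gz(source_dir: Path) -> Optional[Path]:
    """Packer: archive a directory; return the archive path, or None on failure."""
    if not source_dir.is_dir():
        return None
    archive = source_dir.with_name(source_dir.name + ".tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(source_dir, arcname=source_dir.name)
    return archive


def make_remote_key(archive_name: str) -> str:
    """RemoteKeyGenerator: map an archive file name to an object key."""
    return f"archives/{archive_name}"
```

Because each strategy is just a function with the listed signature, it can be unit-tested without a broker, Redis, or an object store.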

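Slot sharding

  • Files are assigned to slot directories (slot_0_0, slot_1_0, ...) by hashing the file name and taking the result modulo the slot count
  • Each slot has its own active_dir_id and active_count
  • Multiple processes can write to the same storage directory in parallel without conflicts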
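
The routing rule can be sketched as follows. The README does not say which hash function the SDK uses, so zlib.crc32 here is an assumption, and slot_index and slot_dir_name are hypothetical helper names.

```python
import zlib


def slot_index(file_name: str, slot_count: int) -> int:
    """Map a file name to a slot in [0, slot_count) using a stable hash."""
    if slot_count < 1:
        raise ValueError("slot_count must be >= 1")
    # Assumption: crc32 stands in for whatever hash the SDK actually uses.
    return zlib.crc32(file_name.encode("utf-8")) % slot_count


def slot_dir_name(slot: int, dir_id: int) -> str:
    """Directory naming as in the examples: slot_<slot>_<dir_id>."""
    return f"slot_{slot}_{dir_id}"
```

A stable hash matters for multi-process use: Python's built-in hash() for strings is randomized per process by default, so it would route the same file name to different slots in different workers.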

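Seal mechanism

  • A seal is triggered automatically when the number of files in a slot directory reaches pack_threshold
  • The current directory is marked ready and added to the pack queue; a new directory with the next sequence number is created at the same time (e.g. slot_0_0 → slot_0_1)
  • A Redis Lua script makes the threshold check atomic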
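
The seal transition can be modeled in memory as below. This is a single-process sketch only: the SDK performs the equivalent step inside a Redis Lua script so that concurrent writers cannot both cross the threshold. SlotState and record_write are hypothetical names; active_dir_id, active_count, and pack_threshold come from the text above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SlotState:
    active_dir_id: int = 0   # sequence number of the directory being written
    active_count: int = 0    # files written to that directory so far


def record_write(state: SlotState, slot: int, pack_threshold: int) -> Optional[str]:
    """Count one written file; return the sealed directory name if the threshold is hit."""
    state.active_count += 1
    if state.active_count < pack_threshold:
        return None
    sealed = f"slot_{slot}_{state.active_dir_id}"
    state.active_dir_id += 1   # later writes go to the next directory
    state.active_count = 0
    return sealed
```

With pack_threshold=3, every third call returns a directory name (slot_0_0, then slot_0_1), which is what would be pushed onto the pack queue.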

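Residual recovery

On startup, the SDK scans storage_root for slot_* directories that are not currently active and re-adds them to the pending-pack queue, so data is not lost after a process crash.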

Installation

# Basic install
pip install FUploader

# Install OSS backends as needed
pip install FUploader[cos]      # Tencent Cloud COS
pip install FUploader[oss]      # Alibaba Cloud OSS
pip install FUploader[s3]       # AWS S3 / MinIO
pip install FUploader[adapters] # all OSS backends

Quick start

The following example shows the complete usage of a write pipeline for Douyin comment data (source: examples/douyin_writer.py):

import asyncio
import json
from pathlib import Path

from file_uploader import (
    FileWriterConfig,
    FileWriterPipeline,
    OssProvider,
    OssUploader,
    RabbitMQSource,
    RedisStateStore,
    tar_zstd_packer,
)


# 1. Business strategy functions
def douyin_message_parser(raw_body: bytes) -> dict:
    return json.loads(raw_body.decode("utf-8"))


def douyin_file_name_generator(data: dict) -> str:
    comment_id = data.get("cid", "unknown")
    return f"comment_{comment_id}.json"


def douyin_remote_key_generator(archive_name: str) -> str:
    from datetime import datetime
    date_str = datetime.now().strftime("%Y%m%d")
    return f"douyin/comments/{date_str}/{archive_name}"


async def main():
    # 2. Configuration
    config = FileWriterConfig(
        slot_count=16,
        pack_threshold=1000,
        storage_root=Path("/data/douyin_output"),
        save_timeout=5.0,
        packer_concurrency=4,
        packer_interval=2.0,
        packer_max_retries=3,
    )

    # 3. Redis state store
    state_store = RedisStateStore(
        host="localhost", port=6379, db=0,
        key_prefix="file_uploader:douyin:",
    )
    await state_store.connect()

    # 4. RabbitMQ message source
    message_source = RabbitMQSource(
        host="localhost", port=5672,
        user="guest", password="guest",
        queue_name="douyin_comment_queue",
    )

    # 5. OSS uploader (Tencent Cloud COS)
    object_store = OssUploader(
        provider=OssProvider.COS,
        bucket="douyin-data-1250000000",
        region="ap-guangzhou",
        endpoint="cos.ap-guangzhou.myqcloud.com",
        access_key_id="AKID...",
        access_key_secret="...",
    )

    # 6. Build the pipeline
    pipeline = (
        FileWriterPipeline
        .with_config(config)
        .with_state_store(state_store)
        .with_message_source(message_source)
        .with_object_store(object_store)
        .with_message_parser(douyin_message_parser)
        .with_file_name_generator(douyin_file_name_generator)
        .with_packer(tar_zstd_packer)
        .with_remote_key_generator(douyin_remote_key_generator)
    )

    # 7. Start and wait for a termination signal
    await pipeline.start()
    await pipeline.wait()  # blocks until SIGINT / SIGTERM
    await pipeline.stop()


if __name__ == "__main__":
    asyncio.run(main())

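Configuration reference

All FileWriterConfig fields:

Field                   Type   Default   Description
storage_root            Path   required  Local storage root directory
slot_count              int    1         Number of shards (avoids file-lock contention when multiple processes write files)
pack_threshold          int    1000      Maximum number of files per directory; reaching it triggers a seal
write_concurrency       int    10        Number of file-writing worker coroutines
packer_concurrency      int    2         Number of pack-and-upload worker coroutines
save_timeout            float  30.0      Per-file write timeout (seconds)
packer_interval         float  1.0       Pack queue polling interval (seconds)
packer_max_retries      int    3         Maximum retries after a pack failure (0 = no retry, discard immediately)
batch_size              int    100       Batch ACK threshold
flush_interval          float  5.0       Time-window flush (seconds)
prefetch_count          int    50        Message prefetch count
resume_orphan_archives  bool   True      Whether to recover orphan directories on startup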
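
The README does not spell out how batch_size and flush_interval interact. One plausible reading, sketched here as an assumption rather than the SDK's actual logic, is that pending ACKs are flushed as soon as either the count threshold or the time window is reached. AckBatcher and its methods are hypothetical names.

```python
import time
from typing import Callable, List


class AckBatcher:
    """Collects delivery tags and flushes by size or by elapsed time."""

    def __init__(self, batch_size: int, flush_interval: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._clock = clock
        self._pending: List[int] = []
        self._last_flush = clock()

    def add(self, tag: int) -> List[int]:
        """Add one tag; return the tags to ACK now (empty if nothing is due)."""
        self._pending.append(tag)
        return self.poll()

    def poll(self) -> List[int]:
        """Return pending tags if either threshold is reached, else an empty list."""
        due_by_size = len(self._pending) >= self.batch_size
        due_by_time = self._clock() - self._last_flush >= self.flush_interval
        if not self._pending or not (due_by_size or due_by_time):
            return []
        flushed, self._pending = self._pending, []
        self._last_flush = self._clock()
        return flushed
```

The clock is injectable so the time-window branch can be exercised in tests without sleeping.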

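Core components

FileWriterPipeline

A Builder-pattern, one-stop entry point that wraps the full lifecycle management:

Method                           Description
.with_config(config)             Injects the FileWriterConfig
.with_state_store(store)         Injects the StateStore implementation
.with_message_source(source)     Injects the MessageSource implementation
.with_object_store(store)        Injects the ObjectStore implementation
.with_message_parser(parser)     Injects the message parsing strategy
.with_file_name_generator(gen)   Injects the file name generation strategy
.with_packer(packer)             Injects the packing strategy
.with_remote_key_generator(gen)  Injects the remote key generation strategy
.no_recovery()                   Disables residual recovery on startup
await pipeline.start()           Starts the pipeline
await pipeline.wait()            Blocks until SIGINT / SIGTERM is received
await pipeline.stop()            Shuts down gracefully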

MetaWriter

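The file-writing service. It:

  1. Consumes messages through MessageSource
  2. Calls MessageParser to parse them into structured data
  3. Calls FileNameGenerator to generate the file name
  4. Routes each file to its slot directory by file-name hash modulo the slot count
  5. Serializes the data as JSON and writes it to disk
  6. Atomically increments the slot count and checks the seal threshold
  7. Seals the directory when the threshold is reached and adds it to the pack queue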
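
Steps 2 to 5 of this write path can be sketched as a single function. This is not the SDK's MetaWriter: write_message and its parameters are hypothetical, zlib.crc32 is an assumed hash, and the consumption, counting, and sealing steps (1, 6, 7) are left out.

```python
import json
import zlib
from pathlib import Path
from typing import Callable, Dict


def write_message(
    raw_body: bytes,
    storage_root: Path,
    slot_count: int,
    active_dir_ids: Dict[int, int],
    parser: Callable[[bytes], dict],
    name_generator: Callable[[dict], str],
) -> Path:
    """Parse one message and write it as JSON into its slot directory."""
    data = parser(raw_body)                                    # step 2
    file_name = name_generator(data)                           # step 3
    slot = zlib.crc32(file_name.encode("utf-8")) % slot_count  # step 4 (assumed hash)
    target_dir = storage_root / f"slot_{slot}_{active_dir_ids.get(slot, 0)}"
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / file_name
    target.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")  # step 5
    return target
```

Here active_dir_ids maps a slot index to its current directory sequence number; in the SDK that state lives in the StateStore.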

MetaPacker

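The pack-and-upload service. It:

  1. Polls the pending-pack queue
  2. Uses a distributed lock to prevent duplicate packing
  3. Calls Packer to pack the directory into an archive file
  4. Calls RemoteKeyGenerator to generate the remote key
  5. Uploads to OSS through ObjectStore
  6. Cleans up the local archive and source directory after a successful upload
  7. Retries failed packs with exponential backoff and discards the task once all retries fail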
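
The retry behavior in step 7 might look like the sketch below. The base delay and the doubling factor are assumptions (the README only says "exponential backoff"), and pack_with_retry is a hypothetical helper. max_retries=0 means a single attempt with no retry, matching the packer_max_retries description.

```python
import asyncio
from pathlib import Path
from typing import Awaitable, Callable, Optional


async def pack_with_retry(
    packer: Callable[[Path], Awaitable[Optional[Path]]],
    source_dir: Path,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> Optional[Path]:
    """Try once, then retry up to max_retries times with doubling delays."""
    for attempt in range(max_retries + 1):
        try:
            archive = await packer(source_dir)
        except Exception:
            archive = None  # treat an exception like a failed pack
        if archive is not None:
            return archive
        if attempt < max_retries:
            # Delays grow as base_delay * 1, 2, 4, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
    return None  # all attempts failed; the caller discards the task
```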

ResidualRecovery

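Performs residual recovery during startup:

  • Scans all slot_* directories under storage_root
  • Excludes the currently active directories (active_dir_id)
  • Skips empty directories
  • Re-adds orphan directories to the pending-pack queue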
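
The scan can be approximated with pathlib alone. This is a sketch under the assumption that the caller already knows the names of the active directories (in the SDK they would come from the StateStore); find_orphan_dirs is a hypothetical name.

```python
from pathlib import Path
from typing import List, Set


def find_orphan_dirs(storage_root: Path, active_dirs: Set[str]) -> List[Path]:
    """Return non-empty slot_* directories that are not currently active."""
    orphans = []
    for path in sorted(storage_root.glob("slot_*")):
        if not path.is_dir():
            continue  # e.g. a leftover archive file
        if path.name in active_dirs:
            continue  # still being written to
        if not any(path.iterdir()):
            continue  # nothing to pack
        orphans.append(path)
    return orphans
```

Each returned path would then be pushed back onto the pending-pack queue.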

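Custom strategies

All strategies are plain functions (sync or async); no framework base class is needed:

import json
from pathlib import Path

# Custom message parser (async version)
async def my_parser(raw_body: bytes) -> dict:
    data = json.loads(raw_body)
    # data cleaning, enrichment...
    return data

# Custom packer (can replace tar.zstd)
async def my_packer(source_dir: Path) -> Path | None:
    """7z compression example"""
    archive = source_dir.with_suffix(".7z")
    # run_7z is a placeholder for your own helper; it is not defined here
    result = await run_7z(source_dir, archive)
    return archive if result else None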

# Inject into the Pipeline
pipeline = (
    FileWriterPipeline
    .with_config(config)
    # ...other configuration...
    .with_message_parser(my_parser)
    .with_packer(my_packer)
)
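
A common way for a framework to accept both sync and async callables is to call the function and await the result only if it is awaitable. The helper below is a hypothetical sketch of that pattern, not the SDK's internal code.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_strategy(func: Callable[..., Any], *args: Any) -> Any:
    """Call a strategy and await the result if it is awaitable."""
    result = func(*args)
    if inspect.isawaitable(result):
        result = await result
    return result
```

A synchronous strategy called this way still runs on the event loop thread, so a slow one (for example a packer compressing a large directory) may be better written as an async function or offloaded with asyncio.to_thread.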

Adapters

RabbitMQSource

A RabbitMQ consumer implementation based on aio-pika:

source = RabbitMQSource(
    host="localhost", port=5672,
    vhost="/", user="guest", password="guest",
    queue_name="my_queue",
    prefetch_count=50,
    requeue_on_nack=True,
)

RedisStateStore

A state store implementation based on redis.asyncio. Lua scripts make the threshold check and the distributed lock atomic:

store = RedisStateStore(
    host="localhost", port=6379, db=0,
    password="",
    key_prefix="file_uploader:task1:",
    lock_ttl=300,
)
await store.connect()

OssUploader

A unified upload interface supporting three OSS backends; each backend's dependencies are installed on demand:

# Tencent Cloud COS → pip install FUploader[cos]
uploader = OssUploader(
    provider=OssProvider.COS,
    bucket="my-bucket-1250000000",
    region="ap-guangzhou",
    endpoint="cos.ap-guangzhou.myqcloud.com",
    access_key_id="AKID...",
    access_key_secret="...",
)

# Alibaba Cloud OSS → pip install FUploader[oss]
uploader = OssUploader(
    provider=OssProvider.ALIYUN_OSS,
    bucket="my-bucket",
    endpoint="oss-cn-hangzhou.aliyuncs.com",
    access_key_id="LTAI...",
    access_key_secret="...",
)

# AWS S3 / MinIO → pip install FUploader[s3]
uploader = OssUploader(
    provider=OssProvider.S3,
    bucket="my-bucket",
    region="us-east-1",
    endpoint="https://s3.amazonaws.com",
    access_key_id="AKIA...",
    access_key_secret="...",
)

Project structure

src/file_uploader/
├── __init__.py              # public API exports
├── config.py                # FileWriterConfig configuration class
├── pipeline.py              # FileWriterPipeline entry point + Builder
├── interfaces/              # abstract interface layer
│   ├── message_source.py    #   MessageSource abstraction
│   ├── object_store.py      #   ObjectStore abstraction
│   ├── state_store.py       #   StateStore abstraction + SlotRuntimeState
│   └── strategies.py        #   MessageParser / FileNameGenerator / Packer / RemoteKeyGenerator types
├── adapters/                # adapter implementations
│   ├── rabbitmq_source.py   #   RabbitMQSource
│   ├── redis_state_store.py #   RedisStateStore
│   └── oss_uploader.py      #   OssUploader (COS / OSS / S3)
├── packers/                 # packing strategies
│   └── tar_zstd.py          #   tar + zstd packer
├── services/                # core services
│   ├── meta_writer.py       #   MetaWriter writing service
│   └── meta_packer.py       #   MetaPacker pack-and-upload service
└── recovery/                # residual recovery
    └── residual.py          #   ResidualRecovery

License

MIT
