Skip to main content

通用文件写入、打包、OSS 上传管线 SDK

Project description

FUploader

通用的「消费消息 → 写本地文件 → 打包归档 → 上传 OSS」管线 SDK。

核心流程

 MessageSource          MetaWriter                  MetaPacker
 ────────────         ──────────────            ─────────────────
 RabbitMQ ──► 解析消息 ──► 写盘到 slot ──► 封口 ──► 打包 .tar.zst ──► 上传 OSS ──► 清理本地
                          │
                          ▼
                    Redis (StateStore)
                    · slot 计数 / 封口阈值
                    · 待打包队列
                    · 分布式锁

架构设计

三层抽象接口

SDK 通过抽象接口解耦外部依赖,方便替换不同中间件:

接口 职责 内置实现
MessageSource 消息消费、ACK/NACK RabbitMQSource (aio-pika)
StateStore slot 状态、队列、分布式锁 RedisStateStore (redis-py async)
ObjectStore 文件上传、存在性检查 OssUploader (COS / 阿里云 OSS / S3)

四种策略注入

业务逻辑通过 Callable 注入,无需继承任何类:

策略 签名 作用
MessageParser (bytes) -> dict 将原始消息字节解析为结构化数据
FileNameGenerator (dict) -> str 根据数据生成写入文件名
Packer (Path) -> Path | None 将目录打包为归档文件
RemoteKeyGenerator (str) -> str 根据归档文件名生成远端 OSS key

Slot 分片机制

  • 按文件名 hash 取模分配到不同 slot 目录(slot_0_0slot_1_0 ...)
  • 每个 slot 有独立的 active_dir_idactive_count
  • 支持多进程并行写入同一存储目录,互不冲突

Legacy 高并发模式

迁移原 main_meta_writer.py 场景时,推荐使用 legacy 模式:

  • 主进程作为 supervisor,子进程作为 worker 执行业务。
  • META_WRITER_PROCESS_COUNT 控制 worker 进程数。
  • 子进程 NODE_ID 形如 node_id-{本机IP}-meta-p0node_id-{本机IP}-meta-p1
  • 每个 worker 固定写自己的 slot,避免共享写锁竞争。
  • Redis key 对齐 {task_name}:meta:{node_id-{本机IP}}:*
  • 本地目录对齐 active/slot-0/dir-000001sealed/slot-0/dir-000001

封口(Seal)机制

  • 每个 slot 目录内文件数达到 pack_threshold 时自动触发封口
  • 当前目录被标记为 ready,加入打包队列;同时创建新序号目录(如 slot_0_0slot_0_1
  • 阈值检查通过 Redis Lua 脚本保证原子性

残留恢复

启动时自动扫描 storage_root,找出非当前活跃的 slot_* 目录,重新加入待打包队列,防止进程崩溃后数据丢失。

安装

# 基础安装
pip install FUploader

# 按需安装 OSS 后端
pip install FUploader[cos]      # 腾讯云 COS
pip install FUploader[oss]      # 阿里云 OSS
pip install FUploader[s3]       # AWS S3 / MinIO
pip install FUploader[adapters] # 全部 OSS 后端

以下示例展示抖音作品(aweme)元数据写入管线的完整用法 — 上游通过 FileMessage 编码投递作品数据到 RabbitMQ,下游消费并按 aweme_id 落盘、打包、上传 COS:

import asyncio
from pathlib import Path

from file_uploader import (
    FileWriterConfig,
    FileWriterPipeline,
    OssProvider,
    OssUploader,
    RabbitMQSource,
    RedisStateStore,
    get_meta_writer_process_count,
    parse_file_message,
    tar_zstd_packer,
)


# 1. 业务策略函数
def douyin_message_parser(raw_body: bytes) -> dict:
    """解析上游通过 FileMessage 编码的消息(gzip + base64)。"""
    decoded = parse_file_message(raw_body)
    return {"aweme_id": decoded.file_name, **decoded.payload}


def douyin_file_name_generator(data: dict) -> str:
    """用作品 aweme_id 作为文件名。"""
    aweme_id = data.get("aweme_id", "unknown")
    return f"{aweme_id}.json"


def douyin_remote_key_generator(archive_name: str) -> str:
    """按日期分目录上传到 COS。"""
    from datetime import datetime
    date_str = datetime.now().strftime("%Y%m%d")
    return f"douyin/aweme/{date_str}/{archive_name}"


TASK_NAME = "douyin_aweme"


def build_pipeline() -> FileWriterPipeline:
    process_count = get_meta_writer_process_count()  # 读取 META_WRITER_PROCESS_COUNT,默认 1

    # 2. 配置
    config = FileWriterConfig.legacy_meta(
        task_name=TASK_NAME,
        process_count=process_count,
        pack_threshold=1000,
        storage_root=Path("/data/douyin_output"),
        save_timeout=5.0,
        packer_concurrency=4,
        packer_interval=2.0,
        packer_max_retries=3,
    )

    # 3. Redis 状态存储
    state_store = RedisStateStore(
        host="localhost", port=6379, db=0,
        key_style="legacy_meta",
        task_name=TASK_NAME,
        node_id=config.node_id,
        slot_count=config.slot_count,
    )

    # 4. RabbitMQ 消息源
    message_source = RabbitMQSource(
        host="localhost", port=5672,
        user="guest", password="guest",
        queue_name="douyin_aweme_meta_queue",
    )

    # 5. OSS 上传器(腾讯云 COS)
    object_store = OssUploader(
        provider=OssProvider.COS,
        bucket="douyin-data-1250000000",
        region="ap-guangzhou",
        endpoint="cos.ap-guangzhou.myqcloud.com",
        access_key_id="AKID...",
        access_key_secret="...",
    )

    # 6. 构建 Pipeline
    return (
        FileWriterPipeline
        .with_config(config)
        .with_state_store(state_store)
        .with_message_source(message_source)
        .with_object_store(object_store)
        .with_message_parser(douyin_message_parser)
        .with_file_name_generator(douyin_file_name_generator)
        .with_packer(tar_zstd_packer)
        .with_remote_key_generator(douyin_remote_key_generator)
    )


async def async_main() -> None:
    pipeline = build_pipeline()
    await pipeline.start()
    try:
        await pipeline.wait()
    finally:
        await pipeline.stop()


if __name__ == "__main__":
    asyncio.run(async_main())

配置参考

FileWriterConfig 所有字段:

字段 类型 默认值 说明
storage_root Path 必填 本地存储根目录
slot_count int 1 分片数(多进程写文件时避免文件锁竞争)
pack_threshold int 1000 每个目录最多文件数,触发封口(seal)
write_concurrency int 10 写文件 worker 协程数
packer_concurrency int 2 打包上传 worker 协程数
save_timeout float 30.0 单文件写入超时(秒)
packer_interval float 1.0 打包轮询间隔(秒)
packer_max_retries int 3 打包失败最大重试次数(0=不重试,直接丢弃)
batch_size int 100 批量 ACK 阈值
flush_interval float 5.0 时间窗口 flush(秒)
prefetch_count int 50 消息预取数
meta_writer_max_tasks_per_child int 1000 单个 worker 成功处理多少条后优雅轮转(0=禁用)
node_id str NODE_IDnode_id-{本机IP} 当前节点 ID
task_name str "" legacy Redis key 使用的任务名
worker_index int | None NODE_ID 解析 当前 worker 下标
slot_strategy str "hash" hashworker_index
storage_layout str "flat_slot" flat_slotlegacy_meta
ready_dir_format str "colon" colonlegacy_slot
resume_orphan_archives bool True 启动时是否恢复孤儿目录

环境变量:

变量 默认值 说明
META_WRITER_PROCESS_COUNT 1 supervisor 启动的 worker 进程数,同时也是 legacy 模式 slot 数
META_WRITER_MAX_TASKS_PER_CHILD 1000 可映射到 meta_writer_max_tasks_per_child,达到后 worker 优雅退出并由 supervisor 拉起

核心组件

FileWriterPipeline

Builder 模式的一站式入口,封装完整的生命周期管理:

方法 说明
.with_config(config) 注入 FileWriterConfig 配置
.with_state_store(store) 注入 StateStore 实现
.with_message_source(source) 注入 MessageSource 实现
.with_object_store(store) 注入 ObjectStore 实现
.with_message_parser(parser) 注入消息解析策略
.with_file_name_generator(gen) 注入文件名生成策略
.with_packer(packer) 注入打包策略
.with_remote_key_generator(gen) 注入远端 key 生成策略
.no_recovery() 禁用启动时残留恢复
await pipeline.start() 启动 Pipeline
await pipeline.wait() 阻塞直到收到 SIGINT / SIGTERM
await pipeline.stop() 优雅关闭

MetaWriter

文件写入服务,负责:

  1. 通过 MessageSource 消费消息
  2. 调用 MessageParser 解析为结构化数据
  3. 调用 FileNameGenerator 生成文件名
  4. 按文件名 hash 取模路由到对应 slot 目录
  5. 将数据序列化为 JSON 写入磁盘
  6. 原子递增 slot 计数并检查封口阈值
  7. 达到阈值时封口(seal),将目录加入打包队列

MetaPacker

打包上传服务,负责:

  1. 轮询待打包队列
  2. 通过分布式锁防止重复打包
  3. 调用 Packer 将目录打包为归档文件
  4. 调用 RemoteKeyGenerator 生成远端 key
  5. 通过 ObjectStore 上传到 OSS
  6. 上传成功后清理本地归档和源目录
  7. 打包失败时指数退避重试,全部失败后丢弃

ResidualRecovery

启动期间执行残留恢复:

  • 扫描 storage_root 下所有 slot_* 目录
  • 排除当前活跃目录(active_dir_id
  • 跳过空目录
  • 将孤儿目录重新加入待打包队列

自定义策略

所有策略都是普通函数(支持同步/异步),无需继承框架基类:

# 自定义消息解析器(异步版本)
async def my_parser(raw_body: bytes) -> dict:
    data = json.loads(raw_body)
    # 数据清洗、补全...
    return data

# 自定义打包器(可替换 tar.zstd)
async def my_packer(source_dir: Path) -> Path | None:
    """7z 压缩示例"""
    archive = source_dir.with_suffix(".7z")
    result = await run_7z(source_dir, archive)
    return archive if result else None

# 注入到 Pipeline
pipeline = (
    FileWriterPipeline
    .with_config(config)
    # ...其他配置...
    .with_message_parser(my_parser)
    .with_packer(my_packer)
)

适配器

RabbitMQSource

基于 aio-pika 的 RabbitMQ 消费实现:

source = RabbitMQSource(
    host="localhost", port=5672,
    vhost="/", user="guest", password="guest",
    queue_name="my_queue",
    prefetch_count=50,
    requeue_on_nack=True,
)

RedisStateStore

基于 redis.asyncio 的状态存储实现。使用 Lua 脚本保证阈值检查和分布式锁的原子性:

store = RedisStateStore(
    host="localhost", port=6379, db=0,
    password="",
    key_prefix="file_uploader:task1:",
    lock_ttl=300,
)
await store.connect()

OssUploader

支持三种 OSS 后端的统一上传接口,各后端依赖按需安装:

# 腾讯云 COS → pip install FUploader[cos]
uploader = OssUploader(
    provider=OssProvider.COS,
    bucket="my-bucket-1250000000",
    region="ap-guangzhou",
    endpoint="cos.ap-guangzhou.myqcloud.com",
    access_key_id="AKID...",
    access_key_secret="...",
)

# 阿里云 OSS → pip install FUploader[oss]
uploader = OssUploader(
    provider=OssProvider.ALIYUN_OSS,
    bucket="my-bucket",
    endpoint="oss-cn-hangzhou.aliyuncs.com",
    access_key_id="LTAI...",
    access_key_secret="...",
)

# AWS S3 / MinIO → pip install FUploader[s3]
uploader = OssUploader(
    provider=OssProvider.S3,
    bucket="my-bucket",
    region="us-east-1",
    endpoint="https://s3.amazonaws.com",
    access_key_id="AKIA...",
    access_key_secret="...",
)

文件消息编码

SDK 内置一套「写入前处理」的消息编码格式,支持将任意 payload 经 gzip 压缩 + base64 编码后投递到消息队列,由 MetaWriter 消费时自动解码。适用于需要在上游提前准备消息内容的场景。

编码(生产者侧)

from file_uploader import (
    FileMessage,
    FileMessagePublisher,
    encode_file_message,
    publish_file_messages,
)

# 方式 1:直接编码单条消息
envelope = encode_file_message(FileMessage(file_name="data_001", payload={"key": "value"}))
# → {"file_name": "data_001.json", "compression": "gzip", "content_type": "json", "payload": "..."}

# 方式 2:批量发布到消息队列
async def my_publish(body: str) -> None:
    await channel.default_exchange.publish(aio_pika.Message(body.encode()), routing_key="queue")

await publish_file_messages(
    [FileMessage(file_name="a", payload={"x": 1}), FileMessage(file_name="b", payload="hello")],
    publish=my_publish,
)

解码(消费者侧)

from file_uploader import parse_file_message, file_message_name

decoded = parse_file_message(raw_bytes)   # → DecodedFilePayload(file_name="data_001.json", payload={...}, content_type="json")
file_name = file_message_name(decoded)    # 等价于 decoded.file_name

作为 MessageParser 注入

from file_uploader import parse_file_message

def parser(raw_body: bytes) -> dict:
    decoded = parse_file_message(raw_body)
    return {"file_name": decoded.file_name, "payload": decoded.payload}

独立使用核心服务

MetaWriterMetaPacker 均可脱离 Pipeline 独立使用,直接构造并调用 start() / stop()

writer = MetaWriter(
    config=config,
    state_store=store,
    message_source=source,
    message_parser=parser,
    file_name_generator=name_gen,
)
await writer.start()
# ... 消费消息 ...
await writer.stop()

项目结构

src/file_uploader/
├── __init__.py              # 公开 API 导出
├── config.py                # FileWriterConfig 配置类
├── layout.py                # 存储目录布局辅助函数
├── messages.py              # 文件消息编码/解码 (FileMessage, parse_file_message, etc.)
├── pipeline.py              # FileWriterPipeline 入口 + Builder + run_pipeline_supervised
├── runtime.py               # 运行时环境变量读取 (NODE_ID, META_WRITER_PROCESS_COUNT, etc.)
├── interfaces/              # 抽象接口层
│   ├── message_source.py    #   MessageSource 抽象
│   ├── object_store.py      #   ObjectStore 抽象
│   ├── state_store.py       #   StateStore 抽象 + SlotRuntimeState
│   └── strategies.py        #   MessageParser / FileNameGenerator / Packer / RemoteKeyGenerator 类型
├── adapters/                # 适配器实现
│   ├── rabbitmq_source.py   #   RabbitMQSource
│   ├── redis_state_store.py #   RedisStateStore
│   └── oss_uploader.py      #   OssUploader (COS / OSS / S3)
├── packers/                 # 打包策略
│   └── tar_zstd.py          #   tar + zstd 打包器
├── services/                # 核心服务
│   ├── meta_writer.py       #   MetaWriter 写入服务
│   └── meta_packer.py       #   MetaPacker 打包上传服务
└── recovery/                # 残留恢复
    └── residual.py          #   ResidualRecovery

公开 API 总览

类别 名称 说明
配置 FileWriterConfig Pipeline 配置 dataclass
接口 MessageSource 消息源抽象基类
接口 StateStore 状态存储抽象基类
接口 ObjectStore 对象存储抽象基类
接口 SlotRuntimeState slot 运行时状态 dataclass
策略类型 MessageParser (bytes) -> dict(支持同步/异步)
策略类型 FileNameGenerator (dict) -> str
策略类型 Packer (Path) -> Path | None
策略类型 RemoteKeyGenerator (str) -> str
适配器 RabbitMQSource RabbitMQ 消费实现
适配器 RedisStateStore Redis 状态存储实现
适配器 OssUploader 多后端 OSS 上传器
适配器 OssProvider OSS 后端枚举 (COS / ALIYUN_OSS / S3)
打包器 tar_zstd_packer tar.zstd 打包函数
消息编码 FileMessage 原始文件消息 dataclass
消息编码 DecodedFilePayload 解码后的文件载荷 dataclass
消息编码 FileMessagePublisher 批量消息发布器
消息编码 encode_file_message 编码单条消息为队列 envelope
消息编码 parse_file_message 解码队列 envelope
消息编码 file_message_name 从 decoded payload 提取文件名
消息编码 serialize_file_payload 序列化 payload 为 bytes
消息编码 publish_file_messages 批量编码并发布
服务 MetaWriter 文件写入服务
服务 MetaPacker 打包上传服务
恢复 ResidualRecovery 残留文件恢复
Pipeline FileWriterPipeline Builder 模式 Pipeline 入口
Pipeline run_pipeline_once 构建并运行一次 Pipeline
Pipeline run_pipeline_supervised Supervisor 模式守护 Pipeline 子进程
工具 get_node_id 获取当前节点 ID
工具 get_meta_writer_process_count 读取 META_WRITER_PROCESS_COUNT 环境变量
工具 get_meta_writer_max_tasks_per_child 读取 META_WRITER_MAX_TASKS_PER_CHILD 环境变量

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fuploader-1.0.1.tar.gz (41.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fuploader-1.0.1-py3-none-any.whl (42.9 kB view details)

Uploaded Python 3

File details

Details for the file fuploader-1.0.1.tar.gz.

File metadata

  • Download URL: fuploader-1.0.1.tar.gz
  • Upload date:
  • Size: 41.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for fuploader-1.0.1.tar.gz
Algorithm Hash digest
SHA256 af20a5a215cf2d4944e8c816d8f15acaa3b60bb84fd9a38e4feb61c9c74f1fdd
MD5 d2aa82902672b52d4d380bf283d3a007
BLAKE2b-256 531be058520416bc5960b27735385aa88ee34a060f3e29ebd5ea06559a686fd7

See more details on using hashes here.

File details

Details for the file fuploader-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: fuploader-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 42.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for fuploader-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f002f694fd95382ad9e64a2bd69a0d4e5fa266464558eb6b5762f6409e1f9ad0
MD5 fc4c918433af85ffbe6fcd28016ec1d1
BLAKE2b-256 e3c1684953e62b483894691a246d947013ae3d2d14c9379cff2436f5c317286e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page