General-purpose SDK for a file writing, packing, and OSS upload pipeline
FUploader
A general-purpose pipeline SDK for "consume messages → write local files → pack into archives → upload to OSS".
Core flow
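MessageSource  MetaWriter                                  MetaPacker
─────────────  ──────────────────────────────────────────  ───────────────────────────────────────────────────────
RabbitMQ ──► parse message ──► write to slot ──► seal ──► pack .tar.zst ──► upload to OSS ──► clean up local files
                                                  │
                                                  ▼
                                          Redis (StateStore)
                                          · slot counters / seal threshold
                                          · pending-pack queue
                                          · distributed lock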
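Architecture
Three abstract interfaces
The SDK decouples itself from external dependencies through abstract interfaces, so different middleware can be swapped in easily: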
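| Interface | Responsibility | Built-in implementation |
|---|---|---|
| MessageSource | Message consumption, ACK/NACK | RabbitMQSource (aio-pika) |
| StateStore | Slot state, queues, distributed locks | RedisStateStore (redis-py async) |
| ObjectStore | File upload, existence checks | OssUploader (COS / Alibaba Cloud OSS / S3) |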
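To illustrate how these interfaces decouple the pipeline from its backends, here is a minimal sketch of ObjectStore written as a `typing.Protocol`, plus an in-memory implementation that could stand in for OSS in tests. The method names `upload` and `exists`, the class `InMemoryObjectStore`, and the helper `upload_once` are all hypothetical; the real abstract methods live in `file_uploader/interfaces/object_store.py`.

```python
from __future__ import annotations

from pathlib import Path
from typing import Protocol


class ObjectStore(Protocol):
    """Hypothetical shape of the ObjectStore interface: upload + existence check."""

    async def upload(self, local_path: Path, remote_key: str) -> None: ...

    async def exists(self, remote_key: str) -> bool: ...


class InMemoryObjectStore:
    """Test double that keeps uploaded bytes in a dict instead of sending them to OSS."""

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}

    async def upload(self, local_path: Path, remote_key: str) -> None:
        self.objects[remote_key] = local_path.read_bytes()

    async def exists(self, remote_key: str) -> bool:
        return remote_key in self.objects


async def upload_once(store: ObjectStore, local_path: Path, remote_key: str) -> bool:
    """Upload only if the key is absent; return True if an upload happened."""
    if await store.exists(remote_key):
        return False
    await store.upload(local_path, remote_key)
    return True
```

Because `Protocol` is structural, any class with matching async methods satisfies it without inheriting from anything, which fits the SDK's "swap the middleware" goal.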
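Four injectable strategies
Business logic is injected as callables; no class inheritance is required:
| Strategy | Signature | Purpose |
|---|---|---|
| MessageParser | (bytes) -> dict | Parses raw message bytes into structured data |
| FileNameGenerator | (dict) -> str | Generates the name of the file to write from the data |
| Packer | (Path) -> Path \| None | Packs a directory into an archive file |
| RemoteKeyGenerator | (str) -> str | Generates the remote OSS key from the archive file name |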
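The four signatures can be written as type aliases. Since strategies may be either sync or async, the pipeline needs one way to call both; awaiting the result only when it is awaitable is a common approach. The alias names mirror the table, but this is an illustrative sketch rather than the definitions in `file_uploader/interfaces/strategies.py`, and `call_strategy` is a hypothetical helper.

```python
import asyncio
import inspect
import json
from pathlib import Path
from typing import Any, Awaitable, Callable, Optional, Union

# Each strategy may return its value directly or an awaitable producing it.
MessageParser = Callable[[bytes], Union[dict, Awaitable[dict]]]
FileNameGenerator = Callable[[dict], Union[str, Awaitable[str]]]
Packer = Callable[[Path], Union[Path, None, Awaitable[Optional[Path]]]]
RemoteKeyGenerator = Callable[[str], Union[str, Awaitable[str]]]


async def call_strategy(strategy: Callable[..., Any], *args: Any) -> Any:
    """Call a sync or async strategy and return its final result."""
    result = strategy(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_parser(raw_body: bytes) -> dict:
    return json.loads(raw_body)


async def async_parser(raw_body: bytes) -> dict:
    await asyncio.sleep(0)  # stands in for real async work
    return json.loads(raw_body)
```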
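Slot sharding
- Files are assigned to slot directories (slot_0_0, slot_1_0, ...) by taking the hash of the file name modulo the slot count.
- Each slot has its own active_dir_id and active_count.
- Multiple processes can write into the same storage directory in parallel without conflicting.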
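A minimal sketch of the routing rule, assuming directory names of the form `slot_<slot>_<active_dir_id>` as in the examples above. The README does not say which hash FUploader uses; CRC32 is an assumption here, chosen because Python's built-in `hash()` is randomized per process for `str` and would route the same name to different slots in different workers. Both function names are hypothetical.

```python
import zlib
from pathlib import Path


def slot_index(file_name: str, slot_count: int) -> int:
    """Map a file name to a slot with a hash that is stable across processes."""
    return zlib.crc32(file_name.encode("utf-8")) % slot_count


def slot_dir(storage_root: Path, slot: int, active_dir_id: int) -> Path:
    """Directory for a slot's active sequence number, e.g. slot_0_0, slot_0_1."""
    return storage_root / f"slot_{slot}_{active_dir_id}"
```

In this sketch the slot depends only on the file name, so every writer process picks the same slot for the same name without coordinating; only the per-slot counters need shared state.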
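Sealing
- When the number of files in a slot directory reaches pack_threshold, the directory is sealed automatically.
- The sealed directory is marked ready and added to the pack queue; at the same time a new directory with the next sequence number is created (e.g. slot_0_0 → slot_0_1).
- The threshold check runs in a Redis Lua script, which makes it atomic.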
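The Lua script itself is not shown in this README. The sketch below reproduces the described semantics in plain Python, with a `threading.Lock` standing in for the atomicity that a single Lua script execution gives in Redis: increment the slot's count and, once it reaches `pack_threshold`, enqueue the current directory, advance `active_dir_id`, and reset the count. `SlotState` and `InMemorySealer` are hypothetical names, and the sketch only tracks state; it does not create directories on disk.

```python
from __future__ import annotations

import threading
from collections import deque
from dataclasses import dataclass


@dataclass
class SlotState:
    active_dir_id: int = 0
    active_count: int = 0


class InMemorySealer:
    """Single-process stand-in for the Redis-side "increment and maybe seal" step."""

    def __init__(self, slot_count: int, pack_threshold: int) -> None:
        self.pack_threshold = pack_threshold
        self.slots = [SlotState() for _ in range(slot_count)]
        self.pack_queue: deque[str] = deque()
        self._lock = threading.Lock()  # plays the role of Lua's atomic execution

    def record_write(self, slot: int) -> str | None:
        """Count one written file; return the sealed directory name, if any."""
        with self._lock:
            state = self.slots[slot]
            state.active_count += 1
            if state.active_count < self.pack_threshold:
                return None
            sealed = f"slot_{slot}_{state.active_dir_id}"
            self.pack_queue.append(sealed)  # mark ready: enqueue for packing
            state.active_dir_id += 1        # later writes go to the next sequence number
            state.active_count = 0
            return sealed
```

Doing increment, compare, and enqueue as one atomic step is what matters: as separate commands, two processes could both observe the count crossing the threshold and seal the same directory twice.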
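Residual recovery
At startup the SDK scans storage_root, finds slot_* directories that are not currently active, and re-adds them to the pending-pack queue, so that data left behind by a crashed process is not lost.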
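Installation
# Basic install
pip install FUploader
# Install the OSS backend you need (quoting the extras keeps shells such as zsh from expanding the brackets)
pip install "FUploader[cos]"       # Tencent Cloud COS
pip install "FUploader[oss]"       # Alibaba Cloud OSS
pip install "FUploader[s3]"        # AWS S3 / MinIO
pip install "FUploader[adapters]"  # all OSS backends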
Quick start
The example below shows the complete setup of a pipeline that writes Douyin comment data (full source: examples/douyin_writer.py):
import asyncio
import json
from datetime import datetime
from pathlib import Path

from file_uploader import (
    FileWriterConfig,
    FileWriterPipeline,
    OssProvider,
    OssUploader,
    RabbitMQSource,
    RedisStateStore,
    tar_zstd_packer,
)


# 1. Business strategy functions
def douyin_message_parser(raw_body: bytes) -> dict:
    return json.loads(raw_body.decode("utf-8"))


def douyin_file_name_generator(data: dict) -> str:
    comment_id = data.get("cid", "unknown")
    return f"comment_{comment_id}.json"


def douyin_remote_key_generator(archive_name: str) -> str:
    # Partition remote keys by the date on which the key is generated
    date_str = datetime.now().strftime("%Y%m%d")
    return f"douyin/comments/{date_str}/{archive_name}"


async def main():
    # 2. Configuration
    config = FileWriterConfig(
        slot_count=16,
        pack_threshold=1000,
        storage_root=Path("/data/douyin_output"),
        save_timeout=5.0,
        packer_concurrency=4,
        packer_interval=2.0,
        packer_max_retries=3,
    )

    # 3. Redis state store
    state_store = RedisStateStore(
        host="localhost", port=6379, db=0,
        key_prefix="file_uploader:douyin:",
    )
    await state_store.connect()

    # 4. RabbitMQ message source
    message_source = RabbitMQSource(
        host="localhost", port=5672,
        user="guest", password="guest",
        queue_name="douyin_comment_queue",
    )

    # 5. OSS uploader (Tencent Cloud COS)
    object_store = OssUploader(
        provider=OssProvider.COS,
        bucket="douyin-data-1250000000",
        region="ap-guangzhou",
        endpoint="cos.ap-guangzhou.myqcloud.com",
        access_key_id="AKID...",
        access_key_secret="...",
    )

    # 6. Build the pipeline
    pipeline = (
        FileWriterPipeline
        .with_config(config)
        .with_state_store(state_store)
        .with_message_source(message_source)
        .with_object_store(object_store)
        .with_message_parser(douyin_message_parser)
        .with_file_name_generator(douyin_file_name_generator)
        .with_packer(tar_zstd_packer)
        .with_remote_key_generator(douyin_remote_key_generator)
    )

    # 7. Start and wait for a termination signal
    await pipeline.start()
    try:
        await pipeline.wait()  # blocks until SIGINT / SIGTERM
    finally:
        await pipeline.stop()  # shut down even if wait() raises


if __name__ == "__main__":
    asyncio.run(main())
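Configuration reference
All FileWriterConfig fields:
| Field | Type | Default | Description |
|---|---|---|---|
| storage_root | Path | required | Local storage root directory |
| slot_count | int | 1 | Number of slots (shards); avoids file-lock contention when multiple processes write files |
| pack_threshold | int | 1000 | Maximum number of files per directory; reaching it triggers a seal |
| write_concurrency | int | 10 | Number of file-writing worker coroutines |
| packer_concurrency | int | 2 | Number of pack-and-upload worker coroutines |
| save_timeout | float | 30.0 | Per-file write timeout (seconds) |
| packer_interval | float | 1.0 | Pack-queue polling interval (seconds) |
| packer_max_retries | int | 3 | Maximum number of retries when packing fails (0 = no retry, drop immediately) |
| batch_size | int | 100 | Batch ACK threshold |
| flush_interval | float | 5.0 | Time-window flush interval (seconds) |
| prefetch_count | int | 50 | Message prefetch count |
| resume_orphan_archives | bool | True | Whether to recover orphaned directories at startup |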
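batch_size and flush_interval suggest a common batching rule: acknowledge the accumulated messages when either batch_size messages are pending or flush_interval seconds have passed since the last flush. The sketch below shows that rule with a hypothetical `AckBatcher`; how FUploader actually combines the two fields is an assumption here. The clock is injectable so the behaviour can be tested deterministically.

```python
import time
from typing import Callable, List


class AckBatcher:
    """Collect delivery tags and release them by size or by elapsed time."""

    def __init__(
        self,
        batch_size: int = 100,
        flush_interval: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._clock = clock
        self._pending: List[int] = []
        self._last_flush = clock()

    def add(self, delivery_tag: int) -> List[int]:
        """Record one processed message; return the tags to ACK now (possibly empty)."""
        self._pending.append(delivery_tag)
        return self.poll()

    def poll(self) -> List[int]:
        """Flush if the batch is full or the time window has elapsed."""
        full = len(self._pending) >= self.batch_size
        expired = self._clock() - self._last_flush >= self.flush_interval
        if not self._pending or not (full or expired):
            return []
        batch, self._pending = self._pending, []
        self._last_flush = self._clock()
        return batch
```

If ACKs are deferred like this, note that RabbitMQ delivers at most prefetch_count unacknowledged messages to a consumer, so a batch_size larger than prefetch_count (100 vs. 50 in the defaults) could only ever be flushed by the time window.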
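Core components
FileWriterPipeline
A builder-style, all-in-one entry point that manages the full lifecycle:
| Method | Description |
|---|---|
| .with_config(config) | Inject the FileWriterConfig |
| .with_state_store(store) | Inject a StateStore implementation |
| .with_message_source(source) | Inject a MessageSource implementation |
| .with_object_store(store) | Inject an ObjectStore implementation |
| .with_message_parser(parser) | Inject the message-parsing strategy |
| .with_file_name_generator(gen) | Inject the file-name generation strategy |
| .with_packer(packer) | Inject the packing strategy |
| .with_remote_key_generator(gen) | Inject the remote-key generation strategy |
| .no_recovery() | Disable residual recovery at startup |
| await pipeline.start() | Start the pipeline |
| await pipeline.wait() | Block until SIGINT / SIGTERM is received |
| await pipeline.stop() | Shut down gracefully |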
MetaWriter
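The file-writing service. It:
- consumes messages through MessageSource;
- calls MessageParser to parse each message into structured data;
- calls FileNameGenerator to generate the file name;
- routes the file to a slot directory by hashing the file name modulo the slot count;
- serializes the data as JSON and writes it to disk;
- atomically increments the slot counter and checks the seal threshold;
- when the threshold is reached, seals the directory and adds it to the pack queue.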
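The JSON write step above might look like the following synchronous sketch, which takes an already-routed slot directory. It writes through a temporary file and `os.replace` so that a crash never leaves a half-written JSON file inside a slot directory; whether MetaWriter does this is not stated, so treat it as an assumption. `write_record` is a hypothetical name.

```python
import json
import os
import tempfile
from pathlib import Path


def write_record(slot_dir: Path, file_name: str, data: dict) -> Path:
    """Serialize `data` as JSON into slot_dir/file_name and return the final path."""
    slot_dir.mkdir(parents=True, exist_ok=True)
    target = slot_dir / file_name
    # Write to a temp file in the same directory, then rename it into place atomically.
    fd, tmp_name = tempfile.mkstemp(dir=slot_dir, prefix=".tmp-", suffix=".part")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)  # keep non-ASCII text readable
        os.replace(tmp_name, target)
    except BaseException:
        os.unlink(tmp_name)
        raise
    return target
```

From async code this blocking function could be run with `asyncio.to_thread(...)` wrapped in `asyncio.wait_for(..., timeout=save_timeout)`; note that a timeout there stops waiting but does not interrupt the thread.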
MetaPacker
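The pack-and-upload service. It:
- polls the pending-pack queue;
- uses a distributed lock to prevent the same directory from being packed twice;
- calls Packer to pack the directory into an archive file;
- calls RemoteKeyGenerator to generate the remote key;
- uploads the archive to OSS through ObjectStore;
- after a successful upload, deletes the local archive and the source directory;
- on pack failure, retries with exponential backoff, and drops the directory once all retries have failed.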
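A sketch of the retry rule, following the packer_max_retries semantics in the configuration table (0 means a single attempt and no retry). The base delay and the doubling factor are assumptions, as is treating a `None` return from the Packer the same as an exception; `pack_with_retries` is a hypothetical name.

```python
import asyncio
import logging
from pathlib import Path
from typing import Awaitable, Callable, Optional

logger = logging.getLogger(__name__)


async def pack_with_retries(
    pack: Callable[[Path], Awaitable[Optional[Path]]],
    source_dir: Path,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> Optional[Path]:
    """Run pack(source_dir) once plus up to max_retries retries.

    Before retry n (n = 1, 2, ...) it sleeps base_delay * 2 ** (n - 1) seconds.
    Returns None if every attempt failed; the caller would then drop the directory.
    """
    for attempt in range(max_retries + 1):
        try:
            archive = await pack(source_dir)
            if archive is not None:
                return archive
            logger.warning("pack attempt %d returned None for %s", attempt + 1, source_dir)
        except Exception:
            logger.exception("pack attempt %d failed for %s", attempt + 1, source_dir)
        if attempt < max_retries:
            await asyncio.sleep(base_delay * 2 ** attempt)
    return None
```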
ResidualRecovery
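Runs residual recovery during startup:
- scans all slot_* directories under storage_root;
- excludes each slot's currently active directory (active_dir_id);
- skips empty directories;
- re-adds the remaining orphaned directories to the pending-pack queue.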
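The scan above can be sketched with `pathlib`, assuming directory names of the form `slot_<slot>_<dir_id>` and a mapping of slot index to current active_dir_id that would be read from the state store. `find_orphan_dirs` is a hypothetical name, and enqueueing the results is left to the caller.

```python
import re
from pathlib import Path
from typing import Dict, List

SLOT_DIR_RE = re.compile(r"^slot_(\d+)_(\d+)$")


def find_orphan_dirs(storage_root: Path, active_dir_ids: Dict[int, int]) -> List[Path]:
    """Return non-empty slot directories that are not the active one for their slot."""
    orphans = []
    for path in sorted(storage_root.glob("slot_*")):
        match = SLOT_DIR_RE.match(path.name)
        if match is None or not path.is_dir():
            continue  # e.g. a leftover slot_0_0.tar.zst archive file
        slot, dir_id = int(match.group(1)), int(match.group(2))
        if active_dir_ids.get(slot) == dir_id:
            continue  # still being written to
        if not any(path.iterdir()):
            continue  # empty: nothing to pack
        orphans.append(path)
    return orphans
```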
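Custom strategies
Every strategy is a plain function (sync or async); there is no framework base class to inherit from:
from __future__ import annotations

import asyncio
import json
from pathlib import Path


# Custom message parser (async version)
async def my_parser(raw_body: bytes) -> dict:
    data = json.loads(raw_body)
    # data cleaning, filling in missing fields...
    return data


# Custom packer (replaces tar.zstd)
async def my_packer(source_dir: Path) -> Path | None:
    """7z compression example (requires the 7z command-line tool on PATH)."""
    archive = source_dir.with_suffix(".7z")
    proc = await asyncio.create_subprocess_exec(
        "7z", "a", str(archive), str(source_dir),
        stdout=asyncio.subprocess.DEVNULL,
    )
    return archive if await proc.wait() == 0 else None


# Inject into the pipeline (config and FileWriterPipeline as in the quick start)
pipeline = (
    FileWriterPipeline
    .with_config(config)
    # ...other settings...
    .with_message_parser(my_parser)
    .with_packer(my_packer)
)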
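If an extra compression dependency is not wanted, a packer can be built on the standard library alone. This sketch produces `.tar.gz` rather than `.tar.zst` and follows the `(Path) -> Path | None` contract; `tar_gz_packer` is a hypothetical name and is not the built-in `tar_zstd_packer`.

```python
import asyncio
import tarfile
from pathlib import Path
from typing import Optional


def _make_tar_gz(source_dir: Path, archive: Path) -> None:
    with tarfile.open(archive, "w:gz") as tar:
        # Store entries under the directory's own name, e.g. slot_0_0/comment_1.json
        tar.add(source_dir, arcname=source_dir.name)


async def tar_gz_packer(source_dir: Path) -> Optional[Path]:
    """Pack source_dir into <source_dir>.tar.gz next to it; return None on failure."""
    archive = source_dir.parent / f"{source_dir.name}.tar.gz"
    try:
        # tarfile is blocking, so run it in a worker thread
        await asyncio.to_thread(_make_tar_gz, source_dir, archive)
    except (OSError, tarfile.TarError):
        archive.unlink(missing_ok=True)  # do not leave a partial archive behind
        return None
    return archive
```

It would be injected the same way as any other packer, with `.with_packer(tar_gz_packer)`.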
Adapters
RabbitMQSource
A RabbitMQ consumer implementation based on aio-pika:
source = RabbitMQSource(
host="localhost", port=5672,
vhost="/", user="guest", password="guest",
queue_name="my_queue",
prefetch_count=50,
requeue_on_nack=True,
)
RedisStateStore
A state store implementation based on redis.asyncio. Lua scripts make the threshold check and the distributed lock atomic:
store = RedisStateStore(
host="localhost", port=6379, db=0,
password="",
key_prefix="file_uploader:task1:",
lock_ttl=300,
)
await store.connect()
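The lock script is not shown in this README. The usual Redis pattern is to acquire with `SET key token NX EX ttl` and release with a Lua script that deletes the key only if it still holds the caller's token, so a worker whose lock has expired cannot delete a lock another worker has since acquired. The sketch below simulates those semantics in memory, with `ttl` playing the role of lock_ttl; `TtlLock` is hypothetical and is not RedisStateStore's implementation.

```python
import time
import uuid
from typing import Callable, Dict, Optional, Tuple


class TtlLock:
    """In-memory model of SET NX EX acquisition and compare-and-delete release."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._locks: Dict[str, Tuple[str, float]] = {}  # key -> (token, expires_at)

    def acquire(self, key: str, ttl: float) -> Optional[str]:
        """Return a fresh token if the key is free or expired, else None."""
        now = self._clock()
        held = self._locks.get(key)
        if held is not None and held[1] > now:
            return None
        token = uuid.uuid4().hex
        self._locks[key] = (token, now + ttl)
        return token

    def release(self, key: str, token: str) -> bool:
        """Delete the lock only if `token` still owns it and it has not expired."""
        held = self._locks.get(key)
        if held is None or held[0] != token or held[1] <= self._clock():
            return False
        del self._locks[key]
        return True
```

Under this model, lock_ttl should exceed the longest expected pack-and-upload time; otherwise the lock can expire mid-upload and a second worker may pack the same directory.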
OssUploader
A unified upload interface for three OSS backends; each backend's dependency is installed on demand:
# Tencent Cloud COS → pip install FUploader[cos]
uploader = OssUploader(
provider=OssProvider.COS,
bucket="my-bucket-1250000000",
region="ap-guangzhou",
endpoint="cos.ap-guangzhou.myqcloud.com",
access_key_id="AKID...",
access_key_secret="...",
)
# Alibaba Cloud OSS → pip install FUploader[oss]
uploader = OssUploader(
provider=OssProvider.ALIYUN_OSS,
bucket="my-bucket",
endpoint="oss-cn-hangzhou.aliyuncs.com",
access_key_id="LTAI...",
access_key_secret="...",
)
# AWS S3 / MinIO → pip install FUploader[s3]
uploader = OssUploader(
provider=OssProvider.S3,
bucket="my-bucket",
region="us-east-1",
endpoint="https://s3.amazonaws.com",
access_key_id="AKIA...",
access_key_secret="...",
)
Project structure
src/file_uploader/
├── __init__.py               # public API exports
├── config.py                 # FileWriterConfig configuration class
├── pipeline.py               # FileWriterPipeline entry point + builder
├── interfaces/               # abstract interface layer
│   ├── message_source.py     # MessageSource abstraction
│   ├── object_store.py       # ObjectStore abstraction
│   ├── state_store.py        # StateStore abstraction + SlotRuntimeState
│   └── strategies.py         # MessageParser / FileNameGenerator / Packer / RemoteKeyGenerator types
├── adapters/                 # adapter implementations
│   ├── rabbitmq_source.py    # RabbitMQSource
│   ├── redis_state_store.py  # RedisStateStore
│   └── oss_uploader.py       # OssUploader (COS / OSS / S3)
├── packers/                  # packing strategies
│   └── tar_zstd.py           # tar + zstd packer
├── services/                 # core services
│   ├── meta_writer.py        # MetaWriter writing service
│   └── meta_packer.py        # MetaPacker pack-and-upload service
└── recovery/                 # residual recovery
    └── residual.py           # ResidualRecovery
License
MIT
Download files
File details
Details for the file fuploader-1.0.0.tar.gz.
File metadata
- Download URL: fuploader-1.0.0.tar.gz
- Upload date:
- Size: 26.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 507a6ab4a4e324437d693fa5a558e003e975ab6c9fecdf0c51b5001783db6edd |
| MD5 | 4915b17451e4a2784115b227cb4d7702 |
| BLAKE2b-256 | 2a734a9a301f9dd28ae76f0d3e53abc5f91feb0be2c2c889784c184a509f8f92 |
File details
Details for the file fuploader-1.0.0-py3-none-any.whl.
File metadata
- Download URL: fuploader-1.0.0-py3-none-any.whl
- Upload date:
- Size: 30.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e2bec6ca8f6e7c44ade403146acad478ebf99611a713a58341580ca157b1897c |
| MD5 | fbd52a8def58df366f0e7cb7d16dc549 |
| BLAKE2b-256 | f3ef266368a037f0a701235708fd15e2c8e755d2d2614194111f26d7db7f6e4a |