通用文件写入、打包、OSS 上传管线 SDK
Project description
FUploader
通用的「消费消息 → 写本地文件 → 打包归档 → 上传 OSS」管线 SDK。
核心流程
MessageSource MetaWriter MetaPacker
──────────── ────────────── ─────────────────
RabbitMQ ──► 解析消息 ──► 写盘到 slot ──► 封口 ──► 打包 .tar.zst ──► 上传 OSS ──► 清理本地
│
▼
Redis (StateStore)
· slot 计数 / 封口阈值
· 待打包队列
· 分布式锁
架构设计
三层抽象接口
SDK 通过抽象接口解耦外部依赖,方便替换不同中间件:
| 接口 | 职责 | 内置实现 |
|---|---|---|
MessageSource |
消息消费、ACK/NACK | RabbitMQSource (aio-pika) |
StateStore |
slot 状态、队列、分布式锁 | RedisStateStore (redis-py async) |
ObjectStore |
文件上传、存在性检查 | OssUploader (COS / 阿里云 OSS / S3) |
四种策略注入
业务逻辑通过 Callable 注入,无需继承任何类:
| 策略 | 签名 | 作用 |
|---|---|---|
MessageParser |
(bytes) -> dict |
将原始消息字节解析为结构化数据 |
FileNameGenerator |
(dict) -> str |
根据数据生成写入文件名 |
Packer |
(Path) -> Path | None |
将目录打包为归档文件 |
RemoteKeyGenerator |
(str) -> str |
根据归档文件名生成远端 OSS key |
Slot 分片机制
- 按文件名 hash 取模分配到不同 slot 目录(
slot_0_0、slot_1_0...) - 每个 slot 有独立的
active_dir_id和active_count - 支持多进程并行写入同一存储目录,互不冲突
Legacy 高并发模式
迁移原 main_meta_writer.py 场景时,推荐使用 legacy 模式:
- 主进程作为 supervisor,子进程作为 worker 执行业务。
META_WRITER_PROCESS_COUNT控制 worker 进程数。- 子进程
NODE_ID形如node_id-{本机IP}-meta-p0、node_id-{本机IP}-meta-p1。 - 每个 worker 固定写自己的 slot,避免共享写锁竞争。
- Redis key 对齐
{task_name}:meta:{node_id-{本机IP}}:*。 - 本地目录对齐
active/slot-0/dir-000001与sealed/slot-0/dir-000001。
封口(Seal)机制
- 每个 slot 目录内文件数达到
pack_threshold时自动触发封口 - 当前目录被标记为 ready,加入打包队列;同时创建新序号目录(如
slot_0_0→slot_0_1) - 阈值检查通过 Redis Lua 脚本保证原子性
残留恢复
启动时自动扫描 storage_root,找出非当前活跃的 slot_* 目录,重新加入待打包队列,防止进程崩溃后数据丢失。
安装
# 基础安装
pip install FUploader
# 按需安装 OSS 后端
pip install FUploader[cos] # 腾讯云 COS
pip install FUploader[oss] # 阿里云 OSS
pip install FUploader[s3] # AWS S3 / MinIO
pip install FUploader[adapters] # 全部 OSS 后端
以下示例展示抖音作品(aweme)元数据写入管线的完整用法 — 上游通过 FileMessage 编码投递作品数据到 RabbitMQ,下游消费并按 aweme_id 落盘、打包、上传 COS:
import asyncio
from pathlib import Path
from file_uploader import (
FileWriterConfig,
FileWriterPipeline,
OssProvider,
OssUploader,
RabbitMQSource,
RedisStateStore,
get_meta_writer_process_count,
parse_file_message,
tar_zstd_packer,
)
# 1. 业务策略函数
def douyin_message_parser(raw_body: bytes) -> dict:
"""解析上游通过 FileMessage 编码的消息(gzip + base64)。"""
decoded = parse_file_message(raw_body)
return {"aweme_id": decoded.file_name, **decoded.payload}
def douyin_file_name_generator(data: dict) -> str:
"""用作品 aweme_id 作为文件名。"""
aweme_id = data.get("aweme_id", "unknown")
return f"{aweme_id}.json"
def douyin_remote_key_generator(archive_name: str) -> str:
"""按日期分目录上传到 COS。"""
from datetime import datetime
date_str = datetime.now().strftime("%Y%m%d")
return f"douyin/aweme/{date_str}/{archive_name}"
TASK_NAME = "douyin_aweme"
def build_pipeline() -> FileWriterPipeline:
process_count = get_meta_writer_process_count() # 读取 META_WRITER_PROCESS_COUNT,默认 1
# 2. 配置
config = FileWriterConfig.legacy_meta(
task_name=TASK_NAME,
process_count=process_count,
pack_threshold=1000,
storage_root=Path("/data/douyin_output"),
save_timeout=5.0,
packer_concurrency=4,
packer_interval=2.0,
packer_max_retries=3,
)
# 3. Redis 状态存储
state_store = RedisStateStore(
host="localhost", port=6379, db=0,
key_style="legacy_meta",
task_name=TASK_NAME,
node_id=config.node_id,
slot_count=config.slot_count,
)
# 4. RabbitMQ 消息源
message_source = RabbitMQSource(
host="localhost", port=5672,
user="guest", password="guest",
queue_name="douyin_aweme_meta_queue",
)
# 5. OSS 上传器(腾讯云 COS)
object_store = OssUploader(
provider=OssProvider.COS,
bucket="douyin-data-1250000000",
region="ap-guangzhou",
endpoint="cos.ap-guangzhou.myqcloud.com",
access_key_id="AKID...",
access_key_secret="...",
)
# 6. 构建 Pipeline
return (
FileWriterPipeline
.with_config(config)
.with_state_store(state_store)
.with_message_source(message_source)
.with_object_store(object_store)
.with_message_parser(douyin_message_parser)
.with_file_name_generator(douyin_file_name_generator)
.with_packer(tar_zstd_packer)
.with_remote_key_generator(douyin_remote_key_generator)
)
async def async_main() -> None:
pipeline = build_pipeline()
await pipeline.start()
try:
await pipeline.wait()
finally:
await pipeline.stop()
if __name__ == "__main__":
asyncio.run(async_main())
配置参考
FileWriterConfig 所有字段:
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
storage_root |
Path |
必填 | 本地存储根目录 |
slot_count |
int |
1 |
分片数(多进程写文件时避免文件锁竞争) |
pack_threshold |
int |
1000 |
每个目录最多文件数,触发封口(seal) |
write_concurrency |
int |
10 |
写文件 worker 协程数 |
packer_concurrency |
int |
2 |
打包上传 worker 协程数 |
save_timeout |
float |
30.0 |
单文件写入超时(秒) |
packer_interval |
float |
1.0 |
打包轮询间隔(秒) |
packer_max_retries |
int |
3 |
打包失败最大重试次数(0=不重试,直接丢弃) |
batch_size |
int |
100 |
批量 ACK 阈值 |
flush_interval |
float |
5.0 |
时间窗口 flush(秒) |
prefetch_count |
int |
50 |
消息预取数 |
meta_writer_max_tasks_per_child |
int |
1000 |
单个 worker 成功处理多少条后优雅轮转(0=禁用) |
node_id |
str |
NODE_ID 或 node_id-{本机IP} |
当前节点 ID |
task_name |
str |
"" |
legacy Redis key 使用的任务名 |
worker_index |
int | None |
从 NODE_ID 解析 |
当前 worker 下标 |
slot_strategy |
str |
"hash" |
hash 或 worker_index |
storage_layout |
str |
"flat_slot" |
flat_slot 或 legacy_meta |
ready_dir_format |
str |
"colon" |
colon 或 legacy_slot |
resume_orphan_archives |
bool |
True |
启动时是否恢复孤儿目录 |
环境变量:
| 变量 | 默认值 | 说明 |
|---|---|---|
META_WRITER_PROCESS_COUNT |
1 |
supervisor 启动的 worker 进程数,同时也是 legacy 模式 slot 数 |
META_WRITER_MAX_TASKS_PER_CHILD |
1000 |
可映射到 meta_writer_max_tasks_per_child,达到后 worker 优雅退出并由 supervisor 拉起 |
核心组件
FileWriterPipeline
Builder 模式的一站式入口,封装完整的生命周期管理:
| 方法 | 说明 |
|---|---|
.with_config(config) |
注入 FileWriterConfig 配置 |
.with_state_store(store) |
注入 StateStore 实现 |
.with_message_source(source) |
注入 MessageSource 实现 |
.with_object_store(store) |
注入 ObjectStore 实现 |
.with_message_parser(parser) |
注入消息解析策略 |
.with_file_name_generator(gen) |
注入文件名生成策略 |
.with_packer(packer) |
注入打包策略 |
.with_remote_key_generator(gen) |
注入远端 key 生成策略 |
.no_recovery() |
禁用启动时残留恢复 |
await pipeline.start() |
启动 Pipeline |
await pipeline.wait() |
阻塞直到收到 SIGINT / SIGTERM |
await pipeline.stop() |
优雅关闭 |
MetaWriter
文件写入服务,负责:
- 通过
MessageSource消费消息 - 调用
MessageParser解析为结构化数据 - 调用
FileNameGenerator生成文件名 - 按文件名 hash 取模路由到对应 slot 目录
- 将数据序列化为 JSON 写入磁盘
- 原子递增 slot 计数并检查封口阈值
- 达到阈值时封口(seal),将目录加入打包队列
MetaPacker
打包上传服务,负责:
- 轮询待打包队列
- 通过分布式锁防止重复打包
- 调用
Packer将目录打包为归档文件 - 调用
RemoteKeyGenerator生成远端 key - 通过
ObjectStore上传到 OSS - 上传成功后清理本地归档和源目录
- 打包失败时指数退避重试,全部失败后丢弃
ResidualRecovery
启动期间执行残留恢复:
- 扫描
storage_root下所有slot_*目录 - 排除当前活跃目录(
active_dir_id) - 跳过空目录
- 将孤儿目录重新加入待打包队列
自定义策略
所有策略都是普通函数(支持同步/异步),无需继承框架基类:
# 自定义消息解析器(异步版本)
async def my_parser(raw_body: bytes) -> dict:
data = json.loads(raw_body)
# 数据清洗、补全...
return data
# 自定义打包器(可替换 tar.zstd)
async def my_packer(source_dir: Path) -> Path | None:
"""7z 压缩示例"""
archive = source_dir.with_suffix(".7z")
result = await run_7z(source_dir, archive)
return archive if result else None
# 注入到 Pipeline
pipeline = (
FileWriterPipeline
.with_config(config)
# ...其他配置...
.with_message_parser(my_parser)
.with_packer(my_packer)
)
适配器
RabbitMQSource
基于 aio-pika 的 RabbitMQ 消费实现:
source = RabbitMQSource(
host="localhost", port=5672,
vhost="/", user="guest", password="guest",
queue_name="my_queue",
prefetch_count=50,
requeue_on_nack=True,
)
RedisStateStore
基于 redis.asyncio 的状态存储实现。使用 Lua 脚本保证阈值检查和分布式锁的原子性:
store = RedisStateStore(
host="localhost", port=6379, db=0,
password="",
key_prefix="file_uploader:task1:",
lock_ttl=300,
)
await store.connect()
OssUploader
支持三种 OSS 后端的统一上传接口,各后端依赖按需安装:
# 腾讯云 COS → pip install FUploader[cos]
uploader = OssUploader(
provider=OssProvider.COS,
bucket="my-bucket-1250000000",
region="ap-guangzhou",
endpoint="cos.ap-guangzhou.myqcloud.com",
access_key_id="AKID...",
access_key_secret="...",
)
# 阿里云 OSS → pip install FUploader[oss]
uploader = OssUploader(
provider=OssProvider.ALIYUN_OSS,
bucket="my-bucket",
endpoint="oss-cn-hangzhou.aliyuncs.com",
access_key_id="LTAI...",
access_key_secret="...",
)
# AWS S3 / MinIO → pip install FUploader[s3]
uploader = OssUploader(
provider=OssProvider.S3,
bucket="my-bucket",
region="us-east-1",
endpoint="https://s3.amazonaws.com",
access_key_id="AKIA...",
access_key_secret="...",
)
文件消息编码
SDK 内置一套「写入前处理」的消息编码格式,支持将任意 payload 经 gzip 压缩 + base64 编码后投递到消息队列,由 MetaWriter 消费时自动解码。适用于需要在上游提前准备消息内容的场景。
编码(生产者侧)
from file_uploader import (
FileMessage,
FileMessagePublisher,
encode_file_message,
publish_file_messages,
)
# 方式 1:直接编码单条消息
envelope = encode_file_message(FileMessage(file_name="data_001", payload={"key": "value"}))
# → {"file_name": "data_001.json", "compression": "gzip", "content_type": "json", "payload": "..."}
# 方式 2:批量发布到消息队列
async def my_publish(body: str) -> None:
await channel.default_exchange.publish(aio_pika.Message(body.encode()), routing_key="queue")
await publish_file_messages(
[FileMessage(file_name="a", payload={"x": 1}), FileMessage(file_name="b", payload="hello")],
publish=my_publish,
)
解码(消费者侧)
from file_uploader import parse_file_message, file_message_name
decoded = parse_file_message(raw_bytes) # → DecodedFilePayload(file_name="data_001.json", payload={...}, content_type="json")
file_name = file_message_name(decoded) # 等价于 decoded.file_name
作为 MessageParser 注入
from file_uploader import parse_file_message
def parser(raw_body: bytes) -> dict:
decoded = parse_file_message(raw_body)
return {"file_name": decoded.file_name, "payload": decoded.payload}
独立使用核心服务
MetaWriter 和 MetaPacker 均可脱离 Pipeline 独立使用,直接构造并调用 start() / stop():
writer = MetaWriter(
config=config,
state_store=store,
message_source=source,
message_parser=parser,
file_name_generator=name_gen,
)
await writer.start()
# ... 消费消息 ...
await writer.stop()
项目结构
src/file_uploader/
├── __init__.py # 公开 API 导出
├── config.py # FileWriterConfig 配置类
├── layout.py # 存储目录布局辅助函数
├── messages.py # 文件消息编码/解码 (FileMessage, parse_file_message, etc.)
├── pipeline.py # FileWriterPipeline 入口 + Builder + run_pipeline_supervised
├── runtime.py # 运行时环境变量读取 (NODE_ID, META_WRITER_PROCESS_COUNT, etc.)
├── interfaces/ # 抽象接口层
│ ├── message_source.py # MessageSource 抽象
│ ├── object_store.py # ObjectStore 抽象
│ ├── state_store.py # StateStore 抽象 + SlotRuntimeState
│ └── strategies.py # MessageParser / FileNameGenerator / Packer / RemoteKeyGenerator 类型
├── adapters/ # 适配器实现
│ ├── rabbitmq_source.py # RabbitMQSource
│ ├── redis_state_store.py # RedisStateStore
│ └── oss_uploader.py # OssUploader (COS / OSS / S3)
├── packers/ # 打包策略
│ └── tar_zstd.py # tar + zstd 打包器
├── services/ # 核心服务
│ ├── meta_writer.py # MetaWriter 写入服务
│ └── meta_packer.py # MetaPacker 打包上传服务
└── recovery/ # 残留恢复
└── residual.py # ResidualRecovery
公开 API 总览
| 类别 | 名称 | 说明 |
|---|---|---|
| 配置 | FileWriterConfig |
Pipeline 配置 dataclass |
| 接口 | MessageSource |
消息源抽象基类 |
| 接口 | StateStore |
状态存储抽象基类 |
| 接口 | ObjectStore |
对象存储抽象基类 |
| 接口 | SlotRuntimeState |
slot 运行时状态 dataclass |
| 策略类型 | MessageParser |
(bytes) -> dict(支持同步/异步) |
| 策略类型 | FileNameGenerator |
(dict) -> str |
| 策略类型 | Packer |
(Path) -> Path | None |
| 策略类型 | RemoteKeyGenerator |
(str) -> str |
| 适配器 | RabbitMQSource |
RabbitMQ 消费实现 |
| 适配器 | RedisStateStore |
Redis 状态存储实现 |
| 适配器 | OssUploader |
多后端 OSS 上传器 |
| 适配器 | OssProvider |
OSS 后端枚举 (COS / ALIYUN_OSS / S3) |
| 打包器 | tar_zstd_packer |
tar.zstd 打包函数 |
| 消息编码 | FileMessage |
原始文件消息 dataclass |
| 消息编码 | DecodedFilePayload |
解码后的文件载荷 dataclass |
| 消息编码 | FileMessagePublisher |
批量消息发布器 |
| 消息编码 | encode_file_message |
编码单条消息为队列 envelope |
| 消息编码 | parse_file_message |
解码队列 envelope |
| 消息编码 | file_message_name |
从 decoded payload 提取文件名 |
| 消息编码 | serialize_file_payload |
序列化 payload 为 bytes |
| 消息编码 | publish_file_messages |
批量编码并发布 |
| 服务 | MetaWriter |
文件写入服务 |
| 服务 | MetaPacker |
打包上传服务 |
| 恢复 | ResidualRecovery |
残留文件恢复 |
| Pipeline | FileWriterPipeline |
Builder 模式 Pipeline 入口 |
| Pipeline | run_pipeline_once |
构建并运行一次 Pipeline |
| Pipeline | run_pipeline_supervised |
Supervisor 模式守护 Pipeline 子进程 |
| 工具 | get_node_id |
获取当前节点 ID |
| 工具 | get_meta_writer_process_count |
读取 META_WRITER_PROCESS_COUNT 环境变量 |
| 工具 | get_meta_writer_max_tasks_per_child |
读取 META_WRITER_MAX_TASKS_PER_CHILD 环境变量 |
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fuploader-1.0.1.tar.gz.
File metadata
- Download URL: fuploader-1.0.1.tar.gz
- Upload date:
- Size: 41.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
af20a5a215cf2d4944e8c816d8f15acaa3b60bb84fd9a38e4feb61c9c74f1fdd
|
|
| MD5 |
d2aa82902672b52d4d380bf283d3a007
|
|
| BLAKE2b-256 |
531be058520416bc5960b27735385aa88ee34a060f3e29ebd5ea06559a686fd7
|
File details
Details for the file fuploader-1.0.1-py3-none-any.whl.
File metadata
- Download URL: fuploader-1.0.1-py3-none-any.whl
- Upload date:
- Size: 42.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f002f694fd95382ad9e64a2bd69a0d4e5fa266464558eb6b5762f6409e1f9ad0
|
|
| MD5 |
fc4c918433af85ffbe6fcd28016ec1d1
|
|
| BLAKE2b-256 |
e3c1684953e62b483894691a246d947013ae3d2d14c9379cff2436f5c317286e
|