Unified file storage component with S3 and FastDFS fusion support
Project description
s3py-helper
1. Component Overview
s3py-helper is a unified asynchronous file storage component that supports both S3-protocol object storage and the FastDFS distributed file system. It targets the Python 3.12+ asynchronous ecosystem and is built on aioboto3 (for S3) and aiofdfs (for FastDFS).
The component adopts a layered architecture. The s3raw package provides raw operations for the S3 protocol, the fastdfs package provides raw operations for FastDFS, and the fusion package implements automatic routing between the two at the upper layer. The core design of the fusion layer is automatic routing based on the file_id prefix: file_ids starting with s3: are routed to the S3 backend, while all others are routed to the FastDFS backend.
The component supports three storage modes via StorageModeConfig: s3 (S3 only), fastdfs (FastDFS only), and hybrid (both backends). In hybrid mode, upload operations go to S3 with the s3: prefix added, while download, delete, and metadata query operations automatically route based on the file_id prefix. In s3 mode, all operations go to S3. In fastdfs mode, all operations go to FastDFS. The StorageManager class serves as the unified entry point, automatically creating the required clients and services based on the configured mode.
This design allows business code to work with a single set of fusion service interfaces, transparently operating on one or both storage backends, and supports packaging files from mixed sources in batch download scenarios. The StorageManager manages the lifecycle of all asynchronous clients using a context manager pattern.
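The prefix-based routing rule can be illustrated with a minimal sketch. The route_backend function below is illustrative only, not part of the library's API:

```python
def route_backend(file_id: str) -> str:
    """Illustrative routing rule: 's3:'-prefixed ids go to S3, everything else to FastDFS."""
    return "s3" if file_id.startswith("s3:") else "fastdfs"

# Fusion-format id produced by the upload layer in s3/hybrid mode
assert route_backend("s3:my-bucket/reports/uuid_report.pdf") == "s3"
# Classic FastDFS id (group/path), routed to the FastDFS backend
assert route_backend("group1/M00/00/00/old_contract.pdf") == "fastdfs"
```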
2. Integration and Configuration Guide
Dependency Installation
The project uses uv or pip for dependency management. Core dependencies include aioboto3 (S3 async client), aiofdfs (FastDFS async client), pydantic (configuration models), aiofiles (async file operations), and fastapi (UploadFile support).
pip install s3py-helper
S3 Protocol Storage Configuration
Configure S3-compatible object storage services (such as MinIO, AWS S3, etc.) using S3Config. endpoint is the service address, access_key_id and secret_access_key are the authentication credentials, default_bucket specifies the default bucket name, and region is the region identifier, defaulting to us-east-1.
from s3py_helper.storage import S3Config
s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
    region="us-east-1",
)
FastDFS Configuration
Configure FastDFS connection parameters using FastdfsConfig. tracker_servers is a list of tracker server addresses (format host:port); connect_timeout and network_timeout control the connection timeout and the network read/write timeout (in seconds); store_path_index specifies the storage path index, defaulting to -1 for automatic selection.
from s3py_helper.storage import FastdfsConfig
fastdfs_config = FastdfsConfig(
    tracker_servers=["192.168.30.36:22122"],
    connect_timeout=2,
    network_timeout=30,
)
Storage Mode Configuration
StorageModeConfig defines which storage backend(s) to use via the mode field. Three modes are supported:
- hybrid (default): Both S3 and FastDFS. Uploads go to S3; download/delete/info auto-route by file_id prefix. Requires both s3_config and fastdfs_config.
- s3: S3 only. All operations go to S3. Requires s3_config. Accessing FastDFS-routed files raises RouteNotSupportedException.
- fastdfs: FastDFS only. All operations go to FastDFS. Requires fastdfs_config. Accessing S3-routed files raises RouteNotSupportedException.
from s3py_helper.storage import StorageModeConfig, StorageMode
# Hybrid mode (default, both backends)
mode_config = StorageModeConfig(mode=StorageMode.HYBRID)
# S3 only mode
mode_config = StorageModeConfig(mode=StorageMode.S3)
# FastDFS only mode
mode_config = StorageModeConfig(mode=StorageMode.FASTDFS)
File Validation Configuration
FileTypeConfig is used for file type validation during uploads, supporting both whitelist and blacklist modes. When the whitelist is non-empty, only extensions in the whitelist are allowed. The blacklist always takes effect. Additionally, FileValidatorImpl supports limiting file size via the max_file_size parameter. If validation is not required, it can be omitted or default values can be passed.
from s3py_helper.storage import FileTypeConfig, FileValidatorImpl
file_type_config = FileTypeConfig(
    whitelist=["pdf", "jpg", "png", "zip"],
    blacklist=["exe", "bat"],
)
validator = FileValidatorImpl(file_type_config=file_type_config, max_file_size=100 * 1024 * 1024)
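The whitelist/blacklist rule described above can be sketched as follows. is_extension_allowed is a hypothetical helper illustrating the documented behavior, not the library's internal implementation:

```python
def is_extension_allowed(filename: str, whitelist: list[str], blacklist: list[str]) -> bool:
    """Illustrative sketch of the documented rule, not s3py-helper's actual code."""
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext in blacklist:   # the blacklist always takes effect
        return False
    if whitelist:          # non-empty whitelist: only listed extensions pass
        return ext in whitelist
    return True            # empty whitelist: everything not blacklisted passes

assert is_extension_allowed("report.pdf", ["pdf", "jpg"], ["exe"])
assert not is_extension_allowed("virus.exe", [], ["exe"])
assert not is_extension_allowed("notes.txt", ["pdf", "jpg"], ["exe"])
```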
3. Guide for Fusion Usage of FastDFS and S3
The fusion package provides unified interfaces for upload, download, deletion, and metadata querying. Business code does not need to worry about whether the underlying storage is S3 or FastDFS. All fusion services are assembled from raw services built using S3Client and FastdfsClient.
Service Initialization
StorageManager is the recommended entry point for initializing all fusion services. It manages client lifecycles and automatically creates the required services based on the configured mode. Use the async with context manager to ensure proper resource cleanup.
from s3py_helper.storage import (
    StorageManager, StorageModeConfig, StorageMode,
    S3Config, FastdfsConfig,
)

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
)
fastdfs_config = FastdfsConfig(tracker_servers=["192.168.30.36:22122"])

async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.HYBRID),
    s3_config=s3_config,
    fastdfs_config=fastdfs_config,
) as manager:
    # Access services via properties
    fusion_upload = manager.upload_service
    fusion_download = manager.download_service
    fusion_delete = manager.delete_service
    fusion_info = manager.info_service
For S3-only or FastDFS-only modes, only the corresponding config is required:
# S3 only
async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.S3),
    s3_config=s3_config,
) as manager:
    attach = await manager.upload_service.upload_bytes("report.pdf", content)

# FastDFS only
async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.FASTDFS),
    fastdfs_config=fastdfs_config,
) as manager:
    attach = await manager.upload_service.upload_bytes("report.pdf", content)
File Upload
The fusion upload service routes based on the storage mode. In s3 and hybrid modes, files are uploaded to S3 with the s3: prefix added to the file_id. In fastdfs mode, files are uploaded to FastDFS without the s3: prefix. Before uploading, it performs file name validity checks, file type validation, and file size validation. The returned FileAttach contains the fusion-format file_id and the original file name.
upload_bytes is used for uploading in-memory byte data, upload_file is used for uploading FastAPI UploadFile objects (streaming internally to avoid loading the entire file into memory), upload_stream is used for uploading custom binary streams or async iterators, and upload_files supports batch uploading multiple UploadFile objects. All methods support an optional folder parameter to specify the prefix directory for the S3 object key, a bucket parameter to override the default bucket, and a metadata parameter (dict[str, str]) to attach custom metadata to the uploaded file.
# Upload byte data
attach = await fusion_upload.upload_bytes("report.pdf", content, folder="reports")
# Upload FastAPI UploadFile (streaming)
attach = await fusion_upload.upload_file(upload_file, folder="uploads")
# Upload custom stream
import io
stream = io.BytesIO(content)
attach = await fusion_upload.upload_stream("data.csv", stream, len(content), folder="data")
# Upload with custom metadata
attach = await fusion_upload.upload_bytes(
    "report.pdf", content, folder="reports",
    metadata={"author": "john", "category": "finance"},
)
File Download
The fusion download service automatically routes to the corresponding backend based on the file_id prefix. download_bytes reads the entire file into memory and returns bytes, suitable for small files. download_to_stream streams the data into the specified BinaryIO output stream, suitable for writing downloaded content directly to a file or buffer. download_to_generator returns an asynchronous generator that yields data in chunks, suitable for streaming response scenarios.
# Download as byte data
data = await fusion_download.download_bytes("s3:my-bucket/reports/uuid_report.pdf")
# Download to output stream
with open("output.pdf", "wb") as f:
    await fusion_download.download_to_stream("s3:my-bucket/reports/uuid_report.pdf", f)
# Streaming download (async iterator)
async for chunk in fusion_download.download_to_generator(some_file_id):
    process(chunk)
Mixed-Source Batch Download as Zip
download_files_as_zip supports packaging multiple files from S3 and FastDFS into a single ZIP archive written to an output stream. Each file is specified via FileDownload with its file_id and an optional display name. The service automatically identifies the source backend and streams each file's content into the ZIP archive. When file_name is missing, the filename part from the file_id is used as the entry name in the ZIP archive. Duplicate entry names are automatically suffixed with a sequence number to avoid overwrites.
from s3py_helper.storage import FileDownload
import io
files = [
    FileDownload(file_id="s3:my-bucket/reports/uuid_report.pdf", file_name="report.pdf"),
    FileDownload(file_id="group1/M00/00/00/old_contract.pdf", file_name="contract.pdf"),
]
buf = io.BytesIO()
await fusion_download.download_files_as_zip(files, buf)
For scenarios where the ZIP stream needs to be sent directly to a FastAPI StreamingResponse, the stream_zip method can be used. It returns an asynchronous generator that internally uses a queue to parallelize ZIP writing and network sending.
from starlette.responses import StreamingResponse
async def download_zip(request):
    files = [FileDownload(file_id="s3:my-bucket/doc.pdf", file_name="doc.pdf")]
    return StreamingResponse(
        fusion_download.stream_zip(files),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )
File Deletion
delete_file automatically routes to the S3 or FastDFS backend for deletion based on the file_id prefix. S3 deletion is idempotent (deleting a non-existent object does not raise an error). In s3 mode, accessing FastDFS-routed files raises a RouteNotSupportedException.
# Delete S3 file
await fusion_delete.delete_file("s3:my-bucket/reports/uuid_report.pdf")
# Delete FastDFS file (hybrid/fastdfs mode only)
await fusion_delete.delete_file("group1/M00/00/00/some_file.pdf")
Metadata Query
get_file_info routes to the corresponding backend based on the file_id prefix to query metadata, returning a unified FileBasicInfo object. The S3 backend uses head_object to get file size, last modification time, storage class, and custom metadata. The FastDFS backend uses get_meta_data to get the file name and size. For S3 files, the returned file_id retains the fusion-layer s3: prefix, making it convenient for subsequent operations to use directly. A FileNotFoundException is raised when the file does not exist.
info = await fusion_info.get_file_info("s3:my-bucket/reports/uuid_report.pdf")
print(info.file_name) # File name
print(info.file_size) # File size (bytes)
print(info.last_modify_time) # Last modification time (S3)
print(info.storage_class) # Storage class (S3)
print(info.metadata) # Custom metadata (S3)
4. Guide for Using Pure S3 Protocol Object Storage
The s3raw package provides direct operation interfaces for the S3 protocol without routing logic or file validation, suitable for scenarios requiring only S3 storage or scenarios needing finer control.
Client and Service Initialization
All S3 raw services share the same S3Client instance. The client manages the lifecycle of the underlying connection pool via the async with context manager. S3RawUploadServiceImpl requires S3Config to resolve the default bucket, while other services only depend on the client instance.
from s3py_helper.storage.s3raw.client.s3_client import S3Client
from s3py_helper.storage.s3raw.service.upload_service import S3RawUploadServiceImpl
from s3py_helper.storage.s3raw.service.download_service import S3RawDownloadServiceImpl
from s3py_helper.storage.s3raw.service.delete_service import S3RawDeleteServiceImpl
from s3py_helper.storage.s3raw.service.info_service import S3RawInfoServiceImpl
async with S3Client(s3_config) as s3:
    upload_svc = S3RawUploadServiceImpl(s3, s3_config)
    download_svc = S3RawDownloadServiceImpl(s3)
    delete_svc = S3RawDeleteServiceImpl(s3)
    info_svc = S3RawInfoServiceImpl(s3)
The returned file_id format is bucket/key (without a prefix). Subsequent operations can use this file_id directly.
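The bucket/key convention can be illustrated with a small hypothetical helper (split_file_id is not part of the library):

```python
def split_file_id(file_id: str) -> tuple[str, str]:
    """Illustrative: the bucket is the first path segment, the key is the rest."""
    bucket, _, key = file_id.partition("/")
    return bucket, key

assert split_file_id("my-bucket/reports/uuid_report.pdf") == (
    "my-bucket",
    "reports/uuid_report.pdf",
)
```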
Upload
upload_bytes uploads byte data to S3. upload_stream supports uploading from a BinaryIO or async iterator. upload_file accepts a FastAPI UploadFile object and streams it to save memory. upload_files supports batch uploading. All methods return a FileAttach, where file_id is in the format bucket/key and file_name is the original file name. When uploading, the S3 object key is automatically generated in the format [folder/]uuid_prefix_filename, where uuid_prefix is a 16-character random hexadecimal string ensuring uniqueness.
# Byte upload
attach = await upload_svc.upload_bytes("report.pdf", content, folder="reports")
# attach.file_id → "my-bucket/reports/a1b2c3d4e5f60718_report.pdf"
# Stream upload
import io
attach = await upload_svc.upload_stream("data.bin", io.BytesIO(content), len(content))
# UploadFile upload (streaming, does not load entire file into memory)
attach = await upload_svc.upload_file(upload_file, bucket="other-bucket")
Download
download_bytes reads the entire content at once and returns bytes, suitable for small files. download_to_stream writes data in chunks to the specified BinaryIO output stream, keeping memory usage controllable. download_to_generator returns an asynchronous generator that yields data progressively in fixed block sizes (default 64KB), suitable for streaming responses or large file processing. Internally, it adapts to different versions of the aiobotocore response body interface, automatically falling back for versions that do not support chunked reading.
# Download as bytes
data = await download_svc.download_bytes("my-bucket/reports/uuid_report.pdf")
# Download to file
with open("output.pdf", "wb") as f:
    await download_svc.download_to_stream("my-bucket/reports/uuid_report.pdf", f)
# Streaming read
async for chunk in download_svc.download_to_generator("my-bucket/reports/uuid_report.pdf"):
    process(chunk)
Multi-File Packaged Download
download_files_as_zip streams multiple S3 files one by one into a ZIP archive, without loading all files into memory simultaneously. Each file is specified by a FileDownload containing its file_id and an optional display name. Entry names within the ZIP are path-sanitized (defending against Zip Slip), and sequence numbers are automatically added for duplicate names.
stream_zip is an asynchronous generator wrapper for FastAPI StreamingResponse. It internally uses asyncio.Queue to parallelize ZIP writing and data production, making it suitable for direct return as a streaming response body to the client.
from s3py_helper.storage import FileDownload
# Write to buffer
files = [
    FileDownload(file_id="my-bucket/docs/a.pdf", file_name="Document A.pdf"),
    FileDownload(file_id="my-bucket/docs/b.pdf", file_name="Document B.pdf"),
]
buf = io.BytesIO()
await download_svc.download_files_as_zip(files, buf)
# Direct streaming response
from starlette.responses import StreamingResponse
@app.get("/download")
async def download():
    return StreamingResponse(
        download_svc.stream_zip(files),
        media_type="application/zip",
    )
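The entry-name handling described above (path sanitization against Zip Slip, plus numeric suffixes for duplicate names) can be sketched as follows; both helpers are illustrative, not the library's internals:

```python
import posixpath

def sanitize_entry_name(name: str) -> str:
    """Keep only the final path component; drops '../' traversal segments (Zip Slip defence)."""
    return posixpath.basename(name.replace("\\", "/")) or "unnamed"

def dedupe_entry_names(names: list[str]) -> list[str]:
    """Sanitize each name, then suffix repeats with a sequence number to avoid overwrites."""
    seen: dict[str, int] = {}
    result = []
    for raw in names:
        name = sanitize_entry_name(raw)
        if name in seen:
            seen[name] += 1
            stem, dot, ext = name.rpartition(".")
            name = f"{stem}_{seen[name]}.{ext}" if dot else f"{name}_{seen[name]}"
        else:
            seen[name] = 0
        result.append(name)
    return result

assert dedupe_entry_names(["a.pdf", "a.pdf", "../evil.pdf"]) == ["a.pdf", "a_1.pdf", "evil.pdf"]
```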
Deletion
delete_file deletes an S3 object using its file_id. S3 deletion operations are idempotent; deleting a non-existent object does not raise an error. The file_id format is bucket/key.
await delete_svc.delete_file("my-bucket/reports/uuid_report.pdf")
Metadata Query
get_file_info retrieves metadata of an object via the S3 head_object operation, returning a FileBasicInfo. This includes the file size, last modification time, storage class (e.g., STANDARD), and custom metadata. A FileNotFoundException is raised if the object does not exist.
info = await info_svc.get_file_info("my-bucket/reports/uuid_report.pdf")
print(info.file_size) # File size (bytes)
print(info.last_modify_time) # Last modification time
print(info.storage_class) # Storage class
print(info.metadata) # Custom metadata dictionary
Multipart Upload
S3Client also provides low-level interfaces for multipart uploads, suitable for very large file upload scenarios where manual control over part logic is needed. The complete workflow is: create_multipart_upload initializes the upload and obtains an UploadId -> upload_part is called multiple times to upload parts -> complete_multipart_upload completes the assembly. During the upload, abort_multipart_upload can be called to cancel and clean up already uploaded parts.
async with S3Client(s3_config) as s3:
    # Initiate multipart upload
    result = await s3.create_multipart_upload("my-bucket", "large-file.bin")
    upload_id = result["UploadId"]
    # Upload parts
    parts = []
    for part_num, chunk in enumerate(read_chunks("large-file.bin", 8 * 1024 * 1024), start=1):
        result = await s3.upload_part("my-bucket", "large-file.bin", upload_id, part_num, chunk)
        parts.append({"PartNumber": part_num, "ETag": result["ETag"]})
    # Complete assembly
    await s3.complete_multipart_upload("my-bucket", "large-file.bin", upload_id, parts)
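The example above assumes a read_chunks helper for reading the source file in fixed-size parts. One possible implementation (synchronous, for illustration; aiofiles could be substituted for a fully async pipeline):

```python
def read_chunks(path: str, chunk_size: int = 8 * 1024 * 1024):
    """Yield a file's content in fixed-size chunks (illustrative helper, not part of s3py-helper)."""
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk
```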
5. Guide for Integration with FastAPI
This section provides a complete FastAPI application example demonstrating how to integrate fusion storage services within request handling. The core idea is to use FastAPI's dependency injection system to manage client lifecycles and service instances, allowing route functions to simply declare dependencies for direct usage.
Dependency Injection and Service Management
It is recommended to provide fusion services to routes via FastAPI's dependency injection (Depends). Clients are created once at application startup and released at shutdown, avoiding re-establishing connections on every request. The following example holds the StorageManager in a module-level variable created by the lifespan context manager and exposes it through a small accessor function that routes can call directly or declare via Depends.
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, UploadFile, File
from starlette.responses import StreamingResponse
from s3py_helper.storage import (
    StorageManager, StorageModeConfig, StorageMode,
    S3Config, FastdfsConfig, FileTypeConfig,
    FileDownload,
)
# ---- Configuration ----
s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
)
fastdfs_config = FastdfsConfig(tracker_servers=["192.168.30.36:22122"])
# ---- StorageManager Lifecycle ----
_manager: StorageManager | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create StorageManager on application startup, release on shutdown."""
    global _manager
    _manager = StorageManager(
        mode_config=StorageModeConfig(mode=StorageMode.HYBRID),
        s3_config=s3_config,
        fastdfs_config=fastdfs_config,
    )
    async with _manager:
        yield
app = FastAPI(lifespan=lifespan)
# ---- Access Services ----
def get_manager() -> StorageManager:
    assert _manager is not None, "StorageManager not initialized (app not started)"
    return _manager
File Upload Endpoint
The upload endpoint receives an UploadFile, streams it to S3 via the fusion service, and returns the fusion-format file_id.
@app.post("/files/upload")
async def upload_file(file: UploadFile = File(...)):
    attach = await get_manager().upload_service.upload_file(file, folder="uploads")
    return {"file_id": attach.file_id, "file_name": attach.file_name}

@app.post("/files/batch-upload")
async def batch_upload(files: list[UploadFile] = File(...)):
    results = await get_manager().upload_service.upload_files(files, folder="batch")
    return [{"file_id": r.file_id, "file_name": r.file_name} for r in results]
File Download Endpoints
Single file downloads retrieve content via download_bytes and return it directly. Batch downloads use stream_zip to return a streaming ZIP response. Data is written to a queue asynchronously in the background and read from the queue on the foreground side, keeping memory usage constant.
from starlette.responses import Response

@app.get("/files/{file_id:path}/download")
async def download_file(file_id: str):
    data = await get_manager().download_service.download_bytes(file_id)
    return Response(content=data, media_type="application/octet-stream")

@app.post("/files/download-zip")
async def download_zip(body: list[FileDownload]):
    return StreamingResponse(
        get_manager().download_service.stream_zip(body),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )
The return value of stream_zip can be used directly as the content body of a StreamingResponse. The list of FileDownload objects passed by the requester can contain both S3 files (with s3: prefix) and FastDFS files; the fusion layer automatically identifies the source and pulls data from the corresponding backend.
File Deletion and Metadata Query Endpoints
The deletion and metadata query endpoints directly accept fusion-format file_ids. The services automatically route based on the prefix.
@app.delete("/files/{file_id:path}")
async def delete_file(file_id: str):
    await get_manager().delete_service.delete_file(file_id)
    return {"detail": "deleted"}

@app.get("/files/{file_id:path}/info")
async def get_file_info(file_id: str):
    info = await get_manager().info_service.get_file_info(file_id)
    return {
        "file_id": info.file_id,
        "file_name": info.file_name,
        "file_size": info.file_size,
        "last_modify_time": info.last_modify_time.isoformat() if info.last_modify_time else None,
    }
Exception Handling
The component defines a clear exception hierarchy. It is recommended to register global exception handlers in FastAPI to map storage exceptions to appropriate HTTP status codes. The base class StorageException maps to 500, FileNotFoundException maps to 404, InvalidArgumentException and InvalidFileTypeException map to 400, and DeleteNotSupportedException and RouteNotSupportedException map to 403.
from fastapi.responses import JSONResponse
from s3py_helper.storage import (
    StorageException, FileNotFoundException,
    InvalidArgumentException, InvalidFileTypeException,
    DeleteNotSupportedException, RouteNotSupportedException,
)

@app.exception_handler(FileNotFoundException)
async def not_found_handler(request, exc):
    return JSONResponse(status_code=404, content={"detail": str(exc)})

@app.exception_handler(InvalidArgumentException)
@app.exception_handler(InvalidFileTypeException)
async def bad_request_handler(request, exc):
    return JSONResponse(status_code=400, content={"detail": str(exc)})

@app.exception_handler(DeleteNotSupportedException)
@app.exception_handler(RouteNotSupportedException)
async def forbidden_handler(request, exc):
    return JSONResponse(status_code=403, content={"detail": str(exc)})

@app.exception_handler(StorageException)
async def storage_error_handler(request, exc):
    return JSONResponse(status_code=500, content={"detail": str(exc)})
Project details
Download files
Download the file for your platform.
Source Distribution
Built Distribution
File details
Details for the file s3py_helper-0.0.13.tar.gz.
File metadata
- Download URL: s3py_helper-0.0.13.tar.gz
- Upload date:
- Size: 25.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 93dc2981831f4484fc1956ac9bf3bba6c0b5d31abaf376e55abb8513c1acdaac |
| MD5 | 996af672a796d630b988df0bd7cf6a16 |
| BLAKE2b-256 | 41c32e7ac8702b4e09f35f8d45301885918f8911e822716a63e5654a2003184f |
File details
Details for the file s3py_helper-0.0.13-py3-none-any.whl.
File metadata
- Download URL: s3py_helper-0.0.13-py3-none-any.whl
- Upload date:
- Size: 41.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 229f9a4c1b442f837d1a136ac73fc2aba842684609101fb2ef7edae36094c868 |
| MD5 | 7796bd39519f10a09e6d4801eba67e69 |
| BLAKE2b-256 | aa817bc0ecbb0d060ddbeab4e0e8fd202d096931d5885bd67a5f2566e2f10d7f |