Unified file storage component with S3 and FastDFS fusion support

s3py-helper

1. Component Overview

s3py-helper is a unified asynchronous file storage component that supports both S3 protocol object storage and the FastDFS distributed file system. It targets the Python 3.12+ asynchronous ecosystem and is built on aioboto3 for S3 and aiofdfs for FastDFS.

The component adopts a layered architecture. The s3raw package provides raw operations for the S3 protocol, the fastdfs package provides raw operations for FastDFS, and the fusion package implements automatic routing between the two at the upper layer. The core design of the fusion layer is automatic routing based on the file_id prefix: file_ids starting with s3: are routed to the S3 backend, while all others are routed to the FastDFS backend.
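
The dispatch rule is simple enough to state in a few lines. The sketch below is illustrative only (resolve_backend is not the library's API), but it captures the routing described above:

def resolve_backend(file_id: str) -> str:
    """Illustrative sketch of the fusion layer's prefix rule (not library API)."""
    if file_id.startswith("s3:"):
        return "s3"      # e.g. "s3:my-bucket/reports/uuid_report.pdf"
    return "fastdfs"     # e.g. "group1/M00/00/00/old_contract.pdf"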

The component supports three storage modes via StorageModeConfig: s3 (S3 only), fastdfs (FastDFS only), and hybrid (both backends). In hybrid mode, upload operations go to S3 with the s3: prefix added, while download, delete, and metadata query operations automatically route based on the file_id prefix. In s3 mode, all operations go to S3. In fastdfs mode, all operations go to FastDFS. The StorageManager class serves as the unified entry point, automatically creating the required clients and services based on the configured mode.

This design allows business code to work with a single set of fusion service interfaces, transparently operating on one or both storage backends, and supports packaging files from mixed sources in batch download scenarios. The StorageManager manages the lifecycle of all asynchronous clients using a context manager pattern.

2. Integration and Configuration Guide

Dependency Installation

The project uses uv or pip for dependency management. Core dependencies include aioboto3 (S3 async client), aiofdfs (FastDFS async client), pydantic (configuration models), aiofiles (async file operations), and fastapi (UploadFile support).

pip install s3py-helper

S3 Protocol Storage Configuration

Configure S3-compatible object storage services (such as MinIO or AWS S3) using S3Config. endpoint is the service address, access_key_id and secret_access_key are the authentication credentials, default_bucket specifies the default bucket name, and region is the region identifier, defaulting to us-east-1.

from s3py_helper.storage import S3Config

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
    region="us-east-1",
)

FastDFS Configuration

Configure FastDFS connection parameters using FastdfsConfig. tracker_servers is a list of tracker server addresses (format host:port); connect_timeout and network_timeout control the connection timeout and the network read/write timeout, respectively (both in seconds); store_path_index specifies the storage path index, defaulting to -1 for automatic selection.

from s3py_helper.storage import FastdfsConfig

fastdfs_config = FastdfsConfig(
    tracker_servers=["192.168.30.36:22122"],
    connect_timeout=2,
    network_timeout=30,
)

Storage Mode Configuration

StorageModeConfig defines which storage backend(s) to use via the mode field. Three modes are supported:

  • hybrid (default): Both S3 and FastDFS. Upload goes to S3, download/delete/info auto-route by file_id prefix. Requires both s3_config and fastdfs_config.
  • s3: S3 only. All operations go to S3. Requires s3_config. Accessing FastDFS-routed files raises RouteNotSupportedException.
  • fastdfs: FastDFS only. All operations go to FastDFS. Requires fastdfs_config. Accessing S3-routed files raises RouteNotSupportedException.

from s3py_helper.storage import StorageModeConfig, StorageMode

# Hybrid mode (default, both backends)
mode_config = StorageModeConfig(mode=StorageMode.HYBRID)

# S3 only mode
mode_config = StorageModeConfig(mode=StorageMode.S3)

# FastDFS only mode
mode_config = StorageModeConfig(mode=StorageMode.FASTDFS)

File Validation Configuration

FileTypeConfig is used for file type validation during uploads and supports both whitelist and blacklist modes. When the whitelist is non-empty, only extensions in the whitelist are allowed; the blacklist always takes effect. Additionally, FileValidatorImpl supports limiting file size via the max_file_size parameter. If validation is not required, the validator can be omitted or constructed with default values.

from s3py_helper.storage import FileTypeConfig, FileValidatorImpl

file_type_config = FileTypeConfig(
    whitelist=["pdf", "jpg", "png", "zip"],
    blacklist=["exe", "bat"],
)
validator = FileValidatorImpl(file_type_config=file_type_config, max_file_size=100 * 1024 * 1024)
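
Once an upload service has been assembled (see section 3), a file failing these checks surfaces as InvalidFileTypeException from the exception hierarchy described in section 5. The snippet below is a hedged sketch; that the validator raises at exactly this point in the upload path is an assumption:

from s3py_helper.storage import InvalidFileTypeException

try:
    # "exe" is blacklisted above, so this upload is expected to be rejected.
    await fusion_upload.upload_bytes("setup.exe", b"...")
except InvalidFileTypeException as exc:
    print(f"upload rejected: {exc}")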

3. Guide for Fusion Usage of FastDFS and S3

The fusion package provides unified interfaces for upload, download, deletion, and metadata querying. Business code does not need to worry about whether the underlying storage is S3 or FastDFS. All fusion services are assembled from raw services built using S3Client and FastdfsClient.

Service Initialization

StorageManager is the recommended entry point for initializing all fusion services. It manages client lifecycles and automatically creates the required services based on the configured mode. Use the async with context manager to ensure proper resource cleanup.

from s3py_helper.storage import (
    StorageManager, StorageModeConfig, StorageMode,
    S3Config, FastdfsConfig,
)

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
)
fastdfs_config = FastdfsConfig(tracker_servers=["192.168.30.36:22122"])

async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.HYBRID),
    s3_config=s3_config,
    fastdfs_config=fastdfs_config,
) as manager:
    # Access services via properties
    fusion_upload = manager.upload_service
    fusion_download = manager.download_service
    fusion_delete = manager.delete_service
    fusion_info = manager.info_service

For S3-only or FastDFS-only modes, only the corresponding config is required:

# S3 only
async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.S3),
    s3_config=s3_config,
) as manager:
    attach = await manager.upload_service.upload_bytes("report.pdf", content)

# FastDFS only
async with StorageManager(
    mode_config=StorageModeConfig(mode=StorageMode.FASTDFS),
    fastdfs_config=fastdfs_config,
) as manager:
    attach = await manager.upload_service.upload_bytes("report.pdf", content)

File Upload

The fusion upload service routes based on the storage mode. In s3 and hybrid modes, files are uploaded to S3 with the s3: prefix added to the file_id. In fastdfs mode, files are uploaded to FastDFS without the s3: prefix. Before uploading, it performs file name validity checks, file type validation, and file size validation. The returned FileAttach contains the fusion-format file_id and the original file name.

upload_bytes is used for uploading in-memory byte data, upload_file is used for uploading FastAPI UploadFile objects (streaming internally to avoid loading the entire file into memory), upload_stream is used for uploading custom binary streams or async iterators, and upload_files supports batch uploading multiple UploadFile objects. All methods support an optional folder parameter to specify the prefix directory for the S3 object key, a bucket parameter to override the default bucket, and a metadata parameter (dict[str, str]) to attach custom metadata to the uploaded file.

# Upload byte data
attach = await fusion_upload.upload_bytes("report.pdf", content, folder="reports")

# Upload FastAPI UploadFile (streaming)
attach = await fusion_upload.upload_file(upload_file, folder="uploads")

# Upload custom stream
import io
stream = io.BytesIO(content)
attach = await fusion_upload.upload_stream("data.csv", stream, len(content), folder="data")

# Upload with custom metadata
attach = await fusion_upload.upload_bytes(
    "report.pdf", content, folder="reports",
    metadata={"author": "john", "category": "finance"},
)
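
upload_stream also accepts an async iterator, as noted above. A minimal sketch (the chunk contents and the way the total size is computed are illustrative):

# Upload from an async iterator; the total size must be known up front
parts = [b"col1,col2\n", b"1,2\n"]

async def gen():
    for part in parts:
        yield part

attach = await fusion_upload.upload_stream("data.csv", gen(), sum(len(p) for p in parts), folder="data")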

File Download

The fusion download service automatically routes to the corresponding backend based on the file_id prefix. download_bytes reads the entire file into memory and returns bytes, suitable for small files. download_to_stream streams the data into the specified BinaryIO output stream, suitable for writing download content directly to a file or buffer. download_to_generator returns an asynchronous generator that yields data in chunks, suitable for streaming response scenarios.

# Download as byte data
data = await fusion_download.download_bytes("s3:my-bucket/reports/uuid_report.pdf")

# Download to output stream
with open("output.pdf", "wb") as f:
    await fusion_download.download_to_stream("s3:my-bucket/reports/uuid_report.pdf", f)

# Streaming download (async iterator)
async for chunk in fusion_download.download_to_generator(some_file_id):
    process(chunk)
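
For large files, download_to_generator pairs naturally with aiofiles (a core dependency listed in section 2) to stream content to disk without holding the whole file in memory; the output path here is illustrative:

import aiofiles

async with aiofiles.open("large_output.bin", "wb") as f:
    async for chunk in fusion_download.download_to_generator(some_file_id):
        await f.write(chunk)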

Mixed-Source Batch Download as Zip

download_files_as_zip supports packaging multiple files from S3 and FastDFS into a single ZIP archive written to an output stream. Each file is specified via FileDownload with its file_id and an optional display name. The service automatically identifies the source backend and streams each file's content into the ZIP archive. When file_name is missing, the filename part from the file_id is used as the entry name in the ZIP archive. Duplicate entry names are automatically suffixed with a sequence number to avoid overwrites.

from s3py_helper.storage import FileDownload
import io

files = [
    FileDownload(file_id="s3:my-bucket/reports/uuid_report.pdf", file_name="report.pdf"),
    FileDownload(file_id="group1/M00/00/00/old_contract.pdf", file_name="contract.pdf"),
]
buf = io.BytesIO()
await fusion_download.download_files_as_zip(files, buf)

For scenarios where the ZIP stream needs to be sent directly to a FastAPI StreamingResponse, the stream_zip method can be used. It returns an asynchronous generator that internally uses a queue to parallelize ZIP writing and network sending.

from starlette.responses import StreamingResponse

async def download_zip(request):
    files = [FileDownload(file_id="s3:my-bucket/doc.pdf", file_name="doc.pdf")]
    return StreamingResponse(
        fusion_download.stream_zip(files),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )

File Deletion

delete_file automatically routes to the S3 or FastDFS backend for deletion based on the file_id prefix. S3 deletion is idempotent (deleting a non-existent object does not raise an error). In s3 mode, accessing FastDFS-routed files raises a RouteNotSupportedException.

# Delete S3 file
await fusion_delete.delete_file("s3:my-bucket/reports/uuid_report.pdf")

# Delete FastDFS file (hybrid/fastdfs mode only)
await fusion_delete.delete_file("group1/M00/00/00/some_file.pdf")

Metadata Query

get_file_info routes to the corresponding backend based on the file_id prefix and returns a unified FileBasicInfo object. The S3 backend uses head_object to get the file size, last modification time, storage class, and custom metadata. The FastDFS backend uses get_meta_data to get the file name and size. For S3 files, the returned file_id retains the fusion-layer s3: prefix, so it can be passed directly to subsequent operations. A FileNotFoundException is raised when the file does not exist.

info = await fusion_info.get_file_info("s3:my-bucket/reports/uuid_report.pdf")
print(info.file_name)    # File name
print(info.file_size)    # File size (bytes)
print(info.last_modify_time)  # Last modification time (S3)
print(info.storage_class)     # Storage class (S3)
print(info.metadata)          # Custom metadata (S3)

4. Guide for Using Pure S3 Protocol Object Storage

The s3raw package provides direct operation interfaces for the S3 protocol without routing logic or file validation, suitable for scenarios requiring only S3 storage or scenarios needing finer control.

Client and Service Initialization

All S3 raw services share the same S3Client instance. The client manages the lifecycle of the underlying connection pool via the async with context manager. S3RawUploadServiceImpl requires S3Config to resolve the default bucket, while other services only depend on the client instance.

from s3py_helper.storage.s3raw.client.s3_client import S3Client
from s3py_helper.storage.s3raw.service.upload_service import S3RawUploadServiceImpl
from s3py_helper.storage.s3raw.service.download_service import S3RawDownloadServiceImpl
from s3py_helper.storage.s3raw.service.delete_service import S3RawDeleteServiceImpl
from s3py_helper.storage.s3raw.service.info_service import S3RawInfoServiceImpl

async with S3Client(s3_config) as s3:
    upload_svc = S3RawUploadServiceImpl(s3, s3_config)
    download_svc = S3RawDownloadServiceImpl(s3)
    delete_svc = S3RawDeleteServiceImpl(s3)
    info_svc = S3RawInfoServiceImpl(s3)

The returned file_id format is bucket/key (without a prefix). Subsequent operations can use this file_id directly.

Upload

upload_bytes uploads byte data to S3. upload_stream supports uploading from a BinaryIO or async iterator. upload_file accepts a FastAPI UploadFile object and streams it to save memory. upload_files supports batch uploading. All methods return a FileAttach, where file_id is in the format bucket/key and file_name is the original file name. When uploading, the S3 object key is automatically generated in the format [folder/]uuid_prefix_filename, where uuid_prefix is a 16-character random hexadecimal string ensuring uniqueness.

# Byte upload
attach = await upload_svc.upload_bytes("report.pdf", content, folder="reports")
# attach.file_id → "my-bucket/reports/a1b2c3d4e5f60718_report.pdf"

# Stream upload
import io
attach = await upload_svc.upload_stream("data.bin", io.BytesIO(content), len(content))

# UploadFile upload (streaming, does not load entire file into memory)
attach = await upload_svc.upload_file(upload_file, bucket="other-bucket")

Download

download_bytes reads the entire content at once and returns bytes, suitable for small files. download_to_stream writes data in chunks to the specified BinaryIO output stream, keeping memory usage controllable. download_to_generator returns an asynchronous generator that yields data progressively in fixed block sizes (default 64KB), suitable for streaming responses or large file processing. Internally, it adapts to different versions of the aiobotocore response body interface, automatically falling back for versions that do not support chunked reading.

# Download as bytes
data = await download_svc.download_bytes("my-bucket/reports/uuid_report.pdf")

# Download to file
with open("output.pdf", "wb") as f:
    await download_svc.download_to_stream("my-bucket/reports/uuid_report.pdf", f)

# Streaming read
async for chunk in download_svc.download_to_generator("my-bucket/reports/uuid_report.pdf"):
    process(chunk)

Multi-File Packaged Download

download_files_as_zip streams multiple S3 files one by one into a ZIP archive, without loading all files into memory simultaneously. Each file is specified by a FileDownload containing its file_id and an optional display name. Entry names within the ZIP are path-sanitized (defending against Zip Slip), and sequence numbers are automatically added for duplicate names.

stream_zip is an asynchronous generator wrapper for FastAPI StreamingResponse. It internally uses asyncio.Queue to parallelize ZIP writing and data production, making it suitable for direct return as a streaming response body to the client.

from s3py_helper.storage import FileDownload

# Write to buffer
files = [
    FileDownload(file_id="my-bucket/docs/a.pdf", file_name="Document A.pdf"),
    FileDownload(file_id="my-bucket/docs/b.pdf", file_name="Document B.pdf"),
]
buf = io.BytesIO()
await download_svc.download_files_as_zip(files, buf)

# Direct streaming response
from starlette.responses import StreamingResponse

@app.get("/download")
async def download():
    return StreamingResponse(
        download_svc.stream_zip(files),
        media_type="application/zip",
    )

Deletion

delete_file deletes an S3 object using its file_id. S3 deletion operations are idempotent; deleting a non-existent object does not raise an error. The file_id format is bucket/key.

await delete_svc.delete_file("my-bucket/reports/uuid_report.pdf")

Metadata Query

get_file_info retrieves metadata of an object via the S3 head_object operation, returning a FileBasicInfo. This includes the file size, last modification time, storage class (e.g., STANDARD), and custom metadata. A FileNotFoundException is raised if the object does not exist.

info = await info_svc.get_file_info("my-bucket/reports/uuid_report.pdf")
print(info.file_size)         # File size (bytes)
print(info.last_modify_time)  # Last modification time
print(info.storage_class)     # Storage class
print(info.metadata)          # Custom metadata dictionary

Multipart Upload

S3Client also provides low-level interfaces for multipart uploads, suitable for very large file upload scenarios where manual control over part logic is needed. The complete workflow is: create_multipart_upload initializes the upload and obtains an UploadId -> upload_part is called multiple times to upload parts -> complete_multipart_upload completes the assembly. During the upload, abort_multipart_upload can be called to cancel and clean up already uploaded parts.
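
The example below iterates with a read_chunks helper that is not part of s3py-helper. A minimal synchronous version, included only so the example is self-contained:

def read_chunks(path: str, chunk_size: int):
    """Yield fixed-size byte blocks from a local file (illustrative helper)."""
    with open(path, "rb") as f:
        while block := f.read(chunk_size):
            yield block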

async with S3Client(s3_config) as s3:
    # Initiate multipart upload
    result = await s3.create_multipart_upload("my-bucket", "large-file.bin")
    upload_id = result["UploadId"]

    # Upload parts
    parts = []
    for part_num, chunk in enumerate(read_chunks("large-file.bin", 8 * 1024 * 1024), start=1):
        result = await s3.upload_part("my-bucket", "large-file.bin", upload_id, part_num, chunk)
        parts.append({"PartNumber": part_num, "ETag": result["ETag"]})

    # Complete assembly
    await s3.complete_multipart_upload("my-bucket", "large-file.bin", upload_id, parts)
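
If any step fails midway, abort the upload so S3 does not retain orphaned parts. A minimal pattern, assuming abort_multipart_upload takes the same (bucket, key, upload_id) arguments as the calls above:

try:
    ...  # upload_part / complete_multipart_upload as shown above
except Exception:
    # Clean up already uploaded parts, then re-raise.
    await s3.abort_multipart_upload("my-bucket", "large-file.bin", upload_id)
    raise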

5. Guide for Integration with FastAPI

This section provides a complete FastAPI application example demonstrating how to integrate fusion storage services within request handling. The core idea is to use FastAPI's dependency injection system to manage client lifecycles and service instances, allowing route functions to simply declare dependencies for direct usage.

Dependency Injection and Service Management

It is recommended to provide fusion services to routes via FastAPI's dependency injection (Depends). Clients are created and cached when the application starts and released when it shuts down, avoiding re-establishing connections on every request. The following example holds a single StorageManager in a module-level variable managed by the application's lifespan handler and exposes it through a small accessor that routes can call directly or declare via Depends.

from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, UploadFile, File
from starlette.responses import StreamingResponse

from s3py_helper.storage import (
    StorageManager, StorageModeConfig, StorageMode,
    S3Config, FastdfsConfig, FileTypeConfig,
    FileDownload,
)

# ---- Configuration ----

s3_config = S3Config(
    endpoint="http://192.168.30.36:9001",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    default_bucket="my-bucket",
)
fastdfs_config = FastdfsConfig(tracker_servers=["192.168.30.36:22122"])

# ---- StorageManager Lifecycle ----

_manager: StorageManager | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create StorageManager on application startup, release on shutdown."""
    global _manager
    _manager = StorageManager(
        mode_config=StorageModeConfig(mode=StorageMode.HYBRID),
        s3_config=s3_config,
        fastdfs_config=fastdfs_config,
    )
    async with _manager:
        yield

app = FastAPI(lifespan=lifespan)

# ---- Access Services ----

def get_manager() -> StorageManager:
    assert _manager is not None, "StorageManager is created by the app lifespan"
    return _manager
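
Route handlers can call get_manager() directly, as the endpoints below do, or declare it through Depends (already imported above). A trivial readiness probe as a sketch; the route path is illustrative:

@app.get("/storage/ready")
async def storage_ready(manager: StorageManager = Depends(get_manager)):
    # The manager is created by the lifespan, so it is available here.
    return {"ready": manager is not None}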

File Upload Endpoint

The upload endpoint receives an UploadFile, streams it to S3 via the fusion service, and returns the fusion-format file_id.

@app.post("/files/upload")
async def upload_file(file: UploadFile = File(...)):
    attach = await get_manager().upload_service.upload_file(file, folder="uploads")
    return {"file_id": attach.file_id, "file_name": attach.file_name}


@app.post("/files/batch-upload")
async def batch_upload(files: list[UploadFile] = File(...)):
    results = await get_manager().upload_service.upload_files(files, folder="batch")
    return [{"file_id": r.file_id, "file_name": r.file_name} for r in results]

File Download Endpoints

Single-file downloads retrieve the content via download_bytes and return it directly. Batch downloads use stream_zip to return a streaming ZIP response: a background task writes ZIP data into a queue while the response body consumes it, keeping memory usage bounded.

from starlette.responses import Response

@app.get("/files/{file_id:path}/download")
async def download_file(file_id: str):
    data = await get_manager().download_service.download_bytes(file_id)
    return Response(content=data, media_type="application/octet-stream")


@app.post("/files/download-zip")
async def download_zip(body: list[FileDownload]):
    return StreamingResponse(
        get_manager().download_service.stream_zip(body),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=files.zip"},
    )

The return value of stream_zip can be used directly as the content body of a StreamingResponse. The list of FileDownload objects passed by the requester can contain both S3 files (with s3: prefix) and FastDFS files; the fusion layer automatically identifies the source and pulls data from the corresponding backend.

File Deletion and Metadata Query Endpoints

The deletion and metadata query endpoints directly accept fusion-format file_ids. The services automatically route based on the prefix.

@app.delete("/files/{file_id:path}")
async def delete_file(file_id: str):
    await get_manager().delete_service.delete_file(file_id)
    return {"detail": "deleted"}


@app.get("/files/{file_id:path}/info")
async def get_file_info(file_id: str):
    info = await get_manager().info_service.get_file_info(file_id)
    return {
        "file_id": info.file_id,
        "file_name": info.file_name,
        "file_size": info.file_size,
        "last_modify_time": info.last_modify_time.isoformat() if info.last_modify_time else None,
    }

Exception Handling

The component defines a clear exception hierarchy. It is recommended to register global exception handlers in FastAPI to map storage exceptions to appropriate HTTP status codes. The base class StorageException maps to 500, FileNotFoundException maps to 404, InvalidArgumentException and InvalidFileTypeException map to 400, and DeleteNotSupportedException and RouteNotSupportedException map to 403.

from fastapi.responses import JSONResponse
from s3py_helper.storage import (
    StorageException, FileNotFoundException,
    InvalidArgumentException, InvalidFileTypeException,
    DeleteNotSupportedException, RouteNotSupportedException,
)

@app.exception_handler(FileNotFoundException)
async def not_found_handler(request, exc):
    return JSONResponse(status_code=404, content={"detail": str(exc)})

@app.exception_handler(InvalidArgumentException)
@app.exception_handler(InvalidFileTypeException)
async def bad_request_handler(request, exc):
    return JSONResponse(status_code=400, content={"detail": str(exc)})

@app.exception_handler(DeleteNotSupportedException)
@app.exception_handler(RouteNotSupportedException)
async def forbidden_handler(request, exc):
    return JSONResponse(status_code=403, content={"detail": str(exc)})

@app.exception_handler(StorageException)
async def storage_error_handler(request, exc):
    return JSONResponse(status_code=500, content={"detail": str(exc)})
