
fsspec filesystem for BOS

Project description

fsspec-bosfs

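fsspec-bosfs (bosfs for short) is a Python file system implementation for convenient access to Baidu Cloud Object Storage (BOS). With bosfs, you can work with data stored in BOS through the standard fsspec interface.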

Installation

Install bosfs with pip:

pip install bosfs

Verify the installation:

pip show bosfs
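The same check can be done from Python with the standard-library `importlib.metadata` module (Python 3.8+). This is a minimal sketch that assumes the distribution name is `bosfs`, as in the pip commands above; `installed_version` is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str = "bosfs") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    v = installed_version("bosfs")
    print(f"bosfs {v} is installed" if v else "bosfs is not installed")
```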

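Quick start

Configure access credentials

Two methods are supported and either one is sufficient. When both are configured, they are applied in an order of precedence.

  1. Pass the credentials as arguments when initializing BOSFileSystem: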
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='{your ak}', secret_key='{your sk}', sts_token=None)
  2. Set environment variables:
export BCE_ACCESS_KEY_ID=xxx
export BCE_SECRET_ACCESS_KEY=xxx
export BOS_ENDPOINT=xxx
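If the credentials live in the environment but you prefer to pass them explicitly, or want a clear error when one is missing, the variables can be mapped onto the `BOSFileSystem` keyword arguments shown in method 1. This is a hypothetical helper, not part of bosfs, and the variable-to-argument mapping is an assumption based on the names.

```python
import os
from typing import Dict, Mapping, Optional

# Assumed mapping: environment variable -> BOSFileSystem keyword argument.
ENV_TO_KWARG = {
    "BOS_ENDPOINT": "endpoint",
    "BCE_ACCESS_KEY_ID": "access_key",
    "BCE_SECRET_ACCESS_KEY": "secret_key",
}


def bos_kwargs_from_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: build BOSFileSystem keyword arguments from the
    environment, failing early if any variable is missing or empty."""
    env = os.environ if environ is None else environ
    missing = [name for name in ENV_TO_KWARG if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing BOS environment variables: {', '.join(missing)}")
    return {kwarg: env[name] for name, kwarg in ENV_TO_KWARG.items()}


# Usage (needs bosfs installed and real credentials):
# import bosfs
# fs = bosfs.BOSFileSystem(**bos_kwargs_from_env())
```

Passing the values explicitly this way also sidesteps any question of which configuration method takes precedence.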

Examples

  1. List data in BOS with bosfs:
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
res = fs.ls('/mybucket/')
print(res)
  2. Read data from BOS with bosfs:
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
with fs.open('/mybucket/README.md') as f:
    print(f.readline())
  3. Write data to BOS with bosfs:
import bosfs
import os

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
object_name = "/mybucket/file1"  # the path includes the bucket name, as in the other examples
data = os.urandom(10 * 2**20)  # 10 MB of random bytes
with fs.open(object_name, "wb") as f_wb:
    f_wb.write(data)
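The write example holds the whole 10 MB payload in memory. For larger objects, data can be streamed in fixed-size chunks through the file objects returned by `fs.open`. A minimal sketch, assuming those objects support the usual `read`/`write` methods (as fsspec file objects do); `copy_stream`, the chunk size, and the paths in the usage comments are illustrative choices.

```python
from typing import BinaryIO

CHUNK_SIZE = 8 * 2**20  # 8 MiB per chunk; an arbitrary illustrative value


def copy_stream(src: BinaryIO, dst: BinaryIO, chunk_size: int = CHUNK_SIZE) -> int:
    """Copy src to dst chunk by chunk and return the number of bytes copied.

    The caller holds at most one chunk in memory at a time."""
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:  # empty bytes means end of stream
            break
        dst.write(chunk)
        total += len(chunk)
    return total


# Usage sketch: "local.bin", "copy.bin" and "/mybucket/file1" are placeholder
# paths, and fs is a BOSFileSystem created as in the examples above.
# Upload:
# with open("local.bin", "rb") as src, fs.open("/mybucket/file1", "wb") as dst:
#     copy_stream(src, dst)
# Download:
# with fs.open("/mybucket/file1", "rb") as src, open("copy.bin", "wb") as dst:
#     copy_stream(src, dst)
```

The returned byte count can be compared with the local file size as a basic integrity check.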

For more usage, refer to the fsspec documentation.

Read/write performance

Benchmark results:

Single-file sequential read/write performance:

File size (MB)  Write speed (MB/s)  Read speed (MB/s)  Write time (s)  Read time (s)
1               9.672178            4.640667           0.104893        0.216561
4               24.867180           15.987999          0.160897        0.250713
10              30.681564           30.890233          0.343196        0.325457
128             75.634190           93.547005          1.692568        1.369047
512             80.359467           97.840121          6.371391        5.233074
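In the benchmark script, speed and time are averaged separately over the iterations (`np.mean(write_speeds)` and `np.mean(write_times)`), so a row's speed is generally not exactly its size divided by its time: for the 1 MB row, 1 / 0.104893 s ≈ 9.53 MB/s, while the table reports 9.672178 MB/s. The sketch below reproduces that aggregation with the standard library; `summarize` is a hypothetical name and the timings are made-up illustrative values, not measurements.

```python
from statistics import mean
from typing import List, Tuple


def summarize(size_mb: float, times_s: List[float]) -> Tuple[float, float]:
    """Aggregate repeated timings the way the benchmark script does:
    per-iteration speeds and times are averaged independently."""
    speeds = [size_mb / t for t in times_s]  # per-iteration MB/s
    return mean(speeds), mean(times_s)


# Illustrative timings for a 1 MB object (not measured data).
avg_speed, avg_time = summarize(1, [0.1, 0.2])
print(avg_speed)      # 7.5, the mean of 10 and 5 MB/s
print(1 / avg_time)   # about 6.67, size divided by the mean time
```

Because 1/t is convex, the mean of per-iteration speeds is never lower than size divided by mean time, which matches the direction of the gap in the 1 MB row.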

Concurrent multi-file read/write performance:

Files  File size (MB)  Threads  Write speed (MB/s)  Read speed (MB/s)  Write time (s)  Read time (s)
1000   4               16       381.91              301.56             10.47           13.26
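In the concurrent test, the speed is aggregate throughput: total data (`file_size_mb * num_files`) divided by the wall-clock time of the whole batch, not a per-file speed. The sketch below redoes that arithmetic for the table row; the function name is hypothetical.

```python
def aggregate_throughput_mb_s(num_files: int, file_size_mb: float, wall_time_s: float) -> float:
    """Total data moved divided by wall-clock time, as in the parallel benchmark."""
    if wall_time_s <= 0:
        raise ValueError("wall_time_s must be positive")
    return num_files * file_size_mb / wall_time_s


# Figures from the concurrent-test table: 1000 files x 4 MB.
print(round(aggregate_throughput_mb_s(1000, 4, 10.47), 1))  # 382.0 (write)
print(round(aggregate_throughput_mb_s(1000, 4, 13.26), 1))  # 301.7 (read)
```

These match the reported 381.91 and 301.56 MB/s within the rounding of the times to two decimals.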

Test environment:

  • Test machine: 16 cores, 32 GB RAM, 3 Gbps network bandwidth

Benchmark code:

import bosfs
import os
import time
import numpy as np
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

def test_bosfs_performance(bucket_name, file_sizes=None, num_iterations=3, plot_results=True):
    """
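    Benchmark BosFS read/write performance.
    
    Args:
    bucket_name: BOS bucket name
    file_sizes: list of file sizes to test (in MB)
    num_iterations: number of times each file size is tested
    plot_results: whether to plot performance charts
    """
    # Initialize BosFS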
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    # Use the default file sizes if none are given
    if file_sizes is None:
        file_sizes = [1, 10, 50, 100, 250, 500]  # MB
    
    # Create the result storage structure
    results = {
        'file_size_mb': [],
        'write_speed_mbps': [],
        'read_speed_mbps': [],
        'write_time_s': [],
        'read_time_s': []
    }
    
    # Test each file size
    for size_mb in file_sizes:
        print(f"\nTesting file size: {size_mb} MB")
        size_bytes = size_mb * 2**20
        
        write_speeds = []
        read_speeds = []
        write_times = []
        read_times = []
        
        # Repeat the test several times and average the results
        for i in range(num_iterations):
            # Generate random data
            print(f"  Iteration {i+1}/{num_iterations}: generating random data...")
            data = os.urandom(size_bytes)
            
            # Build a unique object name
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            object_name = f"{bucket_name}/perf_test_{size_mb}mb_{timestamp}_{i}.dat"
            
            # Measure write performance
            print(f"  Iteration {i+1}/{num_iterations}: measuring write performance...")
            write_start = time.time()
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            write_end = time.time()
            
            write_time = write_end - write_start
            write_speed = size_mb / write_time  # MB/s
            
            write_times.append(write_time)
            write_speeds.append(write_speed)
            
            # Measure read performance
            print(f"  Iteration {i+1}/{num_iterations}: measuring read performance...")
            read_start = time.time()
            with fs.open(object_name, "rb") as f_rb:
                read_data = f_rb.read()
            read_end = time.time()
            
            read_time = read_end - read_start
            read_speed = size_mb / read_time  # MB/s
            
            read_times.append(read_time)
            read_speeds.append(read_speed)
            
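            # Verify data integrity by comparing the full content
            if read_data != data:
                print(f"  Warning: data read back ({len(read_data)} bytes) does not match data written ({len(data)} bytes)!")
            
            # Clean up the test file
            try:
                fs.rm(object_name)
                print(f"  Deleted test file: {object_name}")
            except Exception as e:
                print(f"  Error deleting file: {e}")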
        
        # Compute the averages
        avg_write_speed = np.mean(write_speeds)
        avg_read_speed = np.mean(read_speeds)
        avg_write_time = np.mean(write_times)
        avg_read_time = np.mean(read_times)
        
        # Store the results
        results['file_size_mb'].append(size_mb)
        results['write_speed_mbps'].append(avg_write_speed)
        results['read_speed_mbps'].append(avg_read_speed)
        results['write_time_s'].append(avg_write_time)
        results['read_time_s'].append(avg_read_time)
        
        print(f"  Average write speed: {avg_write_speed:.2f} MB/s, average write time: {avg_write_time:.2f} s")
        print(f"  Average read speed: {avg_read_speed:.2f} MB/s, average read time: {avg_read_time:.2f} s")
    
    # Build a DataFrame for analysis
    results_df = pd.DataFrame(results)
    
    # Print the results table
    print("\nBenchmark results:")
    print(results_df.to_string(index=False))
    
    if plot_results:
        # Plot performance charts
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Speed chart
        ax1.plot(results['file_size_mb'], results['write_speed_mbps'], 'b-o', label='Write Speed')
        ax1.plot(results['file_size_mb'], results['read_speed_mbps'], 'r-o', label='Read Speed')
        ax1.set_xlabel('File Size (MB)')
        ax1.set_ylabel('Speed (MB/s)')
        ax1.set_title('BosFS Read/Write Speed')
        ax1.legend()
        ax1.grid(True)
        
        # Time chart
        ax2.plot(results['file_size_mb'], results['write_time_s'], 'b-o', label='Write Time')
        ax2.plot(results['file_size_mb'], results['read_time_s'], 'r-o', label='Read Time')
        ax2.set_xlabel('File Size (MB)')
        ax2.set_ylabel('Time (s)')
        ax2.set_title('BosFS Read/Write Time')
        ax2.legend()
        ax2.grid(True)
        
        plt.tight_layout()
        plt.savefig('bosfs_performance.png')
        plt.show()
    
    return results_df


def test_bosfs_parallel_performance(bucket_name, file_size_mb=10, num_files=5, num_threads=4):
    """
    Benchmark BosFS parallel read/write performance.
    
    Args:
    bucket_name: BOS bucket name
    file_size_mb: size of each file (MB)
    num_files: number of files to write/read
    num_threads: number of parallel threads
    """
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    file_size_bytes = file_size_mb * 2**20
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    
    # Build the list of object names
    object_names = [f"{bucket_name}/parallel_test_{timestamp}_{i}.dat" for i in range(num_files)]
    
    # Generate random data
    print(f"Generating {num_files} random files of {file_size_mb} MB each...")
    data_list = [os.urandom(file_size_bytes) for _ in range(num_files)]
    
    # Worker for parallel writes
    def write_file(args):
        object_name, data = args
        try:
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            return True
        except Exception as e:
            print(f"Error writing {object_name}: {e}")
            return False
    
    # Worker for parallel reads
    def read_file(object_name):
        try:
            with fs.open(object_name, "rb") as f_rb:
                data = f_rb.read()
            return len(data)
        except Exception as e:
            print(f"Error reading {object_name}: {e}")
            return 0
    
    # Measure parallel writes
    print(f"\nTesting parallel writes ({num_threads} threads)...")
    write_start = time.time()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = list(tqdm(
            executor.map(write_file, zip(object_names, data_list)),
            total=num_files,
            desc="Writing"
        ))
    
    write_end = time.time()
    write_time = write_end - write_start
    total_data_size_mb = file_size_mb * num_files
    write_speed = total_data_size_mb / write_time
    
    print(f"Parallel writes finished: {sum(results)}/{num_files} files succeeded")
    print(f"Total write time: {write_time:.2f} s")
    print(f"Total data size: {total_data_size_mb} MB")
    print(f"Overall write speed: {write_speed:.2f} MB/s")
    
    # Measure parallel reads
    print(f"\nTesting parallel reads ({num_threads} threads)...")
    read_start = time.time()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        read_sizes = list(tqdm(
            executor.map(read_file, object_names),
            total=num_files,
            desc="Reading"
        ))
    
    read_end = time.time()
    read_time = read_end - read_start
    read_speed = total_data_size_mb / read_time
    
    print(f"Parallel reads finished: {sum(1 for size in read_sizes if size > 0)}/{num_files} files succeeded")
    print(f"Total read time: {read_time:.2f} s")
    print(f"Overall read speed: {read_speed:.2f} MB/s")
    
    # Clean up the test files
    print("\nCleaning up test files...")
    for object_name in tqdm(object_names, desc="Deleting"):
        try:
            fs.rm(object_name)
        except Exception as e:
            print(f"Error deleting {object_name}: {e}")
    
    return {
        'parallel_write_speed_mbps': write_speed,
        'parallel_read_speed_mbps': read_speed,
        'parallel_write_time_s': write_time,
        'parallel_read_time_s': read_time
    }


if __name__ == "__main__":
    # Set the BOS bucket name
    bucket_name = "your-bucket-name"
    
    # Run the single-file read/write benchmark
    print("=" * 80)
    print("Starting BosFS single-file read/write benchmark")
    print("=" * 80)
    
    # Custom list of file sizes (MB)
    file_sizes = [1, 10, 50, 100]
    
    # Run the benchmark
    results_df = test_bosfs_performance(bucket_name, file_sizes=file_sizes, num_iterations=3)
    
    # Run the parallel benchmark
    print("\n" + "=" * 80)
    print("Starting BosFS parallel read/write benchmark")
    print("=" * 80)
    
    parallel_results = test_bosfs_parallel_performance(
        bucket_name,
        file_size_mb=4,     # 4 MB per file
        num_files=1000,     # 1000 files in total
        num_threads=16      # 16 threads
    )
    
    # Save the results to a CSV file
    results_df.to_csv('bosfs_performance_results.csv', index=False)
    print("\nResults saved to bosfs_performance_results.csv")
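For quick experiments, the core of the single-file benchmark can be reduced to one write/read round trip. This sketch is an adaptation, not part of bosfs: `time_roundtrip` is a hypothetical helper that takes any opener shaped like `open(path, mode)`, so it works with `fs.open` from bosfs or with Python's built-in `open` for a local-disk baseline. It swaps `time.time()` for `time.perf_counter()`, a monotonic clock intended for measuring intervals, and compares full content rather than only length.

```python
import time
from typing import IO, Callable, Tuple

# Any callable shaped like open(path, mode), e.g. fs.open or the built-in open.
Opener = Callable[[str, str], IO[bytes]]


def time_roundtrip(open_fn: Opener, path: str, data: bytes) -> Tuple[float, float]:
    """Hypothetical helper: write `data` to `path`, read it back, and
    return (write_seconds, read_seconds)."""
    # perf_counter is monotonic and meant for interval timing.
    start = time.perf_counter()
    with open_fn(path, "wb") as f:
        f.write(data)
    write_s = time.perf_counter() - start

    start = time.perf_counter()
    with open_fn(path, "rb") as f:
        read_back = f.read()
    read_s = time.perf_counter() - start

    # Compare the full content, not just the length.
    if read_back != data:
        raise IOError(f"data read back from {path} does not match what was written")
    return write_s, read_s


if __name__ == "__main__":
    import os
    import tempfile

    # Local-disk baseline using the built-in open.
    size_mb = 4
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "roundtrip.dat")
        w, r = time_roundtrip(open, path, os.urandom(size_mb * 2**20))
        print(f"write {size_mb / w:.1f} MB/s, read {size_mb / r:.1f} MB/s")
```

For BOS, call it as `time_roundtrip(fs.open, "your-bucket-name/roundtrip.dat", os.urandom(4 * 2**20))` and remove the object afterwards with `fs.rm`, as the script does.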

Limitations

bosfs does not currently support async mode.
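Because bosfs has no async mode, one possible workaround in asyncio applications is to run the blocking calls in worker threads with `asyncio.to_thread` (Python 3.9+). This is a general asyncio technique, not a bosfs feature; `read_object_async` and `read_many` are hypothetical helpers. Sharing one `fs` object across threads mirrors what the benchmark's `ThreadPoolExecutor` test already does.

```python
import asyncio
from typing import IO, Callable, List, Sequence

# Any callable shaped like open(path, mode), e.g. fs.open or the built-in open.
Opener = Callable[[str, str], IO[bytes]]


async def read_object_async(open_fn: Opener, path: str) -> bytes:
    """Hypothetical helper: run a blocking open/read in a worker thread so
    the event loop stays responsive."""
    def _read() -> bytes:
        with open_fn(path, "rb") as f:
            return f.read()

    return await asyncio.to_thread(_read)


async def read_many(open_fn: Opener, paths: Sequence[str]) -> List[bytes]:
    """Read several objects concurrently; results keep the order of `paths`."""
    return list(await asyncio.gather(*(read_object_async(open_fn, p) for p in paths)))


# Usage sketch (fs created as in the examples above; paths are placeholders):
# contents = asyncio.run(read_many(fs.open, ["/mybucket/a.dat", "/mybucket/b.dat"]))
```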

Release notes

v1.0.0 - 2025-03-20

New features

  • Initial release
  • Support for synchronous (sync) bosfs

Breaking changes

  • None


Download files


Source Distributions

No source distribution files are available for this release.

Built Distribution


bosfs-1.0.0-py3-none-any.whl (19.8 kB)

Uploaded Python 3

File details

Details for the file bosfs-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: bosfs-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 19.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.8.10

File hashes

Hashes for bosfs-1.0.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       71ce9a75c54ef2b68c663e8a0c869f7d37162d558d4fb2617ae2d7940d2b6819
MD5          c1251a348bab98c79bbafbacd9774ce0
BLAKE2b-256  0fa014cb7641efd331299c1a25f0f6ab758717fa6b08b448d1509ef85c70bbc5

