fsspec filesystem for BOS

fsspec-bosfs

fsspec-bosfs (abbreviated as bosfs) is a Python file system implementation for convenient access to Baidu Object Storage (BOS). Through bosfs, users can operate on data stored in BOS via the standard fsspec interface.

Installation

Install bosfs via pip:

pip install bosfs

Verify that the installation succeeded:

pip show bosfs

Quick Start

Configure Access Credentials

Two configuration methods are supported; either one is sufficient. Parameters passed explicitly take priority over environment variables.

  1. Initialize BOSFileSystem by specifying parameters:
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='{your ak}', secret_key='{your sk}', sts_token=None)
  2. Configure environment variables:
export BCE_ACCESS_KEY_ID=xxx
export BCE_SECRET_ACCESS_KEY=xxx
export BOS_ENDPOINT=xxx
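
Explicit constructor arguments take priority over the environment variables. The sketch below illustrates that resolution order with `resolve_credentials`, a hypothetical helper written for this example (it is not part of bosfs):

```python
import os

def resolve_credentials(access_key=None, secret_key=None, endpoint=None):
    """Return (ak, sk, endpoint), preferring explicit arguments over env vars."""
    ak = access_key or os.environ.get("BCE_ACCESS_KEY_ID")
    sk = secret_key or os.environ.get("BCE_SECRET_ACCESS_KEY")
    ep = endpoint or os.environ.get("BOS_ENDPOINT")
    if not (ak and sk and ep):
        raise ValueError("missing BOS credentials or endpoint")
    return ak, sk, ep

# Explicit arguments win even when the environment variables are set.
os.environ["BCE_ACCESS_KEY_ID"] = "env-ak"
os.environ["BCE_SECRET_ACCESS_KEY"] = "env-sk"
os.environ["BOS_ENDPOINT"] = "http://bj.bcebos.com"
print(resolve_credentials(access_key="arg-ak", secret_key="arg-sk"))
# → ('arg-ak', 'arg-sk', 'http://bj.bcebos.com')
```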

Examples

  1. List data on BOS using bosfs
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
res = fs.ls('/mybucket/')
print(res)
  2. Read data from BOS using bosfs
import bosfs

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
with fs.open('/mybucket/README.md') as f:
    print(f.readline())
  3. Write data to BOS using bosfs
import bosfs
import os

fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
object_name = "/mybucket/file1"
data = os.urandom(10 * 2**20)  # 10 MiB of random bytes
with fs.open(object_name, "wb") as f_wb:
    f_wb.write(data)

For more usage patterns, refer to the fsspec documentation.
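
Because BOSFileSystem implements the standard fsspec AbstractFileSystem interface, the same calls (ls, open, cat, rm, ...) work across backends. The sketch below demonstrates that interface against fsspec's built-in in-memory backend, so it runs without BOS credentials; the bucket and file names are illustrative only:

```python
import fsspec

# The in-memory backend implements the same fsspec interface that bosfs
# does, so these calls mirror the BOS examples above.
fs = fsspec.filesystem("memory")

with fs.open("/mybucket/README.md", "wb") as f:
    f.write(b"hello from fsspec\n")

print(fs.ls("/mybucket/", detail=False))  # lists the object just written
content = fs.cat("/mybucket/README.md")
print(content)  # → b'hello from fsspec\n'
fs.rm("/mybucket/README.md")
```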

Performance

Test Results:

Sequential read/write performance for a single file:

file_size_mb  write_speed_mbps  read_speed_mbps  write_time_s  read_time_s
           1          9.672178         4.640667      0.104893     0.216561
           4         24.867180        15.987999      0.160897     0.250713
          10         30.681564        30.890233      0.343196     0.325457
         128         75.634190        93.547005      1.692568     1.369047
         512         80.359467        97.840121      6.371391     5.233074
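
Speeds in the table are in MB/s, derived as file size divided by elapsed time. The test harness below averages per-iteration speeds and per-iteration times separately, so the reported speed only approximately equals size divided by the reported (average) time, as this check on one row shows:

```python
# Sanity-check the 128 MB row: speed ≈ file_size_mb / write_time_s.
file_size_mb = 128
avg_write_time_s = 1.692568   # write_time_s reported for the 128 MB row
approx_speed = file_size_mb / avg_write_time_s
print(round(approx_speed, 2))  # → 75.62, close to the reported 75.634190
```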

Concurrent read/write performance for multiple files:

file_nums  file_size_mb  threads_nums  write_speed_mbps  read_speed_mbps  write_time_s  read_time_s
     1000             4            16            381.91           301.56         10.47        13.26
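
For the concurrent run, the reported speeds are total data volume divided by wall-clock time: 1000 files of 4 MB each means 4000 MB moved in each direction. Recomputing from the table's (rounded) times lands within a fraction of an MB/s of the reported speeds:

```python
# Aggregate throughput across all 16 threads.
num_files, file_size_mb = 1000, 4
total_mb = num_files * file_size_mb          # 4000 MB in each direction
write_speed = total_mb / 10.47               # write_time_s from the table
read_speed = total_mb / 13.26                # read_time_s from the table
# Small deviations from the reported 381.91 / 301.56 come from rounding
# the times to two decimals in the table.
print(round(write_speed, 2), round(read_speed, 2))
```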

Test Environment:

  • Test Machine: 16 cores, 32 GB memory, 3 Gbps network bandwidth

Test Code:

import bosfs
import os
import time
import numpy as np
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

def test_bosfs_performance(bucket_name, file_sizes=None, num_iterations=3, plot_results=True):
    """
    Test the read/write performance of BosFS
    
    Parameters:
    bucket_name: BOS bucket name
    file_sizes: List of file sizes (in MB) to test
    num_iterations: Number of times to repeat the test for each file size
    plot_results: Whether to plot performance charts
    """
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    # Use default file sizes if not specified
    if file_sizes is None:
        file_sizes = [1, 10, 50, 100, 250, 500]  # MB
    
    # Create a structure to store results
    results = {
        'file_size_mb': [],
        'write_speed_mbps': [],
        'read_speed_mbps': [],
        'write_time_s': [],
        'read_time_s': []
    }
    
    # Test for each file size
    for size_mb in file_sizes:
        print(f"\nTesting file size: {size_mb} MB")
        size_bytes = size_mb * 2**20
        
        write_speeds = []
        read_speeds = []
        write_times = []
        read_times = []
        
        # Repeat the test multiple times for average performance
        for i in range(num_iterations):
            # Generate random data
            print(f"  Iteration {i+1}/{num_iterations}: Generating random data...")
            data = os.urandom(size_bytes)
            
            # Construct a unique object name
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            object_name = f"{bucket_name}/perf_test_{size_mb}mb_{timestamp}_{i}.dat"
            
            # Test write performance
            print(f"  Iteration {i+1}/{num_iterations}: Testing write performance...")
            write_start = time.time()
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            write_end = time.time()
            
            write_time = write_end - write_start
            write_speed = size_mb / write_time  # MB/s
            
            write_times.append(write_time)
            write_speeds.append(write_speed)
            
            # Test read performance
            print(f"  Iteration {i+1}/{num_iterations}: Testing read performance...")
            read_start = time.time()
            with fs.open(object_name, "rb") as f_rb:
                read_data = f_rb.read()
            read_end = time.time()
            
            read_time = read_end - read_start
            read_speed = size_mb / read_time  # MB/s
            
            read_times.append(read_time)
            read_speeds.append(read_speed)
            
            # Verify data integrity
            if len(read_data) != len(data):
                print(f"  Warning: Read data size ({len(read_data)}) does not match written data size ({len(data)})!")
            
            # Clean up test file
            try:
                fs.rm(object_name)
                print(f"  Deleted test file: {object_name}")
            except Exception as e:
                print(f"  Error deleting file: {e}")
        
        # Calculate average performance
        avg_write_speed = np.mean(write_speeds)
        avg_read_speed = np.mean(read_speeds)
        avg_write_time = np.mean(write_times)
        avg_read_time = np.mean(read_times)
        
        # Store results
        results['file_size_mb'].append(size_mb)
        results['write_speed_mbps'].append(avg_write_speed)
        results['read_speed_mbps'].append(avg_read_speed)
        results['write_time_s'].append(avg_write_time)
        results['read_time_s'].append(avg_read_time)
        
        print(f"  Average write speed: {avg_write_speed:.2f} MB/s, Average write time: {avg_write_time:.2f} seconds")
        print(f"  Average read speed: {avg_read_speed:.2f} MB/s, Average read time: {avg_read_time:.2f} seconds")
    
    # Create DataFrame for analysis
    results_df = pd.DataFrame(results)
    
    # Print results table
    print("\nPerformance test results:")
    print(results_df.to_string(index=False))
    
    if plot_results:
        # Plot performance charts
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Speed chart
        ax1.plot(results['file_size_mb'], results['write_speed_mbps'], 'b-o', label='Write Speed')
        ax1.plot(results['file_size_mb'], results['read_speed_mbps'], 'r-o', label='Read Speed')
        ax1.set_xlabel('File Size (MB)')
        ax1.set_ylabel('Speed (MB/s)')
        ax1.set_title('BosFS Read/Write Speed')
        ax1.legend()
        ax1.grid(True)
        
        # Time chart
        ax2.plot(results['file_size_mb'], results['write_time_s'], 'b-o', label='Write Cost')
        ax2.plot(results['file_size_mb'], results['read_time_s'], 'r-o', label='Read Cost')
        ax2.set_xlabel('File Size (MB)')
        ax2.set_ylabel('Cost (Second)')
        ax2.set_title('BosFS Read/Write Cost (Second)')
        ax2.legend()
        ax2.grid(True)
        
        plt.tight_layout()
        plt.savefig('bosfs_performance.png')
        plt.show()
    
    return results_df


def test_bosfs_parallel_performance(bucket_name, file_size_mb=10, num_files=5, num_threads=4):
    """
    Test the parallel read/write performance of BosFS
    
    Parameters:
    bucket_name: BOS bucket name
    file_size_mb: Size of each file (MB)
    num_files: Number of files to write/read
    num_threads: Number of parallel threads
    """
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    
    file_size_bytes = file_size_mb * 2**20
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    
    # Generate list of file names
    object_names = [f"{bucket_name}/parallel_test_{timestamp}_{i}.dat" for i in range(num_files)]
    
    # Generate random data
    print(f"Generating {num_files} {file_size_mb} MB random files...")
    data_list = [os.urandom(file_size_bytes) for _ in range(num_files)]
    
    # Parallel write function
    def write_file(args):
        object_name, data = args
        try:
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            return True
        except Exception as e:
            print(f"Error writing {object_name}: {e}")
            return False
    
    # Parallel read function
    def read_file(object_name):
        try:
            with fs.open(object_name, "rb") as f_rb:
                data = f_rb.read()
            return len(data)
        except Exception as e:
            print(f"Error reading {object_name}: {e}")
            return 0
    
    # Test parallel write
    print(f"\nTesting parallel write ({num_threads} threads)...")
    write_start = time.time()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = list(tqdm(
            executor.map(write_file, zip(object_names, data_list)),
            total=num_files,
            desc="Write Progress"
        ))
    
    write_end = time.time()
    write_time = write_end - write_start
    total_data_size_mb = file_size_mb * num_files
    write_speed = total_data_size_mb / write_time
    
    print(f"Parallel write complete: {sum(results)}/{num_files} files successful")
    print(f"Total write time: {write_time:.2f} seconds")
    print(f"Total data size: {total_data_size_mb} MB")
    print(f"Average write speed: {write_speed:.2f} MB/s")
    
    # Test parallel read
    print(f"\nTesting parallel read ({num_threads} threads)...")
    read_start = time.time()
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        read_sizes = list(tqdm(
            executor.map(read_file, object_names),
            total=num_files,
            desc="Read Progress"
        ))
    
    read_end = time.time()
    read_time = read_end - read_start
    read_speed = total_data_size_mb / read_time
    
    print(f"Parallel read complete: {sum(1 for size in read_sizes if size > 0)}/{num_files} files successful")
    print(f"Total read time: {read_time:.2f} seconds")
    print(f"Average read speed: {read_speed:.2f} MB/s")
    
    # Clean up test files
    print("\nCleaning up test files...")
    for object_name in tqdm(object_names, desc="Deleting files"):
        try:
            fs.rm(object_name)
        except Exception as e:
            print(f"Error deleting {object_name}: {e}")
    
    return {
        'parallel_write_speed_mbps': write_speed,
        'parallel_read_speed_mbps': read_speed,
        'parallel_write_time_s': write_time,
        'parallel_read_time_s': read_time
    }


if __name__ == "__main__":
    # Set BOS bucket name
    bucket_name = "your-bucket-name"
    
    # Run single file read/write performance test
    print("=" * 80)
    print("Starting BosFS Single File Read/Write Performance Test")
    print("=" * 80)
    
    # Custom file size list (MB)
    file_sizes = [1, 10, 50, 100]
    
    # Run performance test
    results_df = test_bosfs_performance(bucket_name, file_sizes=file_sizes, num_iterations=3)
    
    # Run parallel performance test
    print("\n" + "=" * 80)
    print("Starting BosFS Parallel Read/Write Performance Test")
    print("=" * 80)
    
    parallel_results = test_bosfs_parallel_performance(
        bucket_name, 
        file_size_mb=4,     # Each file is 4MB
        num_files=1000,     # Total of 1000 files
        num_threads=16      # Use 16 threads
    )
    
    # Save results to CSV file
    results_df.to_csv('bosfs_performance_results.csv', index=False)
    print("\nTest results saved to bosfs_performance_results.csv")

Usage Limitations

  • Does not support async mode.
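
fsspec reports whether a backend implements the async interface via the `async_impl` attribute; a sync-only filesystem such as bosfs reports False. This is illustrated below with the in-memory backend, which is also sync-only:

```python
import fsspec

# Sync-only backends (like bosfs) expose async_impl == False, so callers
# can check for async support before using the async APIs.
fs = fsspec.filesystem("memory")
print(fs.async_impl)  # → False
```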

Versions

v1.1.1 - 2025-03-20

New features

  • Fixed garbled text output

Breaking changes

  • None

v1.0.0 - 2025-03-20

New features

  • First release
  • Supports sync bosfs

Breaking changes

  • None
