fsspec filesystem for BOS
fsspec-bosfs
fsspec-bosfs (abbreviated as bosfs) is a Python file system implementation that provides convenient access to Baidu Object Storage (BOS). Through bosfs, users can operate on data stored in BOS via the standard fsspec interface.
Installation
Install bosfs via pip:
pip install bosfs
Check if the installation is successful:
pip show bosfs
Quick Start
Configure Access Credentials
Either of the following two methods is sufficient. If both are configured, parameters passed explicitly to BOSFileSystem take priority over environment variables.
- Initialize BOSFileSystem by specifying parameters:
import bosfs
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='{your ak}', secret_key='{your sk}', sts_token=None)
- Configure Environment Variables
export BCE_ACCESS_KEY_ID=xxx
export BCE_SECRET_ACCESS_KEY=xxx
export BOS_ENDPOINT=xxx
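The precedence described above can be sketched as follows (`resolve_credentials` is a hypothetical helper written for illustration only; it is not part of the bosfs API):

```python
import os

def resolve_credentials(access_key=None, secret_key=None, endpoint=None):
    """Illustrative sketch: explicit parameters win; env vars are the fallback."""
    return {
        "access_key": access_key or os.environ.get("BCE_ACCESS_KEY_ID"),
        "secret_key": secret_key or os.environ.get("BCE_SECRET_ACCESS_KEY"),
        "endpoint": endpoint or os.environ.get("BOS_ENDPOINT"),
    }

os.environ["BCE_ACCESS_KEY_ID"] = "env-ak"
print(resolve_credentials(access_key="explicit-ak")["access_key"])  # explicit-ak
print(resolve_credentials()["access_key"])  # env-ak
```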
Examples
- List data on BOS using bosfs
import bosfs
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
res = fs.ls('/mybucket/')
print(res)
- Read data from BOS using bosfs
import bosfs
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
with fs.open('/mybucket/README.md') as f:
    print(f.readline())
- Write data to BOS using bosfs
import bosfs
import os
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
object_name = "/mybucket/file1"
data = os.urandom(10 * 2**20)  # 10 MB of random data
with fs.open(object_name, "wb") as f_wb:
    f_wb.write(data)
For more usage, refer to the fsspec documentation.
Performance
Test Results:
Sequential read/write performance for single file
| file_size_mb | write_speed_mbps | read_speed_mbps | write_time_s | read_time_s |
|---|---|---|---|---|
| 1 | 9.672178 | 4.640667 | 0.104893 | 0.216561 |
| 4 | 24.867180 | 15.987999 | 0.160897 | 0.250713 |
| 10 | 30.681564 | 30.890233 | 0.343196 | 0.325457 |
| 128 | 75.634190 | 93.547005 | 1.692568 | 1.369047 |
| 512 | 80.359467 | 97.840121 | 6.371391 | 5.233074 |
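The speed and time columns above are consistent with each other: the reported speed is simply the file size divided by the measured time. As a sanity check on the 512 MB row:

```python
# Verify speed = size / time for the 512 MB row of the table above
file_size_mb = 512
write_time_s = 6.371391
read_time_s = 5.233074

write_speed = file_size_mb / write_time_s  # ~80.36 MB/s, matching the table
read_speed = file_size_mb / read_time_s    # ~97.84 MB/s, matching the table
print(f"{write_speed:.2f} MB/s write, {read_speed:.2f} MB/s read")
```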
Concurrent read/write performance for multiple files
| file_nums | file_size_mb | threads_nums | write_speed_mbps | read_speed_mbps | write_time_s | read_time_s |
|---|---|---|---|---|---|---|
| 1000 | 4 | 16 | 381.91 | 301.56 | 10.47 | 13.26 |
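For the concurrent test, the reported speeds are aggregate throughput: total data moved across all files divided by wall-clock time. The small deviations from the table come from the times being rounded to two decimals:

```python
# Aggregate throughput for the concurrent test above
file_nums = 1000
file_size_mb = 4
write_time_s = 10.47
read_time_s = 13.26

total_mb = file_nums * file_size_mb  # 4000 MB moved in total
write_speed = total_mb / write_time_s  # ~382 MB/s, matching the table
read_speed = total_mb / read_time_s    # ~302 MB/s, matching the table
print(f"{write_speed:.2f} MB/s write, {read_speed:.2f} MB/s read")
```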
Test Environment:
- Test Machine: 16 cores, 32 GB memory, 3 Gbps network bandwidth
Test Code:
import bosfs
import os
import time
import numpy as np
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

def test_bosfs_performance(bucket_name, file_sizes=None, num_iterations=3, plot_results=True):
    """
    Test the read/write performance of BosFS

    Parameters:
        bucket_name: BOS bucket name
        file_sizes: List of file sizes (in MB) to test
        num_iterations: Number of times to repeat the test for each file size
        plot_results: Whether to plot performance charts
    """
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')

    # Use default file sizes if not specified
    if file_sizes is None:
        file_sizes = [1, 10, 50, 100, 250, 500]  # MB

    # Create a structure to store results
    results = {
        'file_size_mb': [],
        'write_speed_mbps': [],
        'read_speed_mbps': [],
        'write_time_s': [],
        'read_time_s': []
    }

    # Test for each file size
    for size_mb in file_sizes:
        print(f"\nTesting file size: {size_mb} MB")
        size_bytes = size_mb * 2**20
        write_speeds = []
        read_speeds = []
        write_times = []
        read_times = []

        # Repeat the test multiple times for average performance
        for i in range(num_iterations):
            # Generate random data
            print(f"  Iteration {i+1}/{num_iterations}: Generating random data...")
            data = os.urandom(size_bytes)

            # Construct a unique object name
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            object_name = f"{bucket_name}/perf_test_{size_mb}mb_{timestamp}_{i}.dat"

            # Test write performance
            print(f"  Iteration {i+1}/{num_iterations}: Testing write performance...")
            write_start = time.time()
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            write_end = time.time()
            write_time = write_end - write_start
            write_speed = size_mb / write_time  # MB/s
            write_times.append(write_time)
            write_speeds.append(write_speed)

            # Test read performance
            print(f"  Iteration {i+1}/{num_iterations}: Testing read performance...")
            read_start = time.time()
            with fs.open(object_name, "rb") as f_rb:
                read_data = f_rb.read()
            read_end = time.time()
            read_time = read_end - read_start
            read_speed = size_mb / read_time  # MB/s
            read_times.append(read_time)
            read_speeds.append(read_speed)

            # Verify data integrity
            if len(read_data) != len(data):
                print(f"  Warning: Read data size ({len(read_data)}) does not match written data size ({len(data)})!")

            # Clean up test file
            try:
                fs.rm(object_name)
                print(f"  Deleted test file: {object_name}")
            except Exception as e:
                print(f"  Error deleting file: {e}")

        # Calculate average performance
        avg_write_speed = np.mean(write_speeds)
        avg_read_speed = np.mean(read_speeds)
        avg_write_time = np.mean(write_times)
        avg_read_time = np.mean(read_times)

        # Store results
        results['file_size_mb'].append(size_mb)
        results['write_speed_mbps'].append(avg_write_speed)
        results['read_speed_mbps'].append(avg_read_speed)
        results['write_time_s'].append(avg_write_time)
        results['read_time_s'].append(avg_read_time)
        print(f"  Average write speed: {avg_write_speed:.2f} MB/s, Average write time: {avg_write_time:.2f} seconds")
        print(f"  Average read speed: {avg_read_speed:.2f} MB/s, Average read time: {avg_read_time:.2f} seconds")

    # Create DataFrame for analysis
    results_df = pd.DataFrame(results)

    # Print results table
    print("\nPerformance test results:")
    print(results_df.to_string(index=False))

    if plot_results:
        # Plot performance charts
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

        # Speed chart
        ax1.plot(results['file_size_mb'], results['write_speed_mbps'], 'b-o', label='Write Speed')
        ax1.plot(results['file_size_mb'], results['read_speed_mbps'], 'r-o', label='Read Speed')
        ax1.set_xlabel('File Size (MB)')
        ax1.set_ylabel('Speed (MB/s)')
        ax1.set_title('BosFS Read/Write Speed')
        ax1.legend()
        ax1.grid(True)

        # Time chart
        ax2.plot(results['file_size_mb'], results['write_time_s'], 'b-o', label='Write Cost')
        ax2.plot(results['file_size_mb'], results['read_time_s'], 'r-o', label='Read Cost')
        ax2.set_xlabel('File Size (MB)')
        ax2.set_ylabel('Cost (Second)')
        ax2.set_title('BosFS Read/Write Cost (Second)')
        ax2.legend()
        ax2.grid(True)

        plt.tight_layout()
        plt.savefig('bosfs_performance.png')
        plt.show()

    return results_df
def test_bosfs_parallel_performance(bucket_name, file_size_mb=10, num_files=5, num_threads=4):
    """
    Test the parallel read/write performance of BosFS

    Parameters:
        bucket_name: BOS bucket name
        file_size_mb: Size of each file (MB)
        num_files: Number of files to write/read
        num_threads: Number of parallel threads
    """
    from concurrent.futures import ThreadPoolExecutor

    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    file_size_bytes = file_size_mb * 2**20
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

    # Generate list of file names
    object_names = [f"{bucket_name}/parallel_test_{timestamp}_{i}.dat" for i in range(num_files)]

    # Generate random data
    print(f"Generating {num_files} {file_size_mb} MB random files...")
    data_list = [os.urandom(file_size_bytes) for _ in range(num_files)]

    # Parallel write function
    def write_file(args):
        object_name, data = args
        try:
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            return True
        except Exception as e:
            print(f"Error writing {object_name}: {e}")
            return False

    # Parallel read function
    def read_file(object_name):
        try:
            with fs.open(object_name, "rb") as f_rb:
                data = f_rb.read()
            return len(data)
        except Exception as e:
            print(f"Error reading {object_name}: {e}")
            return 0

    # Test parallel write
    print(f"\nTesting parallel write ({num_threads} threads)...")
    write_start = time.time()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = list(tqdm(
            executor.map(write_file, zip(object_names, data_list)),
            total=num_files,
            desc="Write Progress"
        ))
    write_end = time.time()
    write_time = write_end - write_start
    total_data_size_mb = file_size_mb * num_files
    write_speed = total_data_size_mb / write_time
    print(f"Parallel write complete: {sum(results)}/{num_files} files successful")
    print(f"Total write time: {write_time:.2f} seconds")
    print(f"Total data size: {total_data_size_mb} MB")
    print(f"Average write speed: {write_speed:.2f} MB/s")

    # Test parallel read
    print(f"\nTesting parallel read ({num_threads} threads)...")
    read_start = time.time()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        read_sizes = list(tqdm(
            executor.map(read_file, object_names),
            total=num_files,
            desc="Read Progress"
        ))
    read_end = time.time()
    read_time = read_end - read_start
    read_speed = total_data_size_mb / read_time
    print(f"Parallel read complete: {sum(1 for size in read_sizes if size > 0)}/{num_files} files successful")
    print(f"Total read time: {read_time:.2f} seconds")
    print(f"Average read speed: {read_speed:.2f} MB/s")

    # Clean up test files
    print("\nCleaning up test files...")
    for object_name in tqdm(object_names, desc="Deleting files"):
        try:
            fs.rm(object_name)
        except Exception as e:
            print(f"Error deleting {object_name}: {e}")

    return {
        'parallel_write_speed_mbps': write_speed,
        'parallel_read_speed_mbps': read_speed,
        'parallel_write_time_s': write_time,
        'parallel_read_time_s': read_time
    }
if __name__ == "__main__":
    # Set BOS bucket name
    bucket_name = "your-bucket-name"

    # Run single file read/write performance test
    print("=" * 80)
    print("Starting BosFS Single File Read/Write Performance Test")
    print("=" * 80)

    # Custom file size list (MB)
    file_sizes = [1, 10, 50, 100]

    # Run performance test
    results_df = test_bosfs_performance(bucket_name, file_sizes=file_sizes, num_iterations=3)

    # Run parallel performance test
    print("\n" + "=" * 80)
    print("Starting BosFS Parallel Read/Write Performance Test")
    print("=" * 80)
    parallel_results = test_bosfs_parallel_performance(
        bucket_name,
        file_size_mb=4,    # Each file is 4 MB
        num_files=1000,    # Total of 1000 files
        num_threads=16     # Use 16 threads
    )

    # Save results to CSV file
    results_df.to_csv('bosfs_performance_results.csv', index=False)
    print("\nTest results saved to bosfs_performance_results.csv")
Usage Limitations
- Does not support async mode.
Versions
v1.1.1 - 2025-03-20
New features
- Fix garbled text output
Breaking changes
- None
v1.0.0 - 2025-03-20
New features
- First release
- Supports sync bosfs
Breaking changes
- None
Project details
Download files
Download the file for your platform.
Source Distributions
Built Distribution
File details
Details for the file bosfs-1.0.1-py3-none-any.whl.
File metadata
- Download URL: bosfs-1.0.1-py3-none-any.whl
- Upload date:
- Size: 19.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.8.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 28523e3bd94c29a8c486e4996a598e466e71d8ba35efd3b120c19daf32f1212e |
| MD5 | 1731eefbae5bff42f6fe5bf36dafe4f9 |
| BLAKE2b-256 | b9733f2523a8043571698d6482038dd1dee2b27f6bb401988f09b2535c9e4e1b |