fsspec filesystem for BOS
fsspec-bosfs
fsspec-bosfs (hereafter "bosfs") is a Python filesystem implementation for convenient access to the Baidu Object Storage (BOS) service. Through bosfs, users can operate on data stored in BOS using the standard fsspec interface.
Installation
Install bosfs via pip:
pip install bosfs
Check that the installation succeeded:
pip show bosfs
Quick start
Configure access credentials
The following two methods are supported, listed in order of precedence; completing either one is sufficient.
- Pass credentials as arguments when initializing BOSFileSystem:
import bosfs
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='{your ak}', secret_key='{your sk}', sts_token=None)
- Set environment variables:
export BCE_ACCESS_KEY_ID=xxx
export BCE_SECRET_ACCESS_KEY=xxx
export BOS_ENDPOINT=xxx
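The precedence above (explicit arguments first, then environment variables) can be sketched with a small resolver. Note that `resolve_credentials` is purely illustrative and not part of the bosfs API:

```python
import os

def resolve_credentials(access_key=None, secret_key=None, endpoint=None):
    """Illustrative helper: explicit arguments take precedence over env vars."""
    return {
        "access_key": access_key or os.environ.get("BCE_ACCESS_KEY_ID"),
        "secret_key": secret_key or os.environ.get("BCE_SECRET_ACCESS_KEY"),
        "endpoint": endpoint or os.environ.get("BOS_ENDPOINT"),
    }

# An explicit argument wins even when the env var is set
os.environ["BCE_ACCESS_KEY_ID"] = "env-ak"
creds = resolve_credentials(access_key="arg-ak")
print(creds["access_key"])  # arg-ak
```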
Examples
- List objects on BOS via bosfs:
import bosfs
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
res = fs.ls('/mybucket/')
print(res)
- Read data from BOS via bosfs:
import bosfs
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
with fs.open('/mybucket/README.md') as f:
    print(f.readline())
- Write data to BOS via bosfs:
import bosfs
import os
fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
object_name = "/mybucket/file1"
data = os.urandom(10 * 2**20)
with fs.open(object_name, "wb") as f_wb:
    f_wb.write(data)
For more usage, refer to the fsspec documentation.
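Because bosfs implements the standard fsspec interface, the open/ls/read pattern above works unchanged against any fsspec backend. As a credential-free illustration, the same calls run against fsspec's built-in in-memory filesystem:

```python
import fsspec

# The "memory" backend ships with fsspec and needs no credentials
fs = fsspec.filesystem("memory")
with fs.open("/mybucket/README.md", "wb") as f:
    f.write(b"hello bosfs\n")

# List object paths, then read the file back
print(fs.ls("/mybucket/", detail=False))
with fs.open("/mybucket/README.md", "rb") as f:
    print(f.read())
```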
Read/write performance
Benchmark results:
Single-file sequential read/write performance:
| file_size_mb | write_speed_mbps | read_speed_mbps | write_time_s | read_time_s |
|---|---|---|---|---|
| 1 | 9.672178 | 4.640667 | 0.104893 | 0.216561 |
| 4 | 24.867180 | 15.987999 | 0.160897 | 0.250713 |
| 10 | 30.681564 | 30.890233 | 0.343196 | 0.325457 |
| 128 | 75.634190 | 93.547005 | 1.692568 | 1.369047 |
| 512 | 80.359467 | 97.840121 | 6.371391 | 5.233074 |
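As a sanity check, the speed columns are simply file size divided by elapsed time; reproducing the 512 MB row (small discrepancies come from independent rounding):

```python
# Reproduce the 512 MB row of the table: speed (MB/s) = size (MB) / time (s)
file_size_mb = 512
write_time_s = 6.371391
read_time_s = 5.233074

write_speed = file_size_mb / write_time_s
read_speed = file_size_mb / read_time_s
print(f"{write_speed:.2f} MB/s write, {read_speed:.2f} MB/s read")
```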
Multi-file concurrent read/write performance:
| file_nums | file_size_mb | threads_nums | write_speed_mbps | read_speed_mbps | write_time_s | read_time_s |
|---|---|---|---|---|---|---|
| 1000 | 4 | 16 | 381.91 | 301.56 | 10.47 | 13.26 |
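Likewise, the concurrent figures are aggregate throughput: total data (num_files × file_size_mb) divided by wall-clock time. The small differences from the table come from the times being rounded to two decimals:

```python
# Aggregate throughput for the concurrent test: total data / wall-clock time
num_files = 1000
file_size_mb = 4
write_time_s = 10.47
read_time_s = 13.26

total_mb = num_files * file_size_mb  # 4000 MB in total
write_speed = total_mb / write_time_s
read_speed = total_mb / read_time_s
print(f"{write_speed:.2f} MB/s write, {read_speed:.2f} MB/s read")
```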
Test environment:
- Test machine: 16 cores, 32 GB RAM, 3 Gbps network bandwidth
Test code:
import bosfs
import os
import time
import numpy as np
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
def test_bosfs_performance(bucket_name, file_sizes=None, num_iterations=3, plot_results=True):
    """
    Test BosFS read/write performance.

    Args:
        bucket_name: BOS bucket name
        file_sizes: list of file sizes to test, in MB
        num_iterations: number of repetitions per file size
        plot_results: whether to plot performance charts
    """
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    # Use default file sizes if none are given
    if file_sizes is None:
        file_sizes = [1, 10, 50, 100, 250, 500]  # MB
    # Result storage
    results = {
        'file_size_mb': [],
        'write_speed_mbps': [],
        'read_speed_mbps': [],
        'write_time_s': [],
        'read_time_s': []
    }
    # Test each file size
    for size_mb in file_sizes:
        print(f"\nTesting file size: {size_mb} MB")
        size_bytes = size_mb * 2**20
        write_speeds = []
        read_speeds = []
        write_times = []
        read_times = []
        # Repeat several times to obtain average performance
        for i in range(num_iterations):
            # Generate random data
            print(f"  Iteration {i+1}/{num_iterations}: generating random data...")
            data = os.urandom(size_bytes)
            # Build a unique object name
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            object_name = f"{bucket_name}/perf_test_{size_mb}mb_{timestamp}_{i}.dat"
            # Test write performance
            print(f"  Iteration {i+1}/{num_iterations}: testing write performance...")
            write_start = time.time()
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            write_end = time.time()
            write_time = write_end - write_start
            write_speed = size_mb / write_time  # MB/s
            write_times.append(write_time)
            write_speeds.append(write_speed)
            # Test read performance
            print(f"  Iteration {i+1}/{num_iterations}: testing read performance...")
            read_start = time.time()
            with fs.open(object_name, "rb") as f_rb:
                read_data = f_rb.read()
            read_end = time.time()
            read_time = read_end - read_start
            read_speed = size_mb / read_time  # MB/s
            read_times.append(read_time)
            read_speeds.append(read_speed)
            # Verify data integrity
            if len(read_data) != len(data):
                print(f"  Warning: read size ({len(read_data)}) does not match written size ({len(data)})!")
            # Clean up the test file
            try:
                fs.rm(object_name)
                print(f"  Deleted test file: {object_name}")
            except Exception as e:
                print(f"  Error deleting file: {e}")
        # Compute average performance
        avg_write_speed = np.mean(write_speeds)
        avg_read_speed = np.mean(read_speeds)
        avg_write_time = np.mean(write_times)
        avg_read_time = np.mean(read_times)
        # Store results
        results['file_size_mb'].append(size_mb)
        results['write_speed_mbps'].append(avg_write_speed)
        results['read_speed_mbps'].append(avg_read_speed)
        results['write_time_s'].append(avg_write_time)
        results['read_time_s'].append(avg_read_time)
        print(f"  Average write speed: {avg_write_speed:.2f} MB/s, average write time: {avg_write_time:.2f} s")
        print(f"  Average read speed: {avg_read_speed:.2f} MB/s, average read time: {avg_read_time:.2f} s")
    # Build a DataFrame for easier analysis
    results_df = pd.DataFrame(results)
    # Print the result table
    print("\nPerformance test results:")
    print(results_df.to_string(index=False))
    if plot_results:
        # Plot performance charts
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        # Speed chart
        ax1.plot(results['file_size_mb'], results['write_speed_mbps'], 'b-o', label='Write Speed')
        ax1.plot(results['file_size_mb'], results['read_speed_mbps'], 'r-o', label='Read Speed')
        ax1.set_xlabel('File Size (MB)')
        ax1.set_ylabel('Speed (MB/s)')
        ax1.set_title('BosFS Read/Write Speed')
        ax1.legend()
        ax1.grid(True)
        # Time chart
        ax2.plot(results['file_size_mb'], results['write_time_s'], 'b-o', label='Write Cost')
        ax2.plot(results['file_size_mb'], results['read_time_s'], 'r-o', label='Read Cost')
        ax2.set_xlabel('File Size (MB)')
        ax2.set_ylabel('Cost (Second)')
        ax2.set_title('BosFS Read/Write Cost (Second)')
        ax2.legend()
        ax2.grid(True)
        plt.tight_layout()
        plt.savefig('bosfs_performance.png')
        plt.show()
    return results_df
def test_bosfs_parallel_performance(bucket_name, file_size_mb=10, num_files=5, num_threads=4):
    """
    Test BosFS parallel read/write performance.

    Args:
        bucket_name: BOS bucket name
        file_size_mb: size of each file (MB)
        num_files: number of files to write/read
        num_threads: number of parallel threads
    """
    from concurrent.futures import ThreadPoolExecutor
    # Initialize BosFS
    fs = bosfs.BOSFileSystem(endpoint='http://bj.bcebos.com', access_key='xxx', secret_key='xxx')
    file_size_bytes = file_size_mb * 2**20
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # Build the list of object names
    object_names = [f"{bucket_name}/parallel_test_{timestamp}_{i}.dat" for i in range(num_files)]
    # Generate random data
    print(f"Generating {num_files} random files of {file_size_mb} MB each...")
    data_list = [os.urandom(file_size_bytes) for _ in range(num_files)]
    # Parallel write worker
    def write_file(args):
        object_name, data = args
        try:
            with fs.open(object_name, "wb") as f_wb:
                f_wb.write(data)
            return True
        except Exception as e:
            print(f"Error writing {object_name}: {e}")
            return False
    # Parallel read worker
    def read_file(object_name):
        try:
            with fs.open(object_name, "rb") as f_rb:
                data = f_rb.read()
            return len(data)
        except Exception as e:
            print(f"Error reading {object_name}: {e}")
            return 0
    # Test parallel write
    print(f"\nTesting parallel write ({num_threads} threads)...")
    write_start = time.time()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        results = list(tqdm(
            executor.map(write_file, zip(object_names, data_list)),
            total=num_files,
            desc="Write progress"
        ))
    write_end = time.time()
    write_time = write_end - write_start
    total_data_size_mb = file_size_mb * num_files
    write_speed = total_data_size_mb / write_time
    print(f"Parallel write finished: {sum(results)}/{num_files} files succeeded")
    print(f"Total write time: {write_time:.2f} s")
    print(f"Total data size: {total_data_size_mb} MB")
    print(f"Average write speed: {write_speed:.2f} MB/s")
    # Test parallel read
    print(f"\nTesting parallel read ({num_threads} threads)...")
    read_start = time.time()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        read_sizes = list(tqdm(
            executor.map(read_file, object_names),
            total=num_files,
            desc="Read progress"
        ))
    read_end = time.time()
    read_time = read_end - read_start
    read_speed = total_data_size_mb / read_time
    print(f"Parallel read finished: {sum(1 for size in read_sizes if size > 0)}/{num_files} files succeeded")
    print(f"Total read time: {read_time:.2f} s")
    print(f"Average read speed: {read_speed:.2f} MB/s")
    # Clean up test files
    print("\nCleaning up test files...")
    for object_name in tqdm(object_names, desc="Deleting files"):
        try:
            fs.rm(object_name)
        except Exception as e:
            print(f"Error deleting {object_name}: {e}")
    return {
        'parallel_write_speed_mbps': write_speed,
        'parallel_read_speed_mbps': read_speed,
        'parallel_write_time_s': write_time,
        'parallel_read_time_s': read_time
    }
if __name__ == "__main__":
    # Set the BOS bucket name
    bucket_name = "your-bucket-name"
    # Run the single-file read/write performance test
    print("=" * 80)
    print("Starting BosFS single-file read/write performance test")
    print("=" * 80)
    # Custom list of file sizes (MB)
    file_sizes = [1, 10, 50, 100]
    # Run the performance test
    results_df = test_bosfs_performance(bucket_name, file_sizes=file_sizes, num_iterations=3)
    # Run the parallel performance test
    print("\n" + "=" * 80)
    print("Starting BosFS parallel read/write performance test")
    print("=" * 80)
    parallel_results = test_bosfs_parallel_performance(
        bucket_name,
        file_size_mb=4,    # 4 MB per file
        num_files=1000,    # 1000 files in total
        num_threads=16     # use 16 threads
    )
    # Save results to a CSV file
    results_df.to_csv('bosfs_performance_results.csv', index=False)
    print("\nResults saved to bosfs_performance_results.csv")
Usage limitations
bosfs does not currently support async mode.
Version history
v1.0.0 - 2025-03-20
New features
- Initial release
- Support for synchronous bosfs
Breaking changes
- None
File details
Details for the file bosfs-1.0.0-py3-none-any.whl.
File metadata
- Download URL: bosfs-1.0.0-py3-none-any.whl
- Upload date:
- Size: 19.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.8.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 71ce9a75c54ef2b68c663e8a0c869f7d37162d558d4fb2617ae2d7940d2b6819 |
| MD5 | c1251a348bab98c79bbafbacd9774ce0 |
| BLAKE2b-256 | 0fa014cb7641efd331299c1a25f0f6ab758717fa6b08b448d1509ef85c70bbc5 |