Skip to main content

TOS implementation for pythonic file-system interface

Project description

tosfsspec - 火山引擎 TOS 文件系统接口

概述

tosfsspec 是基于 Python fsspec (Filesystem Specification) 规范实现的火山引擎对象存储(TOS)文件系统接口。通过 tosfsspec,用户可以使用标准的 Python 文件操作方式(如 open, read, write)直接访问 TOS 中的数据,同时支持与 Pandas、Dask、Ray、PyArrow 等主流数据分析和 AI 框架无缝集成。

产品优势

  • 标准接口:完全遵循 fsspec 标准,与 Python 生态系统高度兼容。
  • 高性能:底层核心逻辑使用 Rust 编写的 tosnativeclient,提供高性能的并发 IO 操作。
  • 生态集成:原生支持 Pandas、Ray、PyTorch 等框架,无需额外适配即可读取 TOS 数据。
  • 易用性:提供类似于本地文件系统的操作体验,简化了对象存储的使用复杂度。

安装方式

使用 pip 安装 tosfsspec

pip install tosfsspec

验证安装是否成功:

pip show tosfsspec

快速开始

1. 配置访问凭证

在使用 tosfsspec 之前,您需要准备好火山引擎 TOS 的访问凭证(Access Key ID 和 Secret Access Key)、Endpoint 和 Region 信息。

初始化 TosFileSystem 示例:

import tosfsspec

# 初始化文件系统对象
fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='https://tos-cn-beijing.volces.com',  # 替换为您的 Endpoint
    region='cn-beijing'  # 替换为您的 Region
)

2. 基本文件操作

列举文件

# 列举存储桶中的文件
files = fs.ls('tos://my-bucket/data/')
print(files)

写入数据

# 直接写入字符串数据
with fs.open('tos://my-bucket/test.txt', 'w') as f:
    f.write('Hello, TOS!')

# 上传本地文件到 TOS
fs.put_file('local_data.csv', 'tos://my-bucket/data/remote_data.csv')

读取数据

# 读取 TOS 文件内容
with fs.open('tos://my-bucket/test.txt', 'r') as f:
    content = f.read()
    print(content)

# 下载 TOS 文件到本地
fs.get_file('tos://my-bucket/data/remote_data.csv', 'local_data.csv')

TosFileSystem 初始化详解

TosFileSystem 是与 TOS 交互的核心入口,支持多种参数配置以适应不同的网络环境和性能需求。

初始化参数

参数名 类型 必填 默认值 说明
region str - TOS 存储桶所在的区域,例如 cn-beijing
endpoint str '' TOS 服务端点,例如 https://tos-cn-beijing.volces.com
key str '' 访问密钥 Access Key ID。
secret str '' 访问密钥 Secret Access Key。
part_size int 8MB 分片上传/下载的大小(字节)。对于大文件,增大此值可减少 API 调用次数并提升吞吐量。
max_retry_count int 3 请求失败时的最大重试次数。
shared_prefetch_tasks int 32 数据预取时的最大并发任务数。
shared_upload_part_tasks int 32 分片上传时的最大并发任务数。
shared_upload_part_copy_tasks int 32 分片拷贝时的最大并发任务数。
enable_crc bool True 是否开启 CRC64 数据完整性校验。关闭可降低 CPU 开销,但会降低数据安全性。

初始化示例

1. 基础初始化

使用显式的 AK/SK 进行初始化。

from tosfsspec import TosFileSystem

fs = TosFileSystem(
    region='cn-beijing',
    endpoint='https://tos-cn-beijing.volces.com',
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY'
)

2. 性能调优初始化

针对高吞吐场景,增加分片大小和并发数,并关闭 CRC 校验。

fs = TosFileSystem(
    region='cn-beijing',
    endpoint='https://tos-cn-beijing.volces.com',
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    part_size=16 * 1024 * 1024,  # 16MB 分片
    shared_upload_part_tasks=64,  # 提高上传并发
    enable_crc=False  # 关闭 CRC 校验
)

常用功能使用示例

示例 1:在 Python 环境中直接使用

在 Python 环境中可直接使用 tosfsspec.TosFileSystem 的常用 API 读写 TOS,类似本地文件系统的使用方式。

import os

import tosfsspec

fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

bucket = 'my-bucket'
local_file_path = 'localfile.txt'
remote_file_path = f'tos://{bucket}/remote_file.txt'

# create a local file to upload
with open(local_file_path, 'w') as f:
    f.write('Hello TOSFS.')

# upload to tos
fs.put_file(local_file_path, remote_file_path)
print(f'Uploaded {local_file_path} to {remote_file_path}')

# download from tos
downloaded_file_path = 'downloaded_file.txt'
fs.get_file(remote_file_path, downloaded_file_path)
print(f'Downloaded {remote_file_path} to {downloaded_file_path}')

# read content from downloaded local file
with open(downloaded_file_path, 'r') as f:
    content = f.read()
    print(f'Content of {downloaded_file_path}: {content}')

# delete tos file
fs.rm(remote_file_path)

# write to tos
with fs.open(remote_file_path, 'w') as f:
    f.write('Hello TOSFS.')

# read from tos
with fs.open(remote_file_path, 'r') as f:
    tos_content = f.read()
    print(f'Content of {remote_file_path}: {tos_content}')

# clean local files
os.remove(local_file_path)
os.remove(downloaded_file_path)

示例 2:Ray 分布式计算集成

Ray 可以利用 tosfsspec.TosFileSystem 在分布式集群中直接加载 TOS 数据进行处理。

import ray
import tosfsspec

# 初始化 Ray (通常在集群环境中已初始化)
ray.init()

bucket = 'my-bucket'

fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

# csv file path
input_csv_path = f'tos://{bucket}/input.csv'
output_csv_path = f'tos://{bucket}/output.csv'

with fs.open(input_csv_path, 'w') as f:
    f.write('id,name,age\n1,John Doe,30\n2,Jane Smith,25\n3,Bob Johnson,40\n')

ds = ray.data.read_csv(input_csv_path, filesystem=fs)

# processing data
# ds = ds.map_batches(your_processing_function)

ds.repartition(1).write_csv(output_csv_path, filesystem=fs)

示例 3:PyArrow 集成

PyArrow 可以通过 filesystem 参数使用 tosfsspec.TosFileSystem,实现高效读写 TOS 中的数据。

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import tosfsspec

bucket = 'my-bucket'

fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

# 生成 Parquet 数据集
table = pa.table({'year': [2020, 2022, 2021, 2022, 2019, 2021],
                  'n_legs': [2, 2, 4, 4, 5, 100],

                  'animal': ['Flamingo', 'Parrot', 'Dog', 'Horse',
                             'Brittle stars', 'Centipede']})
pq.write_table(table, f'tos://{bucket}/data.parquet', filesystem=fs)

# 读取 Parquet 数据集
dataset = ds.dataset(f'tos://{bucket}/data.parquet', filesystem=fs)

# 输出数据集信息
print(f'Rows: {dataset.count_rows()}')
print(dataset.schema)

# 转换为 Pandas DataFrame
df = dataset.to_table().to_pandas()
print(df.head())

示例 4:PyTorch 集成

PyTorch 可以在数据集加载和 Checkpoint 读写等场景集成 tosfsspec.TosFileSystem

import fsspec
import torch
import tosfsspec
from torchdata.datapipes.iter import FSSpecFileLister, FSSpecFileOpener

bucket = 'my-bucket'

# 数据集加载
fsspec.register_implementation('tos-new', 'tosfsspec.TosFileSystem', )
kwargs = {
    'key': 'YOUR_ACCESS_KEY',
    'secret': 'YOUR_SECRET_KEY',
    'endpoint': 'http://tos-cn-beijing.volces.com',
    'region': 'cn-beijing'
}

file_lister = FSSpecFileLister(root=f'tos-new://{bucket}/dataset/', **kwargs)
iterable_dataset = FSSpecFileOpener(file_lister, mode='rb', **kwargs)

for _, item in iterable_dataset:
    data = item.read()

# Checkpoint 读写
fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    endpoint='http://tos-cn-beijing.volces.com',
    region='cn-beijing'
)

model = torch.nn.Linear(10, 10)
with fs.open(f'tos://{bucket}/models/model.pt', 'wb') as f:
    torch.save(model.state_dict(), f)

with fs.open(f'tos://{bucket}/models/model.pt', 'rb') as f:
    state_dict = torch.load(f)
    # print(state_dict)
    model.load_state_dict(state_dict)

示例 5:Pandas 集成

Pandas 可以利用tosfsspec.TosFileSystem直接从 TOS 读取或向 TOS 写入各种格式的数据(如 CSV、Parquet 等),无需先将文件下载到本地。

# 从 TOS 读取 CSV 文件
import fsspec
import pandas as pd
import tosfsspec

fsspec.register_implementation('tos-new', 'tosfsspec.TosFileSystem', )

# 准备一个 CSV 文件在 TOS 上
fs = tosfsspec.TosFileSystem(
    key='YOUR_ACCESS_KEY',
    secret='YOUR_SECRET_KEY',
    region='cn-beijing'
)
bucket = 'my-bucket'
csv_path = f'tos-new://{bucket}/datasets/iris.csv'
csv_content = 'sepal_length,sepal_width,petal_length,petal_width,species\n5.1,3.5,1.4,0.2,setosa\n'
with fs.open(csv_path, 'wb') as f:
    f.write(csv_content.encode())

# 配置 storage_options
storage_options = {
    'key': 'YOUR_ACCESS_KEY',
    'secret': 'YOUR_SECRET_KEY',
    'region': 'cn-beijing'
}

# 使用 pandas 直接读取
df = pd.read_csv(csv_path, storage_options=storage_options)

print('Read csv data from TOS:')
print(df)

# 将 DataFrame 写入 TOS 的 Parquet 文件
# 创建一个 DataFrame
data = {'col1': [1, 2], 'col2': [3, 4]}
df_to_write = pd.DataFrame(data=data)

# 定义要写入的 Parquet 文件路径
parquet_path = f'tos-new://{bucket}/output/data.parquet'

# 直接写入 TOS,需要安装 pyarrow 或 fastparquet
df_to_write.to_parquet(parquet_path, storage_options=storage_options, index=False)

print(f'Write data to: {parquet_path}')

示例 6:Dask 集成

Dask 是一个用于并行计算的灵活库,它天然支持fsspec兼容的文件系统。结合tosfsspec.TosFileSystem后,你可以使用 dask.dataframe 一次性并行读取 TOS 上的多个 CSV 文件,构建一个分布式 DataFrame。

import dask.dataframe as dd
import fsspec

fsspec.register_implementation('tos-new', 'tosfsspec.TosFileSystem', )

bucket = 'my-bucket'

# 配置 storage_options
storage_options = {
    'key': 'YOUR_ACCESS_KEY',
    'secret': 'YOUR_SECRET_KEY',
    'region': 'cn-beijing'
}

# 文件路径支持通配符
dask_path = f'tos-new://{bucket}/stock_data/part.*.csv'

# 使用 Dask 读取数据,Dask 会自动处理通配符并并行加载
ddf = dd.read_csv(dask_path, storage_options=storage_options)

# Dask 是惰性计算的,此时数据还未真正加载
print('Dask DataFrame:', ddf.npartitions)

# 执行计算,例如计算平均值,这会触发实际的 I/O 操作
mean_value = ddf.some_column.mean().compute()

print('Avg:', mean_value)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tosfsspec-2026.4.22.tar.gz (31.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tosfsspec-2026.4.22-py3-none-any.whl (23.0 kB view details)

Uploaded Python 3

File details

Details for the file tosfsspec-2026.4.22.tar.gz.

File metadata

  • Download URL: tosfsspec-2026.4.22.tar.gz
  • Upload date:
  • Size: 31.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.0

File hashes

Hashes for tosfsspec-2026.4.22.tar.gz
Algorithm Hash digest
SHA256 1c9b2302c6875723e60b5e6b440a9688b6d553efca763988d833b5531e27f95b
MD5 bc3162eec76d0fe68f377539dda7c655
BLAKE2b-256 026ee9b86e14c64d7dd3c67ac5a47093ec134aca03f94cc552fe419e2cfbc888

See more details on using hashes here.

File details

Details for the file tosfsspec-2026.4.22-py3-none-any.whl.

File metadata

  • Download URL: tosfsspec-2026.4.22-py3-none-any.whl
  • Upload date:
  • Size: 23.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.0

File hashes

Hashes for tosfsspec-2026.4.22-py3-none-any.whl
Algorithm Hash digest
SHA256 1a26700559d3f5661c21475a2c17aa2e783e09de19eceec440bda49f1b3f617c
MD5 18a8b2349094e87a36fa3ddbbfb59cdd
BLAKE2b-256 d9fbbe074e32242a583498c187ec3fb7fe3c0c7b1b8e2c398a8c88315894c01b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page