Alluxio fsspec provides an fsspec (filesystem spec) implementation for Alluxio.

Alluxio FileSystem

This quickstart shows how you can use the AlluxioFileSystem to connect to Alluxio or directly to underlying storage systems. For more information on what to expect, please read the blog Accelerate data loading in large scale ML training with Ray and Alluxio.

Installation

A running Alluxio server with ETCD membership service

Alluxio version >= 3.7

Launch Alluxio clusters with the example configuration

# only one master, one worker are running in this example
alluxio.master.hostname=localhost
alluxio.worker.hostname=localhost

# Critical properties for this example
# UFS address (e.g., the src of data to cache), change it to your bucket
alluxio.dora.client.ufs.root=s3://example_bucket/datasets/
# storage dir
alluxio.worker.page.store.dirs=/tmp/page_ufs
# size of storage dir
alluxio.worker.page.store.sizes=10GB
# use etcd to keep consistent hashing ring
alluxio.worker.membership.manager.type=ETCD
# default etcd endpoint
alluxio.etcd.endpoints=http://localhost:2379
# number of vnodes per worker on the ring
alluxio.user.consistent.hash.virtual.node.count.per.worker=5

# Other optional settings, good to have
alluxio.job.batch.size=200
alluxio.master.journal.type=NOOP
alluxio.master.scheduler.initial.wait.time=10s
alluxio.network.netty.heartbeat.timeout=5min
alluxio.underfs.io.threads=50
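Before launching the cluster, it can help to confirm that the properties marked as critical in the example are actually present. The following is a minimal sketch, not part of Alluxio: `parse_properties` and `missing_critical_keys` are hypothetical helper names, and the parser only understands simple `key=value` lines and `#` comments.

```python
# Keys taken from the "Critical properties" part of the example configuration.
CRITICAL_KEYS = (
    "alluxio.dora.client.ufs.root",
    "alluxio.worker.page.store.dirs",
    "alluxio.worker.page.store.sizes",
    "alluxio.worker.membership.manager.type",
    "alluxio.etcd.endpoints",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that have no '=' at all
            props[key.strip()] = value.strip()
    return props


def missing_critical_keys(props):
    """Return the critical keys that are absent or empty."""
    return [key for key in CRITICAL_KEYS if not props.get(key)]


if __name__ == "__main__":
    example = """
    alluxio.dora.client.ufs.root=s3://example_bucket/datasets/
    alluxio.etcd.endpoints=http://localhost:2379
    """
    print(missing_critical_keys(parse_properties(example)))
```

To check a real file, pass its contents to `parse_properties`. An empty list from `missing_critical_keys` means every critical key has a value; it does not mean the values are correct.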

Python Dependencies

  • Python in range of [3.8, 3.9, 3.10]
  • ray >= 2.8.2
  • fsspec released after 2023.6
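A quick pre-flight check along these lines can catch a mismatched interpreter or a missing package early. This is a sketch using only the standard library; `check_environment` is a hypothetical helper, and it only checks that the modules can be found. It does not verify the ray or fsspec version.

```python
import importlib.util
import sys

# Interpreter versions listed in the requirements above.
SUPPORTED_PYTHON = {(3, 8), (3, 9), (3, 10)}


def check_environment(required_modules=("ray", "fsspec")):
    """Return a list of problems found; an empty list means nothing was flagged."""
    problems = []
    major, minor = sys.version_info[:2]
    if (major, minor) not in SUPPORTED_PYTHON:
        problems.append(f"Python {major}.{minor} is outside the listed range")
    for name in required_modules:
        # find_spec returns None when a top-level module cannot be located.
        if importlib.util.find_spec(name) is None:
            problems.append(f"module '{name}' is not installed")
    return problems


if __name__ == "__main__":
    for problem in check_environment():
        print("WARNING:", problem)
```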

Install fsspec implementation for underlying data storage

Alluxio fsspec acts as a cache on top of an existing connection to the underlying data lake storage, so the fsspec implementation for that storage must also be installed. In the Alluxio configuration example above, Amazon S3 is the data lake storage from which the dataset is read.

To connect to an existing underlying storage, there are two requirements

  • Install the underlying storage fsspec
  • Set credentials for the underlying data lake storage

Example: Deploy S3 as the underlying data lake storage

Install third-party S3 fsspec

pip install s3fs
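For the second requirement, setting credentials, one option is to keep secrets out of source code and read them from the environment. The sketch below builds one entry in the shape of the `ufs_config` dictionary used in the Hello World example further down (`access_key`, `secret_key`, `endpoint`). The helper name `ufs_credentials_from_env` and the environment variable naming scheme are assumptions made for illustration, not something alluxiofs defines.

```python
import os


def ufs_credentials_from_env(prefix):
    """Build one ufs_config entry from <PREFIX>_ACCESS_KEY, <PREFIX>_SECRET_KEY
    and <PREFIX>_ENDPOINT (hypothetical variable names)."""
    entry = {}
    missing = []
    for field in ("access_key", "secret_key", "endpoint"):
        variable = f"{prefix}_{field.upper()}"
        value = os.environ.get(variable)
        if value:
            entry[field] = value
        else:
            missing.append(variable)
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return entry
```

With `MY_S3_ACCESS_KEY`, `MY_S3_SECRET_KEY`, and `MY_S3_ENDPOINT` exported, `{"s3": ufs_credentials_from_env("MY_S3")}` yields a dictionary with the same shape as the `"s3"` entry in that example.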

Install alluxiofs

Build and install the alluxiofs package from source.

cd alluxiofs && python3 setup.py bdist_wheel && \
     pip3 install dist/alluxiofs-<alluxiofs_version>-py3-none-any.whl

Running a Hello World Example

from alluxiofs import AlluxioFileSystem


def happy_path(ufs, alluxio_fs: AlluxioFileSystem):
    if ufs == "file":
        file_path = "file:///opt/alluxio/local-ufs/python_sdk_test/test.txt"
        file_read_path = "file:///opt/alluxio/local-ufs/local-ufs/python_sdk_test/4k"
        file_read_path_2 = "file:///opt/alluxio/local-ufs/local-ufs/python_sdk_test/0"
        file_dir_path = "file:///opt/alluxio/local-ufs/local-ufs/"
    elif ufs == "oss":
        file_path = "oss://<YOUR_OSS_BUCKET>/python_sdk_test/test.txt"
        file_read_path = "oss://<YOUR_OSS_BUCKET>/python_sdk_test/4k"
        file_read_path_2 = "oss://<YOUR_OSS_BUCKET>/python_sdk_test/0"
        file_dir_path = "oss://<YOUR_OSS_BUCKET>/"
    elif ufs == "s3":
        file_path = "s3a://<YOUR_S3_BUCKET>/python_sdk_test/test.txt"
        file_read_path = "s3a://<YOUR_S3_BUCKET>/python_sdk_test/4k"
        file_read_path_2 = "s3a://<YOUR_S3_BUCKET>/python_sdk_test/0"
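        file_dir_path = "s3a://<YOUR_S3_BUCKET>/"
    else:
        # Fail early instead of hitting undefined path variables below
        raise ValueError(f"unsupported ufs: {ufs}")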

    print()
    print(f"ufs: {ufs}")
    alluxio_fs.touch(file_dir_path + "empty_file")
    alluxio_fs.cat(file_dir_path + "empty_file")
    alluxio_fs.rm(file_dir_path + "empty_file")
    alluxio_fs.touch(file_dir_path + "empty_file")
    with alluxio_fs.open(file_dir_path + "empty_file", "wb") as f:
        f.write(b"asdgahfadh")
    # Single quotes inside the f-string keep this valid on Python < 3.12
    print(f"length of empty_file: {len(alluxio_fs.cat(file_dir_path + 'empty_file'))}")
    alluxio_fs.rm(file_dir_path + "empty_file")
    # Test small file
    print(alluxio_fs.info(file_path))
    print(len(alluxio_fs.read_bytes(file_path)))
    print(alluxio_fs.size(file_path))
    # print(alluxio_fs.is_file_in_local_cache(file_path))
    print("=============================================")

    # print(alluxio_fs.is_file_in_local_cache(file_read_path))
    print(f"read bytes: {len(alluxio_fs.read_bytes(file_read_path))}")
    print(f"cat: {len(alluxio_fs.cat(file_read_path))}")
    print(f"file size: {alluxio_fs.size(file_read_path)}")
    # print(alluxio_fs.is_file_in_local_cache(file_read_path))
    print("=============================================")
    # Test mixed-size file
    # print(alluxio_fs.is_file_in_local_cache(file_read_path_2))
    print(f"read bytes: {len(alluxio_fs.read_bytes(file_read_path_2))}")
    print(f"file size: {alluxio_fs.size(file_read_path_2)}")
    # print(alluxio_fs.is_file_in_local_cache(file_read_path_2))


    print(alluxio_fs.info(file_path))
    print(alluxio_fs.ls(file_dir_path))
    print(alluxio_fs.exists(file_path))
    print(alluxio_fs.size(file_path))
    print(alluxio_fs.isdir(file_dir_path))
    print(alluxio_fs.isfile(file_path))

    print(len(alluxio_fs.cat(file_path)))
    with alluxio_fs.open(file_path, "rb") as f:
        data = f.read()
        print(len(data))
    print(len(alluxio_fs.read_bytes(file_path)))
    print(alluxio_fs.head(file_path, 2))
    print(alluxio_fs.tail(file_path, 3))

    # print(alluxio_fs.created(file_path))
    print(alluxio_fs.modified(file_path))
    print(alluxio_fs.checksum(file_path))
    print(alluxio_fs.ukey(file_path))

    alluxio_fs.mkdir(file_dir_path + "new_dir")
    alluxio_fs.touch(file_dir_path + "new_dir/new_file")
    print(alluxio_fs.size(file_dir_path + "new_dir/new_file"))
    print(alluxio_fs.cat(file_dir_path + "new_dir/new_file"))
    alluxio_fs.write_bytes(file_dir_path + "new_dir/new_file", b"Hello, Alluxio!")
    alluxio_fs.mv(file_dir_path + "new_dir/new_file", file_dir_path + "new_dir/renamed_file")
    alluxio_fs.cp(file_dir_path + "new_dir/renamed_file", file_dir_path + "new_dir/copied_file")
    alluxio_fs.rm(file_dir_path + "new_dir/copied_file")
    alluxio_fs.rm(file_dir_path + "new_dir/renamed_file")
    if alluxio_fs.exists(file_dir_path + "new_dir"):
        alluxio_fs.rmdir(file_dir_path + "new_dir")
        
def test_ufs_fallback():
    test_options = {
        "skip_alluxio": True
    }

    alluxio_fs = AlluxioFileSystem(
        yaml_path="./config_template.yaml",
        test_options=test_options,
        log_level="DEBUG",
    )

    print("Testing UFS: file")
    happy_path("file", alluxio_fs)
    print("Testing UFS: oss")
    happy_path("oss", alluxio_fs)
    print("Testing UFS: s3")
    happy_path("s3", alluxio_fs)

def test_basic():
    alluxio_fs = AlluxioFileSystem(
        yaml_path="./config_template.yaml",
        log_level="DEBUG",
        fallback_to_ufs_enabled=False,
    )
    happy_path("file", alluxio_fs)


def test_skip_alluxio():
    ufs_config = {
        "file": {},
        "oss": {
            "access_key": "<YOUR_OSS_ACCESS_KEY>",
            "secret_key": "<YOUR_OSS_SECRET_KEY>",
            "endpoint": "<YOUR_OSS_ENDPOINT>",
        },
        "s3": {
            "access_key": "<YOUR_S3_ACCESS_KEY>",
            "secret_key": "<YOUR_S3_SECRET_KEY>",
            "endpoint": "<YOUR_S3_ENDPOINT>",
        }
    }

    alluxio_fs = AlluxioFileSystem(
        #yaml_path="./config_template.yaml",
        skip_alluxio=True,
        ufs_config=ufs_config,
        log_level="DEBUG",
    )
    print("Testing UFS: file")
    happy_path("file", alluxio_fs)
    print("Testing UFS: oss")
    happy_path("oss", alluxio_fs)
    print("Testing UFS: s3")
    happy_path("s3", alluxio_fs)


if __name__ == "__main__":
    test_basic()
    test_ufs_fallback()
    test_skip_alluxio()
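In the example above, `test_basic` passes `fallback_to_ufs_enabled=False`, while the configuration file below defaults that option to true. Conceptually, the option decides whether a failed Alluxio call is retried against the underlying storage or raised to the caller. The sketch below illustrates that idea only. It is not the SDK's implementation, and `call_with_fallback` is a hypothetical helper.

```python
import logging

logger = logging.getLogger("fallback_sketch")


def call_with_fallback(primary, fallback, fallback_enabled=True):
    """Run primary(); if it raises, run fallback() when enabled, else re-raise."""
    try:
        return primary()
    except Exception as exc:
        if not fallback_enabled:
            # Strict mode: surface the original failure immediately.
            raise
        # Log the switch so fallbacks stay visible for monitoring.
        logger.warning("[FALLBACK] primary call failed (%s); using fallback", exc)
        return fallback()
```

Here `primary` would stand for the read through Alluxio and `fallback` for the direct read from the underlying storage; both are plain zero-argument callables in this sketch.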

Configuration YAML file for the Python SDK

# ============================================================================
# Alluxio Client Configuration File
# ============================================================================
# This configuration file is used to configure the behavior of the Alluxio Python SDK client.
# The configuration file can be passed to the AlluxioFileSystem initialization function via the yaml_path parameter.
# Example: fs = AlluxioFileSystem(yaml_path="config.yaml")
# ============================================================================

# ----------------------------------------------------------------------------
# Worker Node Discovery Configuration
# ----------------------------------------------------------------------------
# Note: load_balance_domain and worker_hosts are mutually exclusive - only one can be configured

# load_balance_domain: Load balancing domain name for discovering Alluxio Worker nodes via DNS service
#   Use Case: Use when the Alluxio cluster uses DNS load balancing or service discovery mechanisms
#   How It Works: The client resolves the domain name via DNS to get all associated IP addresses (Worker nodes),
#                 then uses a path-hashing strategy to distribute requests among these nodes
#   Example Values: "localhost" or "alluxio-workers.example.com"
#   Default Value: "localhost"
load_balance_domain: "localhost"

# worker_hosts: Directly specify the Worker node list as comma-separated hostnames
#   Use Case: Use when you know all Worker node addresses explicitly, without needing DNS resolution
#   Format: "host1,host2,host3" or "host1:port1,host2:port2" (with ports)
#   How It Works: The client directly uses these host addresses and performs load balancing via path hashing
#   Example Values: "worker1.example.com,worker2.example.com,worker3.example.com"
#   Default Value: null (not set)
#   Note: If worker_hosts is set, load_balance_domain will be ignored
worker_hosts: null

# worker_http_port: HTTP service port of Alluxio Worker nodes
#   Purpose: Used for HTTP API communication between the client and Worker nodes (e.g., file listing, metadata queries)
#   Default Value: 28080
#   Note: Ensure this port is properly configured and open on Worker nodes
worker_http_port: 28080

# worker_data_port: Data port of Alluxio Worker nodes
#   Purpose: Used for data transmission (reserved configuration item)
#   Default Value: 29998
#   Note: This configuration may not be directly used in the current version, but it's recommended to keep it consistent with the Alluxio cluster configuration
worker_data_port: 29998

# ----------------------------------------------------------------------------
# UFS (Underlying File System) Fallback Configuration
# ----------------------------------------------------------------------------

# fallback_to_ufs_enabled: Whether to enable UFS fallback mechanism
#   Purpose: When Alluxio operations fail, whether to automatically fall back to the underlying file system (UFS) to execute operations
#   Use Cases:
#     - true: Improves fault tolerance, allows data access via UFS when Alluxio service is unavailable
#     - false: Strict mode, throws exceptions immediately when Alluxio fails, helpful for quickly identifying issues
#   How It Works:
#     1. First attempts to execute operations via Alluxio
#     2. If Alluxio operation fails and this option is true, automatically switches to UFS execution
#     3. All fallback operations are logged for monitoring and debugging
#   Default Value: true
#   Recommendation: Recommended for production environments to improve system availability
fallback_to_ufs_enabled: true

# ufs_info_refresh_interval_minutes: UFS information refresh interval (minutes)
#   Purpose: Controls how frequently the client refreshes underlying file system information
#   How It Works: The client periodically updates UFS metadata information to ensure data consistency
#   Default Value: 2 (minutes)
#   Recommendations:
#     - For scenarios with frequent data updates, consider reducing this value (e.g., 1 minute)
#     - For scenarios with infrequent data updates, consider increasing this value (e.g., 5 minutes) to reduce overhead
ufs_info_refresh_interval_minutes: 2

# ----------------------------------------------------------------------------
# Logging Configuration
# ----------------------------------------------------------------------------

# log_level: Logging level
#   Purpose: Controls the verbosity of logs
#   Valid Values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
#   Default Value: "INFO"
log_level: "INFO"

# log_dir: Log directory
#   Purpose: Directory to store log files. If not set, logs might be output to console or default location.
#   Default Value: null, meaning no specific log directory is set, only console output
log_dir: null

# log_tag_allowlist: Log tag allowlist
#   Purpose: Filter logs by tags. Only logs with tags in this list will be recorded.
#   Valid Values: Comma-separated list of tags. Available tags:
#     - "[FSSPEC]": FSSpec interface related logs
#     - "[FALLBACK]": Fallback to UFS related logs
#     - "[UFS_MANAGER]": UFS management and updates
#     - "[LOAD_BALANCER]": Worker load balancing
#     - "[PREFETCH]": Data prefetching operations
#     - "[ALLUXIO_CLIENT]": Core Alluxio client operations
#     - "[LOCAL_CACHE]": Local disk cache operations
#     - "[MEMORY_CACHE]": In-memory cache operations
#   Example: "FSSPEC, LOCAL_CACHE"
#   Default Value: null, meaning no filtering is applied, all logs are recorded
log_tag_allowlist: null

# ----------------------------------------------------------------------------
# Concurrency Control Configuration
# ----------------------------------------------------------------------------

# ----------------------------------------------------------------------------
# Local Cache Configuration
# ----------------------------------------------------------------------------
# Local cache functionality can accelerate data access by caching data blocks to local disk, reducing network transmission

# local_cache_enabled: Whether to enable local cache acceleration
#   Purpose: Controls whether to enable local disk cache functionality
#   Note: This configuration item controls cache functionality in the current version.
#   Default Value: false
local_cache_enabled: false

# local_cache_dir: Local cache directory path
#   Purpose: Specifies the directory where local cache data is stored
#   Format: Supports single directory or multiple directories (comma-separated) for better performance, e.g., "/tmp/cache1/, /tmp/cache2/"
#   How It Works:
#     - Cache data is distributed to different directories based on file path hash
#     - Supports multi-directory configuration for distributed cache storage
#   Default Value: "/tmp/local_cache/"
#   Important Notes:
#     - Ensure the directory has sufficient write permissions
#     - Ensure the disk where the directory is located has sufficient space
#     - Recommend using SSD or NVMe for better performance
local_cache_dir: "/tmp/local_cache/"

# local_cache_size_gb: Total local cache size (GB)
#   Purpose: Limits the maximum disk space used by local cache
#   Format: Supports single value or multiple values (comma-separated, corresponding to multiple cache directories)
#   Examples: "64" or "32, 64" (two directories limited to 32GB and 64GB respectively)
#   Default Value: 64 (GB)
#   Tuning Recommendations:
#     - Set based on available disk space
#     - Recommend setting to 1-2 times the size of frequently used data
#     - When cache is full, uses LRU (Least Recently Used) strategy to evict old data
local_cache_size_gb: 64

# local_cache_block_size_mb: Cache block size (MB)
#   Purpose: Defines the size of each cache block; data is stored and read in chunks of this size
#   Default Value: 4 (MB)
#   Tuning Recommendations:
#     - Many small files: Use smaller block size (e.g., 2MB)
#     - Many large files: Use larger block size (e.g., 8MB or 16MB)
#     - Need to balance memory usage and network transmission efficiency
local_cache_block_size_mb: 4

# local_cache_eviction_high_watermark: High watermark for eviction (0-1)
#   Purpose: Trigger eviction when cache usage exceeds this ratio
#   Default Value: 0.8
local_cache_eviction_high_watermark: 0.8

# local_cache_eviction_low_watermark: Low watermark for eviction (0-1)
#   Purpose: Eviction continues until usage drops below this ratio
#   Default Value: 0.7
local_cache_eviction_low_watermark: 0.7

# ----------------------------------------------------------------------------
# Prefetch Configuration
# ----------------------------------------------------------------------------
# Prefetch functionality can download subsequent data blocks to local cache in advance while reading the current data block, improving sequential read performance

# local_cache_prefetch_concurrency: Prefetch concurrency
#   Purpose: Controls the number of threads performing prefetch operations simultaneously
#   How It Works: Uses a thread pool to execute prefetch tasks; this parameter controls the thread pool size
#   Default Value: 32
#   Tuning Recommendations:
#     - Sufficient network bandwidth: Consider increasing (e.g., 64)
#     - Limited network bandwidth: Consider decreasing (e.g., 16)
#     - Need to balance prefetch speed and system resource usage
local_cache_prefetch_concurrency: 32

# local_cache_max_prefetch_blocks: Maximum number of prefetchable blocks
#   Purpose: Limits the maximum number of cache blocks that can be prefetched for a single file
#   How It Works: Prevents prefetching too much data from occupying cache space, affecting caching for other files
#   Default Value: 16
#   Tuning Recommendations:
#     - Sequential reading of large files: Consider increasing (e.g., 32)
#     - Random read scenarios: Consider decreasing (e.g., 8)
local_cache_max_prefetch_blocks: 16

# local_cache_prefetch_policy: Prefetch policy
#   Purpose: Controls the prefetch algorithm, determining when and how much data to prefetch
#   Valid Values:
#     - "none": No prefetching, only caches the currently read data block
#     - "fixed_window": Fixed window prefetching, always prefetches a fixed number of subsequent blocks
#     - "adaptive_window": Adaptive window prefetching, dynamically adjusts prefetch window size based on read patterns
#   Default Value: "adaptive_window"
#   Policy Descriptions:
#     - "none": Suitable for random read scenarios, saves bandwidth and cache space
#     - "fixed_window": Suitable for sequential read scenarios, predictable prefetch behavior
#     - "adaptive_window": Suitable for mixed read scenarios, automatically optimizes prefetch behavior
#   Recommendation: "adaptive_window" is recommended for most scenarios
local_cache_prefetch_policy: "adaptive_window"

# local_cache_prefetch_ahead_blocks: Number of blocks to prefetch ahead
#   Purpose: When using fixed_window policy, specifies the fixed number of blocks to prefetch
#   Effective Condition: Only effective when local_cache_prefetch_policy="fixed_window"
#   Default Value: 2
#   Example: Setting to 2 means when reading the current block, it will prefetch the next 2 blocks
#   Note: This parameter is ignored when using other prefetch policies
local_cache_prefetch_ahead_blocks: 2

# local_cache_eviction_scan_interval_minutes: Interval for eviction scans (minutes)
#   Purpose: Background eviction monitor scan period
#   Default Value: 0.5
local_cache_eviction_scan_interval_minutes: 0.5

# local_cache_ttl_time_minutes: Time-to-live for cache entries (minutes)
#   Purpose: Expire cached blocks older than the TTL; -1 means disabled
#   Default Value: -1
local_cache_ttl_time_minutes: -1

# local_cache_metrics_enabled: Enable metrics collection for local cache
#   Purpose: If true, metrics such as cache usage and block info can be retrieved via get_local_cache_info.
#            This writes a hash mapping file to disk, which may incur slight overhead.
#   Default Value: false
local_cache_metrics_enabled: false

# ----------------------------------------------------------------------------
# HTTP Request Configuration
# ----------------------------------------------------------------------------

# http_max_retries: Maximum number of HTTP request retries
#   Purpose: Maximum number of automatic retries when HTTP requests fail
#   Use Cases: Network jitter, temporary service unavailability, etc.
#   Default Value: 3
#   Tuning Recommendations:
#     - Unstable network: Consider increasing (e.g., 5)
#     - Fast-fail scenarios: Consider decreasing (e.g., 1 or 2)
#   Note: Setting to 0 means no retry, returns error immediately on failure
http_max_retries: 3

# http_timeouts: HTTP request timeout (seconds)
#   Purpose: Sets the timeout for HTTP requests to prevent requests from hanging for too long
#   Default Value: 60 (seconds)
#   Tuning Recommendations:
#     - Large file transfers: Consider increasing (e.g., 120 or 300)
#     - Fast response requirements: Consider decreasing (e.g., 30)
#     - Should be adjusted based on actual network environment and file size
http_timeouts: 60

# ----------------------------------------------------------------------------
# Read Buffer Configuration
# ----------------------------------------------------------------------------

# read_buffer_size_mb: Read buffer size (MB)
#   Purpose: Sets the buffer size for io.BufferedReader to optimize file read performance
#   How It Works: Larger buffers can reduce the number of calls to alluxio worker, improving read efficiency
#   Default Value: 0.008 (MB, i.e., 8KB)
#   Tuning Recommendations:
#     - Sequential reading of large files: Consider increasing (e.g., 0.064 or 0.128, i.e., 64KB or 128KB)
#     - Random reads or small files: Keep smaller value (e.g., 0.008 or 0.016)
#     - Need to balance memory usage and read performance
#   Note: This buffer is used for Python-level buffering, different from system-level cache
read_buffer_size_mb: 0.008
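Several of the options above constrain one another or accept only a fixed set of values. A small sanity check over the parsed settings can flag mistakes before the client is created. The sketch below works on a plain dictionary, however the YAML was loaded, and uses the defaults documented above. `check_client_config` is a hypothetical helper, and the rule that the low watermark must sit below the high watermark is an assumption inferred from their descriptions.

```python
VALID_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
VALID_PREFETCH_POLICIES = {"none", "fixed_window", "adaptive_window"}


def check_client_config(config):
    """Return a list of human-readable findings for a parsed config dict."""
    findings = []

    # worker_hosts takes precedence over load_balance_domain when both are set.
    if config.get("worker_hosts") and config.get("load_balance_domain"):
        findings.append("worker_hosts is set, so load_balance_domain will be ignored")

    log_level = config.get("log_level", "INFO")
    if log_level not in VALID_LOG_LEVELS:
        findings.append(f"log_level {log_level!r} is not a valid level")

    policy = config.get("local_cache_prefetch_policy", "adaptive_window")
    if policy not in VALID_PREFETCH_POLICIES:
        findings.append(f"local_cache_prefetch_policy {policy!r} is not a valid policy")

    high = config.get("local_cache_eviction_high_watermark", 0.8)
    low = config.get("local_cache_eviction_low_watermark", 0.7)
    if not (0 <= low <= 1 and 0 <= high <= 1):
        findings.append("eviction watermarks must be ratios between 0 and 1")
    elif low >= high:
        # Assumption: eviction starts above `high` and stops below `low`.
        findings.append("low watermark should be below the high watermark")

    if config.get("http_max_retries", 3) < 0:
        findings.append("http_max_retries must be 0 or greater")

    return findings
```

An empty result only means none of these particular checks fired; it is not a guarantee that the client will accept the configuration.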
