Alluxio Python Filesystem API based on FSSpec

Alluxio FSSpec Python API (alluxiofs) allows applications to seamlessly interact with various storage backends using a unified Python filesystem interface. It leverages a high-performance distributed caching layer, the Alluxio servers, to significantly enhance data access speeds and reduce latency. This is particularly beneficial for data-intensive applications and workflows, especially AI training workloads, where large datasets need to be accessed quickly and repeatedly.

Prerequisites

  • A running Alluxio server using ETCD as the membership manager

  • Python 3.8, 3.9, or 3.10
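A minimal sketch for checking the running interpreter against this supported range before installing. `is_supported_python` is a hypothetical helper written for this example, not part of alluxiofs.

```python
import sys

# Python versions listed as supported in the prerequisites above
SUPPORTED = {(3, 8), (3, 9), (3, 10)}


def is_supported_python(version_info=sys.version_info):
    """Return True if the (major, minor) version is in the supported set."""
    return tuple(version_info[:2]) in SUPPORTED


if not is_supported_python():
    major, minor = sys.version_info[:2]
    print(f"Warning: Python {major}.{minor} is outside the supported range")
```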

Installation

Install Storage Backend

Alluxiofs acts as a caching layer on top of an existing connection to the underlying data lake storage, so the fsspec implementation for that storage must also be installed.

To connect to an existing underlying storage, there are three requirements:

  1. Alluxio servers are running with the same storage backend as Alluxiofs.

  2. If the underlying storage uses a third-party fsspec implementation, install it. Storage covered by fsspec's built-in implementations needs no extra Python libraries.

  3. Set credentials for the underlying data lake storage.
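Requirement 2 can be checked from Python before creating the filesystem. This is a sketch under assumptions: `missing_backend` and the protocol-to-package table are hypothetical (s3fs, gcsfs, and adlfs are real third-party fsspec packages, but the table is not exhaustive), and any protocol not in the table is assumed to be built in.

```python
import importlib.util

# Hypothetical, non-exhaustive mapping from protocol to the third-party
# package that provides its fsspec implementation.
THIRD_PARTY_FSSPEC = {
    "s3": "s3fs",
    "gs": "gcsfs",
    "abfs": "adlfs",
}


def missing_backend(protocol):
    """Return the package to pip install for `protocol`, or None if nothing is missing."""
    package = THIRD_PARTY_FSSPEC.get(protocol)
    if package is None:
        # Not in the table: assume a built-in fsspec implementation
        return None
    if importlib.util.find_spec(package) is None:
        return package
    return None


pkg = missing_backend("s3")
if pkg:
    print(f"Run: pip install {pkg}")
```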

Example: Deploy S3 as the underlying data lake storage. Install the third-party S3 fsspec implementation, s3fs:

pip install s3fs

Install alluxiofs

pip install alluxiofs

Load Data into Alluxio server

Skip this step if the data is already loaded into the Alluxio servers.

This simple example creates a client that connects to an Alluxio cluster with a localhost ETCD membership service and S3 as the under storage. See Arguments to Connect to Alluxio Servers under Advanced Init Arguments for more configuration settings.

Submit a distributed load job to the Alluxio servers:

from alluxiofs.client import AlluxioClient

alluxio_client = AlluxioClient(etcd_hosts="localhost")
alluxio_client.submit_load("s3://bucket/path/to/dataset/")

This triggers the load job asynchronously. You can wait until the load finishes, or check its progress with the following command:

alluxio_client.load_progress("s3://bucket/path/to/dataset/")

To cancel the distributed load job:

alluxio_client.stop_load("s3://bucket/path/to/dataset/")
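Waiting until the load finishes can be sketched as a small polling loop that falls back to cancelling the job on timeout. This is a minimal sketch under assumptions: `wait_for_load` is a hypothetical helper, not part of alluxiofs, and because the format returned by load_progress is not described here, the caller supplies the `is_finished` check.

```python
import time


def wait_for_load(check_progress, is_finished, timeout_s=3600.0,
                  poll_interval_s=10.0, on_timeout=None):
    """Poll check_progress() until is_finished(result) is true or timeout_s elapses.

    check_progress: zero-argument callable, e.g. wrapping load_progress.
    is_finished: predicate on its result (the result format is an assumption
        left to the caller).
    on_timeout: optional zero-argument callable, e.g. wrapping stop_load.
    Returns True if the load finished, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        result = check_progress()
        if is_finished(result):
            return True
        if time.monotonic() >= deadline:
            if on_timeout is not None:
                on_timeout()
            return False
        time.sleep(poll_interval_s)
```

For example, pass `lambda: alluxio_client.load_progress(path)` as `check_progress` and `lambda: alluxio_client.stop_load(path)` as `on_timeout`; what `is_finished` should look for depends on the progress report your Alluxio version returns.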

Create an alluxiofs Filesystem

This simple example creates a filesystem handler that connects to an Alluxio cluster with localhost ETCD membership service and S3 as the under storage.

import fsspec
from alluxiofs import AlluxioFileSystem

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "1MB",
}
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    etcd_hosts="localhost",
    target_protocol="s3",
    options=alluxio_options,
)

See Advanced Init Arguments for settings that control the connections to the Alluxio servers and the storage system.

Example: alluxiofs Hello World

# list files
contents = alluxio_fs.ls("s3://bucket/path/to/dataset/", detail=True)

# Read files
with alluxio_fs.open("s3://bucket/path/to/dataset/file1.parquet", "rb") as f:
    data = f.read()

More Python filesystem operations examples can be found here.
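With detail=True, fsspec's ls returns a list of dictionaries rather than plain paths, and each entry carries at least name, size (in bytes), and type keys. The sketch below uses a hypothetical helper, `parquet_summary`, to pick the Parquet files out of such a listing, for example the `contents` returned in the Hello World example.

```python
def parquet_summary(entries):
    """Return (sorted Parquet file names, their total size in bytes)
    from an fsspec ls(..., detail=True) result."""
    files = [
        e for e in entries
        if e.get("type") == "file" and e["name"].endswith(".parquet")
    ]
    names = sorted(e["name"] for e in files)
    # Some backends may report size as None; count those as 0
    total_bytes = sum(e.get("size") or 0 for e in files)
    return names, total_bytes
```

Usage: `names, total_bytes = parquet_summary(contents)`.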

Example: Ray

Ray is a fast and simple framework for building and running distributed applications. PyTorch, TensorFlow, and XGBoost trainers, running on top of Ray, can leverage Ray's advanced functionalities like creating heterogeneous clusters consisting of CPU machines for data loading and preprocessing, and GPU machines for training. Data loading, preprocessing, and training can be parallelized using Ray Data.

Trainers such as PyTorch read the same dataset again in every epoch. Fetching a large dataset from the underlying storage in each epoch can become the training bottleneck. By leveraging Alluxio's high-performance distributed caching, trainers on Ray can reduce total training time, improve GPU utilization, and speed up the end-to-end model lifecycle.
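A back-of-the-envelope model makes the per-epoch cost concrete. It rests on a simplifying assumption: without a cache every epoch re-reads the whole dataset from the under storage, while with a cache large enough to hold the dataset only the first pass does. `remote_bytes_read` is a hypothetical helper for this arithmetic, not an Alluxio API.

```python
def remote_bytes_read(dataset_bytes, epochs, cached):
    """Bytes fetched from the under storage over a training run.

    Simplified model (assumption): uncached, every epoch re-reads the full
    dataset; cached, only the first epoch reads from the under storage.
    """
    if epochs <= 0:
        return 0
    return dataset_bytes if cached else dataset_bytes * epochs


TB = 1024 ** 4
print(remote_bytes_read(1 * TB, 10, cached=False) // TB)  # 10
print(remote_bytes_read(1 * TB, 10, cached=True) // TB)   # 1
```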

Prerequisites: Ray version >= 2.8.2

import ray

# Pass the initialized Alluxio filesystem to Ray and read a single Parquet file
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the single file
ds.count()

# Display the schema stored in the Parquet file metadata
ds.schema()

# Display the first record
ds.take(1)

# Display the first two records
ds.take(2)

# Read all Parquet files under a directory
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)

# Get a count of the number of records in the files
ds2.count()
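The Ray >= 2.8.2 prerequisite can be checked before running these examples. A minimal standard-library sketch; `parse_version` and `ray_is_recent_enough` are hypothetical helper names, and the parser ignores pre-release suffixes, so 2.8.2rc1 is treated as 2.8.2.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def parse_version(text):
    """Parse the leading numeric components, e.g. '2.9.0rc1' -> (2, 9, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", text)
    return tuple(int(p) for p in match.group(0).split(".")) if match else ()


def ray_is_recent_enough(minimum=(2, 8, 2)):
    """Return True if the installed Ray meets `minimum`, False if older or absent."""
    try:
        installed = version("ray")
    except PackageNotFoundError:
        return False
    return parse_version(installed) >= minimum
```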

Example: PyArrow

PyArrow allows applications and data to seamlessly connect with each other by providing a high-performance, in-memory columnar storage format. It enables efficient data interchange between different data processing systems. Because PyArrow can wrap fsspec filesystems (through pyarrow.fs.FSSpecHandler), it can access various storage backends through a unified interface. By using alluxiofs, PyArrow can leverage Alluxio's distributed caching capabilities to enhance data access speeds and reduce latency.

Example 1:

# Pass the initialized Alluxio filesystem to PyArrow and read the example Parquet file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", format="parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the file
dataset.count_rows()

# Display the schema stored in the Parquet file metadata
dataset.schema

# Display the first record (take expects a list of row indices)
dataset.take([0])

Example 2:

from pyarrow.fs import PyFileSystem, FSSpecHandler

# Create a Python-based PyArrow filesystem using FSSpecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))

# Read the data through the PyArrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
    data = f.read()

Advanced Init Arguments

Arguments to Connect to Alluxio Servers

  • etcd_hosts (str, required): A comma-separated list of ETCD server hosts in the format "host1:port1,host2:port2,...". ETCD is used for dynamic discovery of Alluxio workers.

  • etcd_port (int, optional): The port number used by each ETCD server. Defaults to 2379.

  • options (dict, optional): A dictionary of Alluxio configuration options where keys are property names and values are property values. These options configure the Alluxio client behavior.
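To make the etcd_hosts format concrete, here is a hypothetical helper that splits the string into (host, port) pairs. It reflects one reading of how the two arguments combine (entries without an explicit port fall back to etcd_port); alluxiofs's own parsing may differ, and IPv6 literals are not handled.

```python
def parse_etcd_hosts(etcd_hosts, etcd_port=2379):
    """Split 'host1:port1,host2,...' into (host, port) pairs.

    Hypothetical: entries without ':port' use etcd_port (default 2379).
    """
    endpoints = []
    for entry in etcd_hosts.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if sep:
            endpoints.append((host, int(port)))
        else:
            endpoints.append((entry, etcd_port))
    return endpoints
```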

Example: Configure Alluxio fsspec. Note that the following options must be the same on alluxiofs and the Alluxio servers:

  • alluxio.worker.page.store.page.size (default 1MB): Size of each page in the worker's paged block store. We recommend setting it to 20MB for large Parquet files.

  • alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker (default 2000): The number of virtual nodes per worker in the consistent hashing algorithm. With consistent hashing, a membership change redistributes only some virtual nodes instead of rebuilding the whole hash table, which keeps changes to the table minimal. To achieve that, the number of virtual nodes should be X times the number of physical nodes in the cluster, where X balances redistribution granularity against table size. We recommend setting X to 5.

alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "1MB",
    }
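The consistent hashing idea behind the virtual node option can be illustrated with a generic hash ring. This is a textbook sketch, not Alluxio's worker selection implementation: the MD5-based hash, the key format, and the worker names are all assumptions for illustration. Removing one worker only moves the keys that lived on that worker, roughly one fifth of them with five equally weighted workers, while every other key keeps its assignment.

```python
import bisect
import hashlib


def ring_hash(value):
    """Map a string to a 64-bit integer (MD5 is an illustrative choice)."""
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Consistent hash ring where each worker owns `virtual_nodes` points."""

    def __init__(self, workers, virtual_nodes):
        # Place virtual_nodes points per worker on the ring, sorted by position
        self._ring = sorted(
            (ring_hash(f"{worker}#{i}"), worker)
            for worker in workers
            for i in range(virtual_nodes)
        )
        self._points = [point for point, _ in self._ring]

    def worker_for(self, key):
        """Return the worker owning the first ring point after the key's hash."""
        idx = bisect.bisect_right(self._points, ring_hash(key)) % len(self._points)
        return self._ring[idx][1]


def fraction_moved(keys, before, after):
    """Fraction of keys whose worker differs between two rings."""
    moved = sum(before.worker_for(k) != after.worker_for(k) for k in keys)
    return moved / len(keys)


# Hypothetical page keys and worker names, for illustration only
keys = [f"/dataset/file{i}.parquet#page{j}" for i in range(100) for j in range(20)]
workers = [f"worker-{n}" for n in range(5)]
before = HashRing(workers, virtual_nodes=200)
after = HashRing(workers[:-1], virtual_nodes=200)  # worker-4 leaves the cluster
print(f"{fraction_moved(keys, before, after):.1%} of pages moved")
```

More virtual nodes per worker spread each worker's share more evenly around the ring, at the cost of a larger table, which is the trade-off the option controls.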

(Optional) Init alluxio_client for distributed load operations:

alluxio_client = AlluxioClient(etcd_hosts="host1,host2,host3", etcd_port=8888, options=alluxio_options)

Init alluxio_fs for fsspec filesystem operations:

alluxio_fs = fsspec.filesystem("alluxiofs", etcd_hosts="localhost", target_protocol="s3", options=alluxio_options)
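Because the options above must match on both sides, it can help to compare them before initializing. A minimal sketch: `option_mismatches` and `MUST_MATCH` are hypothetical names, and `server_options` is a plain dict you assemble yourself from the servers' configuration (for example, from their alluxio-site.properties).

```python
# Options that, per the note above, must match between alluxiofs and the servers
MUST_MATCH = [
    "alluxio.worker.page.store.page.size",
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker",
]


def option_mismatches(client_options, server_options, keys=MUST_MATCH):
    """Return {key: (client_value, server_value)} for every key whose values differ.

    Values are compared as plain strings, so "1MB" and "1024KB" are reported
    as different even though they describe the same size. A key missing on
    one side shows up as None.
    """
    mismatches = {}
    for key in keys:
        client_value = client_options.get(key)
        server_value = server_options.get(key)
        if client_value != server_value:
            mismatches[key] = (client_value, server_value)
    return mismatches
```

Usage: `option_mismatches(alluxio_options, server_options)` returns an empty dict when both sides agree.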

Arguments for storage backend

Arguments:

  • target_protocol (str, optional): Specifies the under storage protocol to create the under storage file system object. Common examples include s3 for Amazon S3, hdfs for Hadoop Distributed File System, and others.

  • target_options (dict, optional): Provides a set of configuration options relevant to the target_protocol. These options might include credentials, endpoint URLs, and other protocol-specific settings required to successfully interact with the under storage system.

  • fs (object, optional): Directly supplies a file system object instance for accessing Alluxio's underlying storage.

Example: connect to S3

To connect to S3, you can follow these steps:

  1. Review S3 fsspec documentation: Refer to the s3fs documentation to find out the supported arguments for connecting to S3. Typical arguments include:

  • anon (bool, default False): Whether to use an anonymous connection (public buckets only). If False, s3fs uses the given key/secret, or boto's credential resolver (client_kwargs, environment variables, config files, EC2 IAM server, in that order).

  • endpoint_url (str, default None): Use this endpoint URL, if specified. Needed for connecting to non-AWS S3 buckets. Takes precedence over endpoint_url in client_kwargs.

  • key (str, default None): If not anonymous, use this access key ID, if specified. Takes precedence over aws_access_key_id in client_kwargs.

  • secret (str, default None): If not anonymous, use this secret access key, if specified. Takes precedence over aws_secret_access_key in client_kwargs.

  • token (str, default None): If not anonymous, use this security token, if specified.

  2. Pass the supported arguments as target_options to Alluxio: use these arguments to create an Alluxio file system object with fsspec.

Here's how to create an Alluxio file system object connected to S3 using fsspec:

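import fsspec
from alluxiofs import AlluxioFileSystem

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)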

# Example configuration for connecting to S3
s3_options = {
    "key": "your-aws-access-key",
    "secret": "your-aws-secret-key",
    "endpoint_url": "https://s3.your-endpoint.com"
}

# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    etcd_hosts="localhost",
    target_protocol="s3",
    target_options=s3_options
)

# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3

In this example:

  • Replace your-aws-access-key and your-aws-secret-key with your actual AWS credentials.

  • Replace https://s3.your-endpoint.com with the appropriate endpoint URL for your S3-compatible service if needed.

By following these steps, you can effectively connect to Alluxio with an S3 backend using fsspec.
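Rather than hardcoding credentials in s3_options, you can build target_options from environment variables. A minimal sketch: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN are the standard AWS variable names, while S3_ENDPOINT_URL and the helper `s3_options_from_env` are hypothetical names chosen for this example. Since s3fs can already resolve credentials from the environment through boto when key and secret are omitted, this mainly makes the choice explicit and adds the endpoint.

```python
import os

# Environment variable -> s3fs argument. The AWS_* names are standard;
# S3_ENDPOINT_URL is a hypothetical name for this sketch.
ENV_TO_S3FS = {
    "AWS_ACCESS_KEY_ID": "key",
    "AWS_SECRET_ACCESS_KEY": "secret",
    "AWS_SESSION_TOKEN": "token",
    "S3_ENDPOINT_URL": "endpoint_url",
}


def s3_options_from_env(environ=os.environ):
    """Build s3fs target_options from whichever variables are set and non-empty."""
    return {
        option: environ[var]
        for var, option in ENV_TO_S3FS.items()
        if environ.get(var)
    }
```

Usage: pass `target_options=s3_options_from_env()` to `fsspec.filesystem("alluxiofs", ...)`.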
