# Python API via FSSpec

{% hint style="warning" %}
This feature is experimental.
{% endhint %}

Alluxio Python SDK (`alluxiofs`) is based on [FSSpec](https://filesystem-spec.readthedocs.io/en/latest/api.html), which allows applications to seamlessly interact with various storage backends using a unified Python filesystem interface. It leverages a high-performance distributed caching layer, the Alluxio cluster, to significantly enhance data access speeds and reduce latency. This is particularly beneficial for data-intensive applications and workflows, especially AI training workloads, where large datasets need to be accessed quickly and repeatedly.

## Prerequisites

* A running Alluxio cluster
* Python version 3.8 or later

## Installation

### Install Dependencies

#### Install the underlying data lake storage (such as S3 or OSS)

Example: deploy S3 as the underlying data lake storage. See the [s3fs documentation](https://s3fs.readthedocs.io/en/latest/) for installation details.

```
pip install s3fs
```

#### Install alluxiofs

```
pip install alluxiofs
```

### Environment Setup

1. **Start an Alluxio cluster with at least 1 coordinator and 1 worker**

* To start the cluster on Kubernetes, refer to the [Alluxio Official Documentation](https://documentation.alluxio.io/ee-ai-en/ai-3.7/start/installing-on-kubernetes).
* To start the cluster on bare metal, use the `./bin/alluxio process start` CLI.

2. **Configure Alluxio with the UFS's Credentials**

```
# S3 related
s3a.accessKeyId=your-s3a-accessKeyId
s3a.secretKey=your-s3a-secretKey

# OSS related
fs.oss.accessKeyId=your-oss-keyid
fs.oss.accessKeySecret=your-oss-secret
fs.oss.endpoint=your-oss-endpoint
```

3. **Interact with Alluxio using alluxiofs**

For details, see [Create alluxiofs's Instance](#create-alluxiofss-instance) and [alluxiofs's Basic File Operations](#alluxiofss-basic-file-operations).

## Load Data into Alluxio Cluster

If the data has already been loaded into the Alluxio cluster, skip this step.

This simple example creates a client that connects to an Alluxio cluster with ETCD membership in Kubernetes, with cluster\_name `alluxio` in namespace `ai` and S3 as the under storage.\
See [advanced arguments to connect to Alluxio servers](#arguments-to-connect-to-alluxio-cluster) for more configuration settings.

Submit distributed load job to Alluxio cluster:

```
from alluxiofs.client import AlluxioClient

alluxio_client = AlluxioClient(cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379)
alluxio_client.submit_load("s3://bucket/path/to/dataset/")
```

This will trigger a load job asynchronously. You can wait until the load finishes or check the progress of this loading process using the following command:

```
alluxio_client.load_progress("s3://bucket/path/to/dataset/")
```
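
To block until the load completes, you can poll `load_progress` in a loop. A minimal sketch, assuming the progress response reports a terminal state such as `SUCCEEDED` or `FAILED` (the exact response format depends on the alluxiofs version):

```python
import time

def wait_for_load(client, path, interval=5, timeout=600):
    """Poll load_progress until the job reaches a terminal state or times out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        progress = client.load_progress(path)
        # Assumption: a terminal state string appears in the progress response.
        if "SUCCEEDED" in str(progress) or "FAILED" in str(progress):
            return progress
        time.sleep(interval)
    raise TimeoutError(f"load of {path} did not finish within {timeout}s")

# Usage (with the alluxio_client created above):
# wait_for_load(alluxio_client, "s3://bucket/path/to/dataset/")
```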

To cancel the distributed load job:

```
alluxio_client.stop_load("s3://bucket/path/to/dataset/")
```

## Create alluxiofs's Instance

This simple example creates a filesystem handler that connects to an Alluxio cluster with an ETCD membership in Kubernetes with cluster\_name `alluxio` in namespace `ai` and S3 as the under storage.

```python
import logging

import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_options = {
    "alluxio.worker.page.store.page.size": "4MB",
}
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="ai-alluxio",
    etcd_hosts="127.0.0.1",  # the IP address of etcd
    etcd_port=2379,  # the port of etcd
    target_protocol="s3",
    options=alluxio_options,
    logger=setup_logger("./", level=logging.DEBUG),
)
```

See [advanced init arguments](#advanced-initialization-parameters) to set advanced arguments for Alluxio cluster and/or storage system connections.

## alluxiofs's Basic File Operations

Below are common operations supported by `alluxio_fs` for interacting with files and directories:

### 1. `ls`

```python
ls(self, path: str, detail: bool=False) -> list[dict]
```

Lists the contents of a directory.

**Parameters**

* `path (str)`: The path of the directory to list.
* `detail (bool, optional)`: If True, returns detailed information about each entry. Defaults to False.

**Returns**

* `list[dict]`: A list of dicts with JSON-like structure.

**Example**

```python
contents = alluxio_fs.ls("/path/to/directory", False)
print(contents)
```

### 2. `info`

```python
info(self, path: str) -> dict
```

Retrieves information about a file or directory.

**Parameters**

* `path (str)`: The path of the file or directory.

**Returns**

* `dict`: JSON-like structure with file or directory info.

**Example**

```python
file_info = alluxio_fs.info("/path/to/file")
print(file_info)
```

### 3. `isdir`

```python
isdir(self, path: str) -> bool
```

Checks if a path is a directory.

**Parameters**

* `path (str)`: The path to check.

**Returns**

* `bool`: Whether the path is a directory.

**Example**

```python
is_directory = alluxio_fs.isdir("/path/to/directory")
print(is_directory)
```

### 4. `_open`

```python
_open(self, path: str, mode: str="rb", block_size: int=None, autocommit: bool=True, cache_options: dict=None, **kwargs) -> AlluxioFile
```

Opens a file for reading or writing.

**Parameters**

* `path (str)`: The path of the file to open.
* `mode (str, optional)`: The mode in which to open the file. Defaults to "rb".
* `block_size (int, optional)`: The block size for reading.
* `autocommit (bool, optional)`: If True, commits changes automatically.
* `cache_options (dict, optional)`: Cache options.
* `**kwargs`: Additional keyword arguments.

**Returns**

* `AlluxioFile`: Object supporting `read()`, `write()`, etc.

**Example**

```python
with alluxio_fs._open("/path/to/file", mode="rb") as f:
    data = f.read()
    print(data)

with alluxio_fs._open("/path/to/file", mode="wb") as f:
    f.write(data)
```
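
`_open` is fsspec's internal hook; application code normally calls the public `open` method that `AlluxioFileSystem` inherits from fsspec's `AbstractFileSystem`. A quick sketch using fsspec's built-in in-memory filesystem to illustrate the call pattern (the same calls work on `alluxio_fs`):

```python
import fsspec

# Any fsspec filesystem exposes the public `open` method,
# which wraps the internal `_open` hook shown above.
fs = fsspec.filesystem("memory")
with fs.open("/demo/file.bin", mode="wb") as f:
    f.write(b"hello")
with fs.open("/demo/file.bin", mode="rb") as f:
    print(f.read())  # b'hello'
```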

### 5. `cat_file`

```python
cat_file(self, path: str, start: int=0, end: int=None) -> bytes
```

Reads a range of bytes from a file (intended for small files, <10MB).

**Parameters**

* `path (str)`: The path of the file to read.
* `start (int, optional)`: Starting byte. Defaults to 0.
* `end (int, optional)`: Ending byte. Defaults to None.

**Returns**

* `bytes`: The read bytes.

**Example**

```python
data = alluxio_fs.cat_file("/path/to/file", start=0, end=100)
print(data)
```

### 6. `mkdir`

```python
mkdir(self, path: str) -> bool
```

Creates a directory.

**Parameters**

* `path (str)`: The path of the new directory.

**Returns**

* `bool`: Whether the directory was created successfully.

**Example**

```python
alluxio_fs.mkdir("/path/to/new_directory")
```

### 7. `rm`

```python
rm(self, path: str, recursive: bool=False, recursiveAlias: bool=False, removeAlluxioOnly: bool=False, deleteMountPoint: bool=False, syncParentNextTime: bool=False, removeUncheckedOptionChar: bool=False) -> bool
```

Removes a file or directory.

**Parameters**: `path` plus multiple optional flags (see the signature above); `recursive` removes directory contents recursively, and the remaining flags control Alluxio-specific delete behavior.

**Returns**

* `bool`: Whether the operation succeeded.

**Example**

```python
alluxio_fs.rm("/path/to/file_or_directory", recursive=True)
```

### 8. `touch`

```python
touch(self, path: str) -> bool
```

Creates an empty file.

**Parameters**

* `path (str)`: File path.

**Returns**

* `bool`: Whether the file was created.

**Example**

```python
alluxio_fs.touch("/path/to/new_file")
```

### 9. `head`

```python
head(self, path: str, num_of_bytes: int=1024) -> bytes
```

Reads the first few bytes of a file.

**Parameters**

* `path (str)`: File path.
* `num_of_bytes (int)`: Number of bytes.

**Returns**

* `bytes`: The read bytes.

**Example**

```python
data = alluxio_fs.head("/path/to/file", num_of_bytes=1024)
print(data)
```

### 10. `tail`

```python
tail(self, path: str, num_of_bytes: int=1024) -> bytes
```

Reads the last few bytes of a file.

**Parameters**

* `path (str)`: File path.
* `num_of_bytes (int)`: Number of bytes.

**Returns**

* `bytes`: The read bytes.

**Example**

```python
data = alluxio_fs.tail("/path/to/file", num_of_bytes=1024)
print(data)
```

### 11. `mv`

```python
mv(self, path1: str, path2: str) -> bool
```

Moves or renames a file or directory.

**Parameters**

* `path1 (str)`: Source path.
* `path2 (str)`: Destination path.

**Returns**

* `bool`: Whether the move/rename succeeded.

**Example**

```python
alluxio_fs.mv("/path/to/source", "/path/to/destination")
```

### 12. `copy` / `cp_file`

```python
copy(self, path1: str, path2: str, recursive: bool=False, recursiveAlias: bool=False, force: bool=False, thread: int=None, bufferSize: int=None, preserve: bool=None) -> bool
```

Copies a file or directory.

**Parameters**: `path1` and `path2` plus options controlling recursion (`recursive`), overwriting (`force`), threading, and buffering (see the signature above).

**Returns**

* `bool`: Whether the copy succeeded.

**Example**

```python
alluxio_fs.copy("/path/to/source", "/path/to/destination", recursive=True)
```

### 13. `read`

```python
read(self, path: str) -> bytes
```

Reads an entire file (intended for small files, <10MB).

**Parameters**

* `path (str)`: File path.

**Returns**

* `bytes`: The read content.

**Example**

```python
data = alluxio_fs.read("/path/to/file")
print(data)
```

***

### 14. `rename` / `move`

```python
rename(self, path1: str, path2: str) -> bool
```

Alias for `mv`. Renames or moves a file or directory.

**Parameters**

* `path1 (str)`: Source path.
* `path2 (str)`: Destination path.

**Returns**

* `bool`: Whether the operation succeeded.

**Example**

```python
alluxio_fs.rename("/path/old", "/path/new")
```

***

### 15. `cp_file`

```python
cp_file(self, path1: str, path2: str, recursive: bool=False) -> bool
```

Alias for `copy`. Copies a file or directory.

**Parameters**

* `path1 (str)`: Source path.
* `path2 (str)`: Destination path.
* `recursive (bool)`: Whether to copy recursively for directories.

**Returns**

* `bool`: Whether the copy succeeded.

**Example**

```python
alluxio_fs.cp_file("/path/source", "/path/destination", recursive=True)
```

***

### 16. `upload`

```python
upload(self, lpath: str, rpath: str) -> bool
```

Uploads a large file from the local OS file system to Alluxio (and the UFS, depending on the WriteType).

**Parameters**

* `lpath (str)`: The source path of the file in the local OS.
* `rpath (str)`: The destination path of the file in Alluxio/UFS.

**Returns**

* `bool`: Whether the upload task completed successfully.

**Example**

```python
# upload a large file to Alluxio
alluxio_fs.upload("/path/in/local", "/path/in/UFS")
```

### 17. `upload_data`

```python
upload_data(self, lpath: str, data: bytes) -> bool
```

Uploads a large file to Alluxio using a byte stream. Different from `upload` which reads from disk, `upload_data` accepts bytes directly.

**Parameters**

* `lpath (str)`: The destination path in Alluxio/UFS.
* `data (bytes)`: The byte content to upload.

**Returns**

* `bool`: Whether the upload task completed successfully.

**Example**

```python
# upload data as byte stream
alluxio_fs.upload_data("/path/in/UFS", b"binary data")
```

***

### 18. `download`

```python
download(self, lpath: str, rpath: str) -> bool
```

Downloads a large file from Alluxio to the local OS file system.

**Parameters**

* `lpath (str)`: The destination path in the local file system.
* `rpath (str)`: The source path in Alluxio.

**Returns**

* `bool`: Whether the download task completed successfully.

**Example**

```python
# download a large file from Alluxio
alluxio_fs.download("/local/path", "/alluxio/path")
```

***

### 19. `download_data`

```python
download_data(self, lpath: str) -> io.BytesIO
```

Downloads a file from Alluxio and returns it as an in-memory byte stream.

**Parameters**

* `lpath (str)`: The path of the file in Alluxio.

**Returns**

* `io.BytesIO`: A byte stream containing the file content.

**Example**

```python
# download as byte stream
byte_stream = alluxio_fs.download_data("/path/in/UFS")
data = byte_stream.read()
```

***

### 20. `write`

```python
write(self, path: str, value: bytes) -> bool
```

Writes byte data to a file in Alluxio. Equivalent to `upload_data`.

**Parameters**

* `path (str)`: Path to write the data to.
* `value (bytes)`: The byte content.

**Returns**

* `bool`: Whether the write succeeded.

**Example**

```python
alluxio_fs.write("/path/to/file", b"data to write")
```

***

More Python filesystem operations examples can be found [here](https://filesystem-spec.readthedocs.io/en/latest/usage.html#use-a-file-system).

## Integration with Other Frameworks

### Example: Ray

[Ray](https://www.ray.io/) is a fast and simple framework for building and running distributed applications. PyTorch, TensorFlow, and XGBoost trainers, running on top of Ray, can leverage Ray's advanced functionalities like creating heterogeneous clusters consisting of CPU machines for data loading and preprocessing, and GPU machines for training. Data loading, preprocessing, and training can be parallelized using Ray Data.

Trainers like PyTorch will read the same dataset again and again for each epoch. Getting a large dataset for PyTorch in each epoch becomes the training bottleneck. By leveraging Alluxio's high-performance distributed caching, trainers on Ray can reduce total training time, improve GPU utilization rate, and speed up the end-to-end model lifecycle.

Prerequisites: Ray version >= 2.8.2

```
# Pass the initialized Alluxio filesystem to Ray and read the dataset
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the single file
ds.count()

# Display the schema derived from the file header record
ds.schema()

# Display the first record
ds.take(1)

# Display the first two records
ds.take(2)

# Read multiple files:
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)

# Get a count of the number of records in the files
ds2.count()
```

### Example: PyArrow

PyArrow allows applications and data to seamlessly connect with each other by providing a high-performance, in-memory columnar storage format. It enables efficient data interchange between different data processing systems. By delegating its storage interface to fsspec, PyArrow can access various storage backends through a unified interface. By using alluxiofs, PyArrow can leverage Alluxio's distributed caching capabilities to enhance data access speeds and reduce latency.

Example 1:

```
# Pass the initialized Alluxio filesystem to PyArrow and read the dataset from the example Parquet file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the file
dataset.count_rows()

# Display the schema derived from the file header record
dataset.schema

# Display the first record (take expects a list of row indices)
dataset.take([0])
```

Example 2:

```
from pyarrow.fs import PyFileSystem, FSSpecHandler

# Create a Python-based PyArrow filesystem using FSSpecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))

# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
    f.read()
```

## Advanced Initialization Parameters

### Arguments to Connect to Alluxio Cluster

* **etcd\_hosts** (str, required): A comma-separated list of ETCD server hosts in the format "host1:port1,host2:port2,...". ETCD is used for dynamic discovery of Alluxio workers.
* **etcd\_port** (int, optional): The port number used by each ETCD server. Defaults to `2379`.
* **options** (dict, optional): A dictionary of Alluxio configuration options where keys are property names and values are property values. These options configure the Alluxio client behavior.

Example: Configure Alluxio fsspec. Note that the following options must be set to the same values in alluxiofs and in the Alluxio cluster:

* `alluxio.worker.page.store.page.size` (default `1MB`): Size of each page in the worker's paged block store. Recommended: `20MB` for large Parquet files.
* `alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker` (default `2000`): The number of virtual nodes per worker in the consistent hashing algorithm. On membership changes, consistent hashing redistributes some virtual nodes instead of rebuilding the whole hash table, which keeps changes to the hash table minimal. To achieve that, the number of virtual nodes should be X times the number of physical nodes in the cluster, where X balances redistribution granularity against table size. A recommended value for X is `5`.

```
alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "4MB",
}
```

(Optional) Init `alluxio_client` for distributed load operations:

```python
from alluxiofs.client import AlluxioClient

alluxio_client = AlluxioClient(etcd_hosts="host1,host2,host3", etcd_port=8888, options=alluxio_options)
```

Init `alluxio_fs` for fsspec filesystem operations:

```
alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, target_protocol="s3", options=alluxio_options)
```

Configure the logger of `alluxio_fs`:

```
import logging
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, logger=setup_logger("./", level=logging.DEBUG))
```

### Arguments for the Storage Backend

Arguments:

* **target\_protocol** (str, optional): Specifies the under storage protocol to create the under storage file system object. Common examples include `s3` for Amazon S3, `hdfs` for Hadoop Distributed File System, and others.
* **target\_options** (dict, optional): Provides a set of configuration options relevant to the `target_protocol`. These options might include credentials, endpoint URLs, and other protocol-specific settings required to successfully interact with the under storage system.
* **fs** (object, optional): Directly supplies a file system instance for accessing the underlying storage of Alluxio.
* **logger** (object, optional): Configures the directory where log files are stored and the log level. Defaults to the current directory and `logging.INFO`.

#### Example: connect to S3

To connect to S3, you can follow these steps:

1. Review S3 fsspec documentation: Refer to the [s3fs documentation](https://s3fs.readthedocs.io/en/latest/) to find out the supported arguments for connecting to S3. Typical arguments include:

* **anon** bool (False): Whether to use an anonymous connection (public buckets only). If False, uses the key/secret given, or boto's credential resolver (client\_kwargs, environment variables, config files, EC2 IAM server, in that order).
* **endpoint\_url** string (None): Use this endpoint\_url, if specified. Needed for connecting to non-AWS S3 buckets. Takes precedence over `endpoint_url` in client\_kwargs.
* **key** string (None): If not anonymous, use this access key ID, if specified. Takes precedence over `aws_access_key_id` in client\_kwargs.
* **secret** string (None): If not anonymous, use this secret access key, if specified. Takes precedence over `aws_secret_access_key` in client\_kwargs.
* **token** string (None): If not anonymous, use this security token, if specified

2. Pass the supported arguments as target\_options to Alluxio: You can then use these arguments to create an Alluxio file system object using fsspec.

Here's how to create an Alluxio file system object connected to S3 using fsspec:

```
import fsspec

# Example configuration for connecting to S3
s3_options = {
    "key": "your-aws-access-key",
    "secret": "your-aws-secret-key",
    "endpoint_url": "https://s3.your-endpoint.com"
}

# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="ai-alluxio", 
    etcd_hosts="127.0.0.1", 
    etcd_port=2379,
    target_protocol="s3",
    target_options=s3_options
)

# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3
```

In this example:

* Replace `your-aws-access-key` and `your-aws-secret-key` with your actual AWS credentials.
* Replace `https://s3.your-endpoint.com` with the appropriate endpoint URL for your S3-compatible service if needed.

By following these steps, you can effectively connect to Alluxio with an S3 backend using fsspec.

## Monitoring Metrics

### Monitoring System Setup

1. **Prometheus Installation and Configuration**\
   Edit the `prometheus.yml` configuration file and then start Prometheus:

   ```bash
   nohup ./prometheus --web.enable-admin-api --config.file=./prometheus.yml >./prometheus.log 2>&1 &
   ```
2. **Grafana Installation and Configuration**\
   Start Grafana:

   ```bash
   nohup ./bin/grafana-server --homepath . web >./grafana.log 2>&1 &
   ```

For more detailed instructions on setting up the monitoring system, refer to the [Alluxio Official Documentation](https://documentation.alluxio.io/ee-ai-en/ai-3.7/administration/monitoring-alluxio).

### Explanation of Monitoring Metrics

| **Metric Name**                             | **Description**                                                                    | **Labels**          | **Unit**          | **Implementation Code** |
| ------------------------------------------- | ---------------------------------------------------------------------------------- | ------------------- | ----------------- | ----------------------- |
| `alluxio_http_server_call_latency_ms`       | Histogram of HTTP service call latency (Bucket boundaries: \[10, 40, 160, 640] ms) | `method`, `success` | Milliseconds (ms) | `HistogramWrapper`      |
| `alluxio_http_server_result_total`          | Total count of HTTP service results                                                | `method`, `state`   | Count             | `CounterWrapper`        |
| `alluxio_http_server_call_latency_ms_sum`   | Total latency of HTTP service calls                                                | `method`, `success` | Milliseconds (ms) | `HistogramWrapper`      |
| `alluxio_http_server_call_latency_ms_count` | Count of HTTP service calls                                                        | `method`, `success` | Count             | `HistogramWrapper`      |
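
Once Prometheus is scraping the cluster, these metrics can also be queried programmatically through the standard Prometheus HTTP API. A minimal sketch, assuming Prometheus runs at its default local address `http://localhost:9090`:

```python
import json
import urllib.parse
import urllib.request

PROMETHEUS_URL = "http://localhost:9090"  # assumption: default local Prometheus

def query_metric(metric_name, base_url=PROMETHEUS_URL):
    """Run an instant query against the standard Prometheus HTTP API."""
    url = base_url + "/api/v1/query?" + urllib.parse.urlencode({"query": metric_name})
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)["data"]["result"]

# Example: per-method counts of HTTP calls served by the Alluxio cluster
# for sample in query_metric("alluxio_http_server_result_total"):
#     print(sample["metric"].get("method"), sample["value"][1])
```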
