# Python FSSpec API

{% hint style="warning" %}
此功能为实验性功能。
{% endhint %}

Alluxio Python SDK (`alluxiofs`) 基于 [FSSpec](https://filesystem-spec.readthedocs.io/en/latest/api.html)，它允许应用程序使用统一的 Python 文件系统接口与各种存储后端无缝交互。它利用高性能的分布式缓存层，即 Alluxio 集群，显著提高数据访问速度并减少延迟。这对于数据密集型应用程序和工作流，尤其是需要快速、重复访问大型数据集的 AI 训练工作负载，尤其有益。

## 先决条件

* 一个正在运行的 Alluxio 集群
* Python >= 3.10（`alluxiofs` 使用 Python 3.10 引入的联合类型语法；在 3.9 及更低版本上安装虽然成功，但 `import alluxiofs` 会报 `SyntaxError`）

## 安装

### 安装依赖

#### 安装 alluxiofs

```
pip install alluxiofs
```

#### （可选）安装基于 FSSpec 的 UFS 客户端（例如 s3, oss）

示例:如果您的底层存储（UFS）是 S3，并且您希望具备在 Alluxio 故障后 fallback 到 UFS 的能力，您需要下载 [s3fs](https://s3fs.readthedocs.io/en/latest/)

```
pip install s3fs
```

### 环境准备

1. **启动 Alluxio 集群，至少包含 1 个 coordinator 和 1 个 worker**

* 如果您希望在 Kubernetes 中启动集群，请参考[Alluxio 官方文档](/ee-ai-cn/start/installing-on-kubernetes.md)。
* 如果您希望在物理裸机上启动集群，可以使用 `./bin/alluxio process start` 命令行工具。

2. **使用 FSSpec 与 Alluxio 交互**\
   关于具体的细节，参考[创建alluxiofs实例](#创建alluxiofs实例)和[alluxiofs基本的文件操作](#alluxiofs基本的文件操作)

## 创建alluxiofs实例

以下是一个简单的文件系统处理程序创建过程，用于连接到 Alluxio 集群, 负载均衡服务的域名是 localhost，使用 S3 作为底层存储。

> **路径格式**：传给 `alluxiofs` 的路径必须是 **UFS 原生路径**（如 `my-bucket/path/to/file`），而不是 Alluxio 虚拟挂载路径（如 `/s3/path/to/file`）。使用虚拟路径会导致 `unstrip_protocol` 将挂载点误解析为 bucket 名，产生格式错误的 S3 URI。

```
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_options = {
    "alluxio.worker.page.store.page.size": "4MB",
}
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    load_balance_domain="localhost", # 或 worker_hosts="127.0.0.1, 127.0.0.2"
    target_protocol="s3",
    options=alluxio_options
    logger=setup_logger("./", level=logging.DEBUG),
)
```

有关Alluxio集群和/或存储系统连接的高级参数设置，请参见[高级初始化参数](#高级初始化参数)。

## alluxiofs基本的文件操作

以下是 `alluxio_fs` 支持的文件和目录常用操作：

### 1. `ls`

```python
ls(self, path: str, detail: bool=False) -> list[dict]
```

列出目录内容

**参数**

• `path (str)`: 目标目录路径\
• `detail (bool, 可选)`: 为True时返回详细条目信息，默认为False

**返回值**

• `list[dict]`: 包含JSON格式信息的字典列表

**示例**

```python
contents = alluxio_fs.ls("/path/to/directory", False)
print(contents)
```

### 2. `info`

```python
info(self, path: str) -> dict
```

获取文件/目录信息

**参数**

• `path (str)`: 目标路径

**返回值**

• `dict`: JSON格式的文件/目录信息

**示例**

```python
file_info = alluxio_fs.info("/path/to/file")
print(file_info)
```

### 3. `isdir`

```python
isdir(self, path: str) -> bool
```

检查路径是否为目录

**参数**

• `path (str)`: 目标路径

**返回值**

• `bool`: 是否为目录

**示例**

```python
is_directory = alluxio_fs.isdir("/path/to/directory")
print(is_directory)
```

### 4. `open`

```python
open(self, path: str, mode: str="rb", block_size: int=None, autocommit: bool=True, cache_options: dict=None, **kwargs) -> AlluxioFile
```

打开文件进行读写操作

**参数**

• `path (str)`: 文件路径\
• `mode (str, 可选)`: 打开模式，默认为"rb"\
• `block_size (int, 可选)`: 读取块大小\
• `autocommit (bool, 可选)`: 是否自动提交更改\
• `cache_options (dict, 可选)`: 缓存选项\
• `**kwargs`: 其他关键字参数

**返回值**

• `AlluxioFile`: 支持`read()`/`write()`等操作的文件对象

**示例**

```python
with alluxio_fs.open("/path/to/file", mode="rb") as f:
    data = f.read()
    print(data)

with alluxio_fs.open("/path/to/file", mode="wb") as f:
    f.write(data)
```

### 5. `mkdir`

```python
mkdir(self, path: str) -> bool
```

创建新目录

**参数**

• `path (str)`: 新目录路径

**返回值**

• `bool`: 是否创建成功

**示例**

```python
alluxio_fs.mkdir("/path/to/new_directory")
```

### 6. `rm`

```python
rm(self, path: str, recursive: bool=False, recursiveAlias: bool=False, removeAlluxioOnly: bool=False, deleteMountPoint: bool=False, syncParentNextTime: bool=False, removeUncheckedOptionChar: bool=False) -> bool
```

删除文件或目录

**参数**: 多个控制选项标志

**返回值**

• `bool`: 是否操作成功

**示例**

```python
alluxio_fs.rm("/path/to/file", recursive=True)
```

### 7. `touch`

```python
touch(self, path: str) -> bool
```

创建空文件

**参数**

• `path (str)`: 文件路径

**返回值**

• `bool`: 是否创建成功

**示例**

```python
alluxio_fs.touch("/新文件路径")
```

### 8. `head`

```python
head(self, path: str, num_of_bytes: int=1024) -> bytes
```

读取文件开头部分

**参数**

• `path (str)`: 文件路径\
• `num_of_bytes (int)`: 读取字节数

**返回值**

• `bytes`: 读取的字节数据

**示例**

```python
data = alluxio_fs.head("/path/to/file", num_of_bytes=1024)
print(data)
```

### 9. `tail`

```python
tail(self, path: str, num_of_bytes: int=1024) -> bytes
```

读取文件末尾部分

**参数**

• `path (str)`: 文件路径\
• `num_of_bytes (int)`: 读取字节数

**返回值**

• `bytes`: 读取的字节数据

**示例**

```python
data = alluxio_fs.tail("/path/to/file", num_of_bytes=1024)
print(data)
```

### 10. `mv`

```python
mv(self, path1: str, path2: str) -> bool
```

移动或重命名文件/目录

**参数**

• `path1 (str)`: 源路径\
• `path2 (str)`: 目标路径

**返回值**

• `bool`: 是否操作成功

**示例**

```python
alluxio_fs.mv("/源路径", "/path/to/file")
```

### 11. `cp`

```python
cp(self, path1: str, path2: str, recursive: bool=False, recursiveAlias: bool=False, force: bool=False, thread: int=None, bufferSize: int=None, preserve: bool=None) -> bool
```

复制文件或目录

**参数**: 递归/强制/线程等多种选项

**返回值**

• `bool`: 是否复制成功

**示例**

```python
alluxio_fs.cp("/源路径", "/path/to/file", recursive=True)
```

### 12. `read_bytes`

```python
read_bytes(self, path: str) -> bytes
```

读取整个文件（适用于<10MB的小文件）

**参数**

• `path (str)`: 文件路径

**返回值**

• `bytes`: 文件完整内容

**示例**

```python
data = alluxio_fs.read_bytes("/path/to/file")
print(data)
```

### 13. `write_bytes`

```python
write(self, path: str, value: bytes) -> bool
```

将字节数据写入 Alluxio 中的文件。

**参数**

• `path (str)`: 文件路径 • `value (bytes)`: 字节内容

**返回值**

• `bool`: 写入是否成功

**示例**

```python
alluxio_fs.write_bytes("/path/to/file", b"data to write")
```

### 14. `upload`

<pre class="language-python"><code class="lang-python"><strong>upload(self, lpath: str, rpath: str) -> bool
</strong></code></pre>

从本地文件系统上传大文件到Alluxio（根据WriteType可能同时写入UFS）。该方法采用分块读取和流式写入方式。

**参数**

• `lpath (str)`: 本地文件系统源路径\
• `rpath (str)`: Alluxio/UFS目标路径

**返回值**

• `bool`: 是否上传成功

**示例**

```python
# 上传大文件到Alluxio
alluxio_fs.upload("/path/to/ocal", "/path/to/ufs")
```

### 15. `download`

```python
download(self, lpath: str, rpath: str) -> bool
```

将 Alluxio 中的大文件下载到本地文件系统中。

**参数**

* `lpath (str)`：本地文件系统中的目标路径。
* `rpath (str)`：Alluxio 中的源路径。

**返回值**

* `bool`：下载任务是否成功完成。

**示例**

```python
# 从 Alluxio 下载大文件
alluxio_fs.download("/local/path", "/alluxio/path")
```

更多关于fsspec的Python文件系统操作示例，可以参见[fsspec官方文档](https://filesystem-spec.readthedocs.io/en/latest/usage.html#use-a-file-system)。

## 与其他框架的集成

### 举例：Ray

[Ray](https://www.ray.io/)是一个快速且简单的框架，用于构建和运行分布式应用程序。 在Ray之上运行的PyTorch，TensorFlow和XGBoost训练器可以利用Ray的高级功能，如创建异构集群，用CPU机器来进行数据加载和预处理，用GPU机器来进行训练。 可以使用Ray Data并行化进行数据加载、预处理和训练。

像PyTorch这样的训练器会在每个时期重复读取相同的数据集。 对于PyTorch来说，在每个 epoch 获取大量的数据集已经成为训练的瓶颈。 通过利用Alluxio的高性能分布式缓存，Ray上的训练器可以减少总训练时间，提高GPU利用率，并缩短端到端的模型生命周期。

先决条件: Ray版本 >= 2.8.2

```
# Pass the initialized Alluxio filesystem to Ray and read the dataset
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the single file
ds.count()

# Display the schema derived from the file header record
ds.schema()

# Display the header record
ds.take(1)

# Display the first data record
ds.take(2)

# Read multiple files:
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)

# Get a count of the number of records in the files
ds2.count()
```

### PyArrow

PyArrow通过提供高性能的内存列存储格式，可以让应用程序和数据无缝连接。 它实现了不同数据处理系统之间的高效数据交换。 通过将其存储接口委托给fsspec，PyArrow可以通过统一的接口访问各种存储后端。 通过使用alluxiofs，PyArrow可以利用Alluxio的分布式缓存能力来提高数据访问速度并降低延迟。

示例1：

```
# Pass the initialized Alluxio filesystem to Pyarrow and read the data set from the example csv file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the file
dataset.count_rows()

# Display the schema derived from the file header record
dataset.schema

# Display the first record
dataset.take(0)
```

示例2：

```
from pyarrow.fs import PyFileSystem, FSSpecHandler

# Create a python-based PyArrow filesystem using FsspecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))

# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
    f.read()
```

## 高级初始化参数

### 连接Alluxio集群的参数

* **options** (dict, 可选): 一个包含Alluxio配置选项的字典，其中键是属性名称，值是属性值。 这些选项用于配置Alluxio客户端的行为。

示例：配置 Alluxio fsspec。请注意，以下选项在 alluxiofs 和 Alluxio 集群之间必须保持一致。

* `alluxio.worker.page.store.page.size`（默认值 1MB）：Worker 分页块存储中每个页面的大小。对于大型 Parquet 文件，建议设置为 20MB。
* `alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker`（默认值 `2000`）：在一致性哈希算法中，每个 Worker 的虚拟节点数。在成员变更时，一致性哈希算法会重新分配部分虚拟节点，而不是重建整个哈希表，从而尽可能减少哈希表的变更。为了实现这一点，虚拟节点数应为集群中物理节点数的 X 倍，其中 X 是虚拟节点分配粒度和规模之间的平衡参数。建议将此值设置为 5。

```
alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "4MB",
}
```

（可选）初始化 `alluxio_client` 以进行分布式加载作业:

```
alluxio_client = AlluxioClient(load_balance_domain="localhost")
```

初始化 `alluxio_fs` 以进行 fsspec 文件系统操作:

```
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    load_balance_domain="localhost", # 或 worker_hosts="127.0.0.1, 127.0.0.2"
    target_protocol="s3",
    options=alluxio_options
)
```

配置 `alluxio_fs` 的logger

```
import logging
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    load_balance_domain="localhost", # 或 worker_hosts="127.0.0.1, 127.0.0.2"
    logger=setup_logger("./", level=logging.DEBUG)
)
```

### 底层存储的参数

参数：

* **target\_protocol** (str, 可选): 指定要创建底层存储文件系统对象的底层存储协议。 常见的示例包括`s3`用于Amazon S3，`hdfs` 用于Hadoop分布式文件系统等。
* **target\_options** (dict, 可选): 提供一组与 `target_protocol` 相关的配置选项。 这些选项可能包括凭证、终端节点URL和其他与底层存储系统成功交互所需的特定协议的设置。
* **fs** (object, 可选): 直接提供用于访问Alluxio底层存储的文件系统对象实例。
* **logger**(object，可选): 配置日志文件的存储路径和日志级别，路径默认为当前路径，日志级别默认为 `logging.INFO`。

#### 示例: 连接到 S3

要连接到S3，可以按照以下步骤进行：

1. 查看S3 fsspec文档：参考[s3fs文档](https://s3fs.readthedocs.io/en/latest/) 以了解连接到S3所支持的参数。典型的参数包括：

* **anon** bool (False): 是否使用匿名连接（仅限公共存储桶）。 如果为False，则使用给定的key/secret，或boto的凭证解析器；client\_kwargs、环境变量、配置文件、EC2 IAM服务器，依次进行。
* **endpoint\_url** string (None): 使用此endpoint\_url（如指定）。连接到非AWS S3存储桶时会需要用到。优先于client\_kwargs中的 `endpoint_url`。
* **key** string (None): 非匿名的情况下，请使用此访问密钥ID（如指定）。优先于client\_kwargs中的 `aws_access_key_id`。
* **secret** string (None): 非匿名的情况下，请使用此密钥访问密钥（如指定）。优先于client\_kwargs中的 `aws_secret_access_key`。
* **token** string (None): 非匿名的情况下，请使用此安全 token（如指定）。

2. 将支持的参数作为target\_options传递给Alluxio：然后，您可以使用这些参数使用fsspec创建Alluxio文件系统对象。

以下是如何使用fsspec创建连接到S3的Alluxio文件系统对象的示例：

```
import fsspec

# Example configuration for connecting to S3
s3_options = {
    "key": "your-aws-access-key",
    "secret": "your-aws-secret-key",
    "endpoint_url": "https://s3.your-endpoint.com"
}

# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    load_balance_domain="localhost", # 或 worker_hosts="127.0.0.1, 127.0.0.2"
    target_options=s3_options
)

# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3
```

在此示例中：

* 将`your-aws-access-key` 和 `your-aws-secret-key` 替换为您实际的AWS凭证。
* 如果需要，将 `https://s3.your-endpoint.com` 替换为合适的且与您的S3兼容的服务端点URL。

按照这些步骤，您可以使用fsspec有效地连接到使用S3作为底层存储的Alluxio集群。

## 监控指标

### 监控系统搭建

1. **Prometheus 安装与配置**\
   修改 `prometheus.yml` 配置文件，然后启动 Prometheus：

   ```bash
   nohup ./prometheus --web.enable-admin-api --config.file=./prometheus.yml >./prometheus.log 2>&1 &
   ```
2. **Grafana 安装与配置**\
   启动 Grafana：

   ```bash
   nohup ./bin/grafana-server --homepath . web >./grafana.log 2>&1 &
   ```

详细的指标监控系统搭建可以参考 [Alluxio 官方文档](/ee-ai-cn/administration/monitoring-alluxio.md)。

### 监控指标的说明

| **指标名称**                                   | **描述**                                             | **标签**              | **单位**     | **实现代码**           |
| ------------------------------------------ | -------------------------------------------------- | ------------------- | ---------- | ------------------ |
| `alluxio_http_server_call_latency_ms`      | HTTP 服务调用延迟的直方图（Bucket 边界: \[10, 40, 160, 640] 毫秒） | `method`, `success` | 毫秒 (ms)    | `HistogramWrapper` |
| `alluxio_http_server_result_total`         | HTTP 服务结果的总计数                                      | `method`, `state`   | 次数 (count) | `CounterWrapper`   |
| `alluxio_http_server_call_latency_ms_sum`  | HTTP 服务调用延迟总和                                      | `method`, `success` | 毫秒 (ms)    | `HistogramWrapper` |
| `aluxio_http_server_call_latency_ms_count` | HTTP 服务调用次数                                        | `method`, `success` | 次数 (count) | `HistogramWrapper` |


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://documentation.alluxio.io/ee-ai-cn/data-access/fsspec.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
