# 通过 FSSpec 的 Python API

{% hint style="warning" %}
此功能为实验性功能。
{% endhint %}

Alluxio Python SDK (`alluxiofs`) 基于 [FSSpec](https://filesystem-spec.readthedocs.io/en/latest/api.html)，它允许应用程序使用统一的 Python 文件系统接口与各种存储后端无缝交互。它利用高性能的分布式缓存层，即 Alluxio 集群，显著提高数据访问速度并减少延迟。这对于数据密集型应用程序和工作流，尤其是需要快速、重复访问大型数据集的 AI 训练工作负载，尤其有益。

## 先决条件

* 一个正在运行的 Alluxio 集群
* Python 版本 >= 3.8

## 安装

### 安装依赖

#### 安装底层数据湖存储（例如 s3、oss）

示例：将 S3 部署为底层数据湖存储 [安装第三方 S3 存储](https://s3fs.readthedocs.io/en/latest/)

```
pip install s3fs
```

#### 安装 alluxiofs

```
pip install alluxiofs
```

### 环境设置

1. **启动至少有 1 个Coordinator和 1 个 worker 的 Alluxio 集群**
   * 如果要在 K8s 中启动集群，请参阅 [Alluxio 官方文档](/ee-ai-cn/ai-3.7/start/installing-on-kubernetes.md)。
   * 如果要在裸机上启动集群，可以使用 `./bin/alluxio process start` cli。
2. **使用 UFS 的凭据配置 Alluxio**

```
#s3 相关
s3a.accessKeyId=your-s3a-accessKeyId
s3a.secretKey=your-s3a-secretKey


#oss 相关
fs.oss.accessKeyId=your-oss-keyid
fs.oss.accessKeySecret=your-oss-secert
fs.oss.endpoint=your-oss-endpoint
```

3. **使用 alluxiofs 与 alluxio 交互**

有关详细信息，请参阅[创建 alluxiofs 实例](#create-alluxiofss-instance)和[alluxiofs 基本文件操作](#alluxiofss-basic-file-operations)。

## 将数据加载到 Alluxio 集群

如果数据已加载到 Alluxio 集群中，请跳过此步骤。

这个简单的示例创建了一个客户端，该客户端连接到 Kubernetes 中具有 ETCD 成员资格的 Alluxio 集群，集群名称为 `alluxio`，命名空间为 `ai`，并使用 S3 作为底层存储。 有关更多配置设置，请参阅[连接到 Alluxio 服务器的高级参数](#arguments-to-connect-to-alluxio-cluster)。

向 Alluxio 集群提交分布式加载作业：

```python
from alluxiofs.client import AlluxioClient

alluxio_client = AlluxioClient(cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379)
alluxio_client.submit_load("s3://bucket/path/to/dataset/")
```

这将异步触发加载作业。您可以使用以下命令等待加载完成或检查此加载过程的进度：

```python
alluxio_client.load_progress("s3://bucket/path/to/dataset/")
```

要取消分布式加载作业：

```python
alluxio_client.stop_load("s3://bucket/path/to/dataset/")
```

## 创建 alluxiofs 实例

这个简单的示例创建了一个文件系统处理程序，该处理程序连接到 Kubernetes 中具有 ETCD 成员资格的 Alluxio 集群，集群名称为 `alluxio`，命名空间为 `ai`，并使用 S3 作为底层存储。

```python
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_options = {
    "alluxio.worker.page.store.page.size": "4MB",
}
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="ai-alluxio",
    etcd_hosts="127.0.0.1", # the ip address of etcd
    etcd_port=2379， # the port of etcd
    target_protocol="s3", 
    options=alluxio_options
    logger=setup_logger("./", level=logging.DEBUG),
)
```

有关为 Alluxio 集群和/或存储系统连接设置高级参数，请参阅[高级初始化参数](#advanced-initialization-parameters)。

## alluxiofs 的基本文件操作

以下是 `alluxio_fs` 支持的用于与文件和目录交互的常见操作：

### 1. `ls`

```python
ls(self, path: str, detail: bool=False) -> list[dict]
```

列出目录的内容。

**参数**

* `path (str)`: 要列出的目录的路径。
* `detail (bool, optional)`: 如果为 True，则返回有关每个条目的详细信息。默认为 False。

**返回**

* `list[dict]`: 具有类 JSON 结构的字典列表。

**示例**

```python
contents = alluxio_fs.ls("/path/to/directory", False)
print(contents)
```

### 2. `info`

```python
info(self, path: str) -> dict
```

检索有关文件或目录的信息。

**参数**

* `path (str)`: 文件或目录的路径。

**返回**

* `dict`: 具有文件或目录信息的类 JSON 结构。

**示例**

```python
file_info = alluxio_fs.info("/path/to/file")
print(file_info)
```

### 3. `isdir`

```python
isdir(self, path: str) -> bool
```

检查路径是否为目录。

**参数**

* `path (str)`: 要检查的路径。

**返回**

* `bool`: 路径是否为目录。

**示例**

```python
is_directory = alluxio_fs.isdir("/path/to/directory")
print(is_directory)
```

### 4. `_open`

```python
_open(self, path: str, mode: str="rb", block_size: int=None, autocommit: bool=True, cache_options: dict=None, **kwargs) -> AlluxioFile
```

打开文件以进行读取或写入。

**参数**

* `path (str)`: 要打开的文件的路径。
* `mode (str, optional)`: 打开文件的模式。默认为 "rb"。
* `block_size (int, optional)`: 读取的块大小。
* `autocommit (bool, optional)`: 如果为 True，则自动提交更改。
* `cache_options (dict, optional)`: 缓存选项。
* `**kwargs`: 其他关键字参数。

**返回**

* `AlluxioFile`: 支持 `read()`、`write()` 等的对象。

**示例**

```python
with alluxio_fs._open("/path/to/file", mode="rb") as f:
    data = f.read()
    print(data)

with alluxio_fs._open("/path/to/file", mode="wb") as f:
    f.write(data)
```

### 5. `cat_file`

```python
cat_file(self, path: str, start: int=0, end: int=None) -> bytes
```

从文件中读取一定范围的字节（适用于小于 10MB 的小文件）。

**参数**

* `path (str)`: 要读取的文件的路径。
* `start (int, optional)`: 起始字节。默认为 0。
* `end (int, optional)`: 结束字节。默认为 None。

**返回**

* `bytes`: 读取的字节。

**示例**

```python
data = alluxio_fs.cat_file("/path/to/file", start=0, end=100)
print(data)
```

### 6. `mkdir`

```python
mkdir(self, path: str) -> bool
```

创建目录。

**参数**

* `path (str)`: 新目录的路径。

**返回**

* `bool`: 目录是否成功创建。

**示例**

```python
alluxio_fs.mkdir("/path/to/new_directory")
```

### 7. `rm`

```python
rm(self, path: str, recursive: bool=False, recursiveAlias: bool=False, removeAlluxioOnly: bool=False, deleteMountPoint: bool=False, syncParentNextTime: bool=False, removeUncheckedOptionChar: bool=False) -> bool
```

删除文件或目录。

**参数**: 用于控制的多个可选标志。

**返回**

* `bool`: 操作是否成功。

**示例**

```python
alluxio_fs.rm("/path/to/file_or_directory", recursive=True)
```

### 8. `touch`

```python
touch(self, path: str) -> bool
```

创建空文件。

**参数**

* `path (str)`: 文件路径。

**返回**

* `bool`: 文件是否已创建。

**示例**

```python
alluxio_fs.touch("/path/to/new_file")
```

### 9. `head`

```python
head(self, path: str, num_of_bytes: int=1024) -> bytes
```

读取文件的前几个字节。

**参数**

* `path (str)`: 文件路径。
* `num_of_bytes (int)`: 字节数。

**返回**

* `bytes`: 读取的字节。

**示例**

```python
data = alluxio_fs.head("/path/to/file", num_of_bytes=1024)
print(data)
```

### 10. `tail`

```python
tail(self, path: str, num_of_bytes: int=1024) -> bytes
```

读取文件的最后几个字节。

**参数**

* `path (str)`: 文件路径。
* `num_of_bytes (int)`: 字节数。

**返回**

* `bytes`: 读取的字节。

**示例**

```python
data = alluxio_fs.tail("/path/to/file", num_of_bytes=1024)
print(data)
```

### 11. `mv`

```python
mv(self, path1: str, path2: str) -> bool
```

移动或重命名文件或目录。

**参数**

* `path1 (str)`: 源路径。
* `path2 (str)`: 目标路径。

**返回**

* `bool`: 移动/重命名是否成功。

**示例**

```python
alluxio_fs.mv("/path/to/source", "/path/to/destination")
```

### 12. `copy` / `cp_file`

```python
copy(self, path1: str, path2: str, recursive: bool=False, recursiveAlias: bool=False, force: bool=False, thread: int=None, bufferSize: int=None, preserve: bool=None) -> bool
```

复制文件或目录。

**参数**: 用于递归、强制、线程等的各种选项。

**返回**

* `bool`: 复制是否成功。

**示例**

```python
alluxio_fs.copy("/path/to/source", "/path/to/destination", recursive=True)
```

### 13. `read`

```python
read(self, path: str) -> bytes
```

读取整个文件（适用于小于 10MB 的小文件）。

**参数**

* `path (str)`: 文件路径。

**返回**

* `bytes`: 读取的内容。

**示例**

```python
data = alluxio_fs.read("/path/to/file")
print(data)
```

***

### 14. `rename` / `move`

```python
rename(self, path1: str, path2: str) -> bool
```

`mv` 的别名。重命名或移动文件或目录。

**参数**

* `path1 (str)`: 源路径。
* `path2 (str)`: 目标路径。

**返回**

* `bool`: 操作是否成功。

**示例**

```python
alluxio_fs.rename("/path/old", "/path/new")
```

***

### 15. `cp_file`

```python
cp_file(self, path1: str, path2: str, recursive: bool=False) -> bool
```

`copy` 的别名。复制文件或目录。

**参数**

* `path1 (str)`: 源路径。
* `path2 (str)`: 目标路径。
* `recursive (bool)`: 是否为目录递归复制。

**返回**

* `bool`: 复制是否成功。

**示例**

```python
alluxio_fs.cp_file("/path/source", "/path/destination", recursive=True)
```

***

### 16. `upload`

```python
upload(self, lpath: str, rpath: str) -> bool
```

将大文件从本地操作系统文件系统上传到 Alluxio（以及 UFS，取决于 WriteType）。

**参数**

* `lpath (str)`: 本地操作系统中文件的源路径。
* `rpath (str)`: Alluxio/UFS 中文件的目标路径。

**返回**

* `bool`: 上传任务是否成功完成。

**示例**

```python
# 将大文件上传到 Alluxio
alluxio_fs.upload("/path/in/local", "/path/in/UFS")
```

### 17. `upload_data`

```python
upload_data(self, lpath: str, data: bytes) -> bool
```

使用字节流将大文件上传到 Alluxio。与从磁盘读取的 `upload` 不同，`upload_data` 直接接受字节。

**参数**

* `lpath (str)`: Alluxio/UFS 中的目标路径。
* `data (bytes)`: 要上传的字节内容。

**返回**

* `bool`: 上传任务是否成功完成。

**示例**

```python
# upload data as byte stream
alluxio_fs.upload_data("/path/in/UFS", b"binary data")
```

***

### 18. `download`

```python
download(self, lpath: str, rpath: str) -> bool
```

将大文件从 Alluxio 下载到本地操作系统文件系统。

**参数**

* `lpath (str)`: 本地文件系统中的目标路径。
* `rpath (str)`: Alluxio 中的源路径。

**返回**

* `bool`: 下载任务是否成功完成。

**示例**

```python
# download a large file from Alluxio
alluxio_fs.download("/local/path", "/alluxio/path")
```

***

### 19. `download_data`

```python
download_data(self, lpath: str) -> io.BytesIO
```

从 Alluxio 下载文件并将其作为内存中的字节流返回。

**参数**

* `lpath (str)`: Alluxio 中文件的路径。

**返回**

* `io.BytesIO`: 包含文件内容的字节流。

**示例**

```python
# download as byte stream
byte_stream = alluxio_fs.download_data("/path/in/UFS")
data = byte_stream.read()
```

***

### 20. `write`

```python
write(self, path: str, value: bytes) -> bool
```

将字节数据写入 Alluxio 中的文件。等同于 `upload_data`。

**参数**

* `path (str)`: 要写入数据的路径。
* `value (bytes)`: 字节内容。

**返回**

* `bool`: 写入是否成功。

**示例**

```python
alluxio_fs.write("/path/to/file", b"data to write")
```

***

更多 Python 文件系统操作示例可以在[这里](https://filesystem-spec.readthedocs.io/en/latest/usage.html#use-a-file-system)找到。

## 与其他框架集成

### 示例：Ray

[Ray](https://www.ray.io/) 是一个用于构建和运行分布式应用程序的快速简单的框架。在 Ray 上运行的 PyTorch、TensorFlow 和 XGBoost 训练器可以利用 Ray 的高级功能，例如创建由用于数据加载和预处理的 CPU 机器和用于训练的 GPU 机器组成的异构集群。数据加载、预处理和训练可以使用 Ray Data 进行并行化。

像 PyTorch 这样的训练器会在每个 epoch 中一遍又一遍地读取相同的数据集。在每个 epoch 中为 PyTorch 获取大型数据集成为训练瓶颈。通过利用 Alluxio 的高性能分布式缓存，Ray 上的训练器可以减少总训练时间，提高 GPU 利用率，并加速端到端模型生命周期。

先决条件：Ray 版本 >= 2.8.2

```python
# Pass the initialized Alluxio filesystem to Ray and read the dataset
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the single file
ds.count()

# Display the schema derived from the file header record
ds.schema()

# Display the header record
ds.take(1)

# Display the first data record
ds.take(2)

# Read multiple files:
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)

# Get a count of the number of records in the files
ds2.count()
```

### 示例：PyArrow

PyArrow 通过提供高性能的内存中列式存储格式，允许应用程序和数据无缝连接。它实现了不同数据处理系统之间的高效数据交换。通过将其存储接口委托给 fsspec，PyArrow 可以通过统一的接口访问各种存储后端。通过使用 alluxiofs，PyArrow 可以利用 Alluxio 的分布式缓存功能来提高数据访问速度并减少延迟。

示例 1：

<pre class="language-python"><code class="lang-python"><strong># Pass the initialized Alluxio filesystem to Pyarrow and read the data set from the example csv file
</strong>import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)
​
# Get a count of the number of records in the file
dataset.count_rows()
​
# Display the schema derived from the file header record
dataset.schema
​
# Display the first record
dataset.take(0)
</code></pre>

示例 2：

<pre class="language-python"><code class="lang-python"><strong>from pyarrow.fs import PyFileSystem, FSSpecHandler
</strong>​
# Create a python-based PyArrow filesystem using FsspecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))
​
# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
    f.read()
</code></pre>

## 高级初始化参数

### 连接到 Alluxio 集群的参数

* **etcd\_hosts** (str, required): ETCD 服务器主机的逗号分隔列表，格式为 "host1:port1,host2:port2,..."。ETCD 用于动态发现 Alluxio worker。
* **etcd\_port** (int, optional): 每个 ETCD 服务器使用的端口号。默认为 `2379`。
* **options** (dict, optional): Alluxio 配置选项的字典，其中键是属性名称，值是属性值。这些选项配置 Alluxio 客户端行为。

示例：配置 Alluxio fsspec。请注意，以下选项在 alluxiofs 和 alluxio 集群之间必须相同

* `alluxio.worker.page.store.page.size` (默认 `1MB`): worker 分页块存储中每个页面的大小。对于大型 parquet 文件，建议设置为 `20MB`。
* `alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker` (默认 `2000`): 这是一致性哈希算法中一个 worker 的虚拟节点数。在一致性哈希算法中，当成员资格发生变化时，会重新分配一些虚拟节点，而不是重建整个哈希表。这保证了哈希表仅发生最小程度的更改。为了实现这一点，虚拟节点的数量应该是集群中物理节点的 X 倍，其中 X 是重新分配粒度和大小之间的平衡。建议设置为 `5`。

```python
alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "4MB",
}
```

（可选）为分布式加载操作初始化 `alluxio_client`：

```python
alluxio_client = AlluxioClient(etcd_hosts="host1,host2,host3", etcd_port=8888, options=alluxio_options)
```

为 fsspec 文件系统操作初始化 `alluxio_fs`：

```python
alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, target_protocol="s3", options=alluxio_options)
```

配置 `alluxio_fs` 的记录器

```python
import logging
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, logger=setup_logger("./", level=logging.DEBUG))
```

### 存储后端的参数

参数：

* **target\_protocol** (str, optional): 指定用于创建底层存储文件系统对象的底层存储协议。常见示例包括用于 Amazon S3 的 `s3`、用于 Hadoop 分布式文件系统的 `hdfs` 等。
* **target\_options** (dict, optional): 提供与 `target_protocol` 相关的一组配置选项。这些选项可能包括凭据、端点 URL 以及与底层存储系统成功交互所需的其他协议特定设置。
* **fs** (object, optional): 直接提供用于访问 Alluxio 底层存储的文件系统对象实例
* **logger**(object， optional): 配置存储日志文件的路径和记录器的级别，路径默认为当前路径，级别默认为 logging.INFO。

#### 示例：连接到 S3

要连接到 S3，您可以按照以下步骤操作：

1. 查看 S3 fsspec 文档：请参阅 [s3fs 文档](https://s3fs.readthedocs.io/en/latest/) 以了解连接到 S3 支持的参数。典型参数包括：
   * **anon** bool (False): 是否使用匿名连接（仅限公共存储桶）。如果为 False，则使用给定的密钥/秘密，或 boto 的凭据解析器；client\_kwargs、环境变量、配置文件、EC2 IAM 服务器，按此顺序
   * **endpoint\_url** string (None): 如果指定，则使用此 endpoint\_url。连接到非 AWS S3 存储桶时需要。优先于 client\_kwargs 中的 `endpoint_url`。
   * **key** string (None): 如果不是匿名，则使用此访问密钥 ID（如果指定）。优先于 client\_kwargs 中的 `aws_access_key_id`。
   * **secret** string (None): 如果不是匿名，则使用此秘密访问密钥（如果指定）。优先于 client\_kwargs 中的 `aws_secret_access_key`。
   * **token** string (None): 如果不是匿名，则使用此安全令牌（如果指定）
2. 将支持的参数作为 target\_options 传递给 Alluxio：然后，您可以使用这些参数使用 fsspec 创建 Alluxio 文件系统对象。

以下是使用 fsspec 创建连接到 S3 的 Alluxio 文件系统对象的方法：

```python
import fsspec

# Example configuration for connecting to S3
s3_options = {
    "key": "your-aws-access-key",
    "secret": "your-aws-secret-key",
    "endpoint_url": "https://s3.your-endpoint.com"
}

# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="ai-alluxio", 
    etcd_hosts="127.0.0.1", 
    etcd_port=2379,
    target_protocol="s3",
    target_options=s3_options
)

# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3
```

在此示例中：

* 将 `your-aws-access-key` 和 `your-aws-secret-key` 替换为您的实际 AWS 凭据。
* 如果需要，将 `https://s3.your-endpoint.com` 替换为您的 S3 兼容服务的相应端点 URL。

通过执行这些步骤，您可以使用 fsspec 有效地连接到具有 S3 后端的 Alluxio。

## 监控指标

### 监控系统设置

1. **Prometheus 安装和配置** 编辑 `prometheus.yml` 配置文件，然后启动 Prometheus：

   ```bash
   nohup ./prometheus --web.enable-admin-api --config.file=./prometheus.yml >./prometheus.log 2>&1 &
   ```
2. **Grafana 安装和配置** 启动 Grafana：

   ```bash
   nohup ./bin/grafana-server --homepath . web >./grafana.log 2>&1 &
   ```

有关设置监控系统的更详细说明，请参阅 [Alluxio 官方文档](/ee-ai-cn/ai-3.7/administration/monitoring-alluxio.md)。

### 监控指标说明

| **指标名称**                                    | **描述**                                      | **标签**              | **单位**  | **实现代码**           |
| ------------------------------------------- | ------------------------------------------- | ------------------- | ------- | ------------------ |
| `alluxio_http_server_call_latency_ms`       | HTTP 服务调用延迟的直方图（桶边界：\[10, 40, 160, 640] ms） | `method`, `success` | 毫秒 (ms) | `HistogramWrapper` |
| `alluxio_http_server_result_total`          | HTTP 服务结果总数                                 | `method`, `state`   | 计数      | `CounterWrapper`   |
| `alluxio_http_server_call_latency_ms_sum`   | HTTP 服务调用总延迟                                | `method`, `success` | 毫秒 (ms) | `HistogramWrapper` |
| `alluxio_http_server_call_latency_ms_count` | HTTP 服务调用计数                                 | `method`, `success` | 计数      | `HistogramWrapper` |


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://documentation.alluxio.io/ee-ai-cn/ai-3.7/data-access/fsspec.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
