# 通过 PythonSDK/FSSpec 访问

{% hint style="warning" %}
实验性功能
{% endhint %}

Alluxio Python SDK (`alluxiofs`) 是基于[FSSpec](https://filesystem-spec.readthedocs.io/en/latest/api.html)的，允许应用程序通过统一的Python文件系统接口无缝地与各种存储后端进行交互。它利用高性能的分布式缓存层，即Alluxio集群，显著提高数据访问的速度，降低延迟，这对于数据密集型应用程序和工作流非常有好处，特别是针对AI训练场景，在模型的训练过程中需要快速且重复地访问大型数据集，加速效果异常明显。

## 先决条件

* 正在运行的Alluxio集群
* Python版本 >= 3.8

## 安装

### 安装依赖

#### 安装底层数据湖存储（如s3，oss）

示例：将S3作为底层数据湖存储，[安装第三方S3存储](https://s3fs.readthedocs.io/en/latest/)

```
pip install s3fs
```

#### 安装 alluxiofs

```
pip install alluxiofs
```

### 环境准备

1. **启动 Alluxio 集群，至少包含 1 个 coordinator 和 1 个 worker**

* 如果您希望在 Kubernetes 中启动集群，请参考[Alluxio 官方文档](https://documentation.alluxio.io/ee-ai-cn/ai-3.6/start/install/install-alluxio-on-kubernetes)。
* 如果您希望在物理裸机上启动集群，可以使用 `./bin/alluxio process start` 命令行工具。

2. **使用 UFS 的凭证配置 Alluxio**

```
# S3 相关配置
s3a.accessKeyId=your-s3a-accessKeyId
s3a.secretKey=your-s3a-secretKey

# OSS 相关配置
fs.oss.accessKeyId=your-oss-keyid
fs.oss.accessKeySecret=your-oss-secert
fs.oss.endpoint=your-oss-endpoint
```

3. **使用 FSSpec 与 Alluxio 交互**\
   关于具体的细节，参考[创建alluxiofs实例](#创建alluxiofs实例)和[alluxiofs基本的文件操作](#alluxiofs基本的文件操作)

## 将数据加载到Alluxio

如果数据已经加载到Alluxio集群中，请跳过此步骤。

下面是一个简单的示例，该例子创建了一个客户端，用于连接到在 Kubernetes 中使用 ETCD 进行集群成员管理的 Alluxio 集群。集群的服务名称为 `alluxio-etcd.alluxio-ai`，集群名称为 `alluxio`，位于 `alluxio-ai` 命名空间，并使用 S3 作为底层存储。有关更多配置设置，请参见[连接Alluxio集群的参数](#连接alluxio集群的参数)。

向Alluxio集群提交分布式加载作业：

```
from alluxiofs.client import AlluxioClient

alluxio_client = AlluxioClient(cluster_name="alluxio", etcd_hosts="alluxio-etcd.alluxio-ai")
alluxio_client.submit_load("s3://bucket/path/to/dataset/")
```

这将异步触发加载作业。您可以等待加载完成或使用以下命令检查此加载的进度：

```
alluxio_client.load_progress("s3://bucket/path/to/dataset/")
```

取消分布式加载作业:

```
alluxio_client.stop_load("s3://bucket/path/to/dataset/")
```

## 创建alluxiofs实例

以下是一个简单的文件系统处理程序创建过程，用于连接到在 Kubernetes 中使用 ETCD 进行集群成员管理的 Alluxio 集群。该服务名为 `alluxio-etcd.alluxio-ai`，集群名称为 `alluxio`，位于 `alluxio-ai` 命名空间，并使用 S3 作为底层存储。

```
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_options = {
    "alluxio.worker.page.store.page.size": "4MB",
}
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="alluxio",
    etcd_hosts="alluxio-etcd.alluxio-ai",
    target_protocol="s3",
    options=alluxio_options
    logger=setup_logger("./", level=logging.DEBUG),
)
```

有关Alluxio集群和/或存储系统连接的高级参数设置，请参见[高级初始化参数](#高级初始化参数)。

## alluxiofs基本的文件操作

以下是 `alluxio_fs` 支持的文件和目录常用操作：

### 1. `ls`

```python
ls(self, path: str, detail: bool=False) -> list[dict]
```

列出目录内容

**参数**

• `path (str)`: 目标目录路径\
• `detail (bool, 可选)`: 为True时返回详细条目信息，默认为False

**返回值**

• `list[dict]`: 包含JSON格式信息的字典列表

**示例**

```python
contents = alluxio_fs.ls("/目录路径", False)
print(contents)
```

### 2. `info`

```python
info(self, path: str) -> dict
```

获取文件/目录信息

**参数**

• `path (str)`: 目标路径

**返回值**

• `dict`: JSON格式的文件/目录信息

**示例**

```python
file_info = alluxio_fs.info("/文件路径")
print(file_info)
```

### 3. `isdir`

```python
isdir(self, path: str) -> bool
```

检查路径是否为目录

**参数**

• `path (str)`: 目标路径

**返回值**

• `bool`: 是否为目录

**示例**

```python
is_directory = alluxio_fs.isdir("/目录路径")
print(is_directory)
```

### 4. `_open`

```python
_open(self, path: str, mode: str="rb", block_size: int=None, autocommit: bool=True, cache_options: dict=None, **kwargs) -> AlluxioFile
```

打开文件进行读写操作

**参数**

• `path (str)`: 文件路径\
• `mode (str, 可选)`: 打开模式，默认为"rb"\
• `block_size (int, 可选)`: 读取块大小\
• `autocommit (bool, 可选)`: 是否自动提交更改\
• `cache_options (dict, 可选)`: 缓存选项\
• `**kwargs`: 其他关键字参数

**返回值**

• `AlluxioFile`: 支持`read()`/`write()`等操作的文件对象

**示例**

```python
with alluxio_fs._open("/文件路径", mode="rb") as f:
    data = f.read()
    print(data)

with alluxio_fs._open("/文件路径", mode="wb") as f:
    f.write(data)
```

### 5. `cat_file`

```python
cat_file(self, path: str, start: int=0, end: int=None) -> bytes
```

读取文件指定字节范围（适用于<10MB的小文件）

**参数**

• `path (str)`: 文件路径\
• `start (int, 可选)`: 起始字节，默认为0\
• `end (int, 可选)`: 结束字节，默认为None

**返回值**

• `bytes`: 读取的字节数据

**示例**

```python
data = alluxio_fs.cat_file("/文件路径", start=0, end=100)
print(data)
```

### 6. `mkdir`

```python
mkdir(self, path: str) -> bool
```

创建新目录

**参数**

• `path (str)`: 新目录路径

**返回值**

• `bool`: 是否创建成功

**示例**

```python
alluxio_fs.mkdir("/新目录路径")
```

### 7. `rm`

```python
rm(self, path: str, recursive: bool=False, recursiveAlias: bool=False, removeAlluxioOnly: bool=False, deleteMountPoint: bool=False, syncParentNextTime: bool=False, removeUncheckedOptionChar: bool=False) -> bool
```

删除文件或目录

**参数**: 多个控制选项标志

**返回值**

• `bool`: 是否操作成功

**示例**

```python
alluxio_fs.rm("/目标路径", recursive=True)
```

### 8. `touch`

```python
touch(self, path: str) -> bool
```

创建空文件

**参数**

• `path (str)`: 文件路径

**返回值**

• `bool`: 是否创建成功

**示例**

```python
alluxio_fs.touch("/新文件路径")
```

### 9. `head`

```python
head(self, path: str, num_of_bytes: int=1024) -> bytes
```

读取文件开头部分

**参数**

• `path (str)`: 文件路径\
• `num_of_bytes (int)`: 读取字节数

**返回值**

• `bytes`: 读取的字节数据

**示例**

```python
data = alluxio_fs.head("/文件路径", num_of_bytes=1024)
print(data)
```

### 10. `tail`

```python
tail(self, path: str, num_of_bytes: int=1024) -> bytes
```

读取文件末尾部分

**参数**

• `path (str)`: 文件路径\
• `num_of_bytes (int)`: 读取字节数

**返回值**

• `bytes`: 读取的字节数据

**示例**

```python
data = alluxio_fs.tail("/文件路径", num_of_bytes=1024)
print(data)
```

### 11. `mv`

```python
mv(self, path1: str, path2: str) -> bool
```

移动或重命名文件/目录

**参数**

• `path1 (str)`: 源路径\
• `path2 (str)`: 目标路径

**返回值**

• `bool`: 是否操作成功

**示例**

```python
alluxio_fs.mv("/源路径", "/目标路径")
```

### 12. `copy` / `cp_file`

```python
copy(self, path1: str, path2: str, recursive: bool=False, recursiveAlias: bool=False, force: bool=False, thread: int=None, bufferSize: int=None, preserve: bool=None) -> bool
```

复制文件或目录

**参数**: 递归/强制/线程等多种选项

**返回值**

• `bool`: 是否复制成功

**示例**

```python
alluxio_fs.copy("/源路径", "/目标路径", recursive=True)
```

### 13. `read`

```python
read(self, path: str) -> bytes
```

读取整个文件（适用于<10MB的小文件）

**参数**

• `path (str)`: 文件路径

**返回值**

• `bytes`: 文件完整内容

**示例**

```python
data = alluxio_fs.read("/文件路径")
print(data)
```

### 14. `rename` / `move`

```python
rename(self, path1: str, path2: str) -> bool
```

`rename` 是 `mv` 的别名，用于重命名或移动文件或目录。

* **参数**
  * `path1 (str)`：源路径
  * `path2 (str)`：目标路径
* **返回值**
  * `bool`：操作是否成功
* **示例**

  ```python
  python复制编辑alluxio_fs.rename("/path/old", "/path/new")
  ```

***

### 15. `cp_file`

```python
cp_file(self, path1: str, path2: str, recursive: bool=Fa
```

`cp_file` 是 `copy` 的别名，用于复制文件或目录。

* **参数**
  * `path1 (str)`：源路径
  * `path2 (str)`：目标路径
  * `recursive (bool)`：是否递归复制目录（默认 `False`）
* **返回值**
  * `bool`：复制操作是否成功
* **示例**

  ```python
  python复制编辑alluxio_fs.cp_file("/path/source", "/path/destination", recursive=True)
  ```

***

### 16. `upload`

<pre class="language-python"><code class="lang-python"><strong>upload(self, lpath: str, rpath: str) -> bool
</strong></code></pre>

从本地文件系统上传大文件到Alluxio（根据WriteType可能同时写入UFS）。该方法采用分块读取和流式写入方式。

**参数**

• `lpath (str)`: 本地文件系统源路径\
• `rpath (str)`: Alluxio/UFS目标路径

**返回值**

• `bool`: 是否上传成功

**示例**

```python
# 上传大文件到Alluxio
alluxio_fs.upload("/本地路径", "/UFS路径")
```

***

### 17. `upload_data`

```python
upload_data(self, lpath: str, data: bytes) -> bool
```

使用字节流将大型文件上传到 Alluxio。与从磁盘读取的 `upload` 方法不同，`upload_data` 直接接收字节内容。

* **参数**
  * `lpath (str)`：Alluxio/UFS 中的目标路径
  * `data (bytes)`：要上传的字节内容
* **返回值**
  * `bool`：上传任务是否成功完成
* **示例**

  ```python
  python复制编辑# 以字节流方式上传数据
  alluxio_fs.upload_data("/path/in/UFS", b"binary data")
  ```

### 18. `download`

```python
download(self, lpath: str, rpath: str) -> bool
```

将 Alluxio 中的大文件下载到本地文件系统中。

**参数**

* `lpath (str)`：本地文件系统中的目标路径。
* `rpath (str)`：Alluxio 中的源路径。

**返回值**

* `bool`：下载任务是否成功完成。

**示例**

```python
# 从 Alluxio 下载大文件
alluxio_fs.download("/local/path", "/alluxio/path")
```

***

### 19. `download_data`

```python
download_data(self, lpath: str) -> io.BytesIO
```

将 Alluxio 中的文件以内存字节流的方式下载返回。

**参数**

* `lpath (str)`：Alluxio 文件路径。

**返回值**

* `io.BytesIO`：包含文件内容的字节流对象。

**示例**

```python
# 以字节流方式下载文件
byte_stream = alluxio_fs.download_data("/path/in/UFS")
data = byte_stream.read()
```

***

### 20. `write`

```python
write(self, path: str, value: bytes) -> bool
```

更多关于fsspec的Python文件系统操作示例，可以参见[fsspec官方文档](https://filesystem-spec.readthedocs.io/en/latest/usage.html#use-a-file-system)。

## 与其他框架的集成

### 举例：Ray

[Ray](https://www.ray.io/)是一个快速且简单的框架，用于构建和运行分布式应用程序。 在Ray之上运行的PyTorch，TensorFlow和XGBoost训练器可以利用Ray的高级功能，如创建异构集群，用CPU机器来进行数据加载和预处理，用GPU机器来进行训练。 可以使用Ray Data并行化进行数据加载、预处理和训练。

像PyTorch这样的训练器会在每个时期重复读取相同的数据集。 对于PyTorch来说，在每个 epoch 获取大量的数据集已经成为训练的瓶颈。 通过利用Alluxio的高性能分布式缓存，Ray上的训练器可以减少总训练时间，提高GPU利用率，并缩短端到端的模型生命周期。

先决条件: Ray版本 >= 2.8.2

```
# Pass the initialized Alluxio filesystem to Ray and read the dataset
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the single file
ds.count()

# Display the schema derived from the file header record
ds.schema()

# Display the header record
ds.take(1)

# Display the first data record
ds.take(2)

# Read multiple files:
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)

# Get a count of the number of records in the files
ds2.count()
```

### PyArrow

PyArrow通过提供高性能的内存列存储格式，可以让应用程序和数据无缝连接。 它实现了不同数据处理系统之间的高效数据交换。 通过将其存储接口委托给fsspec，PyArrow可以通过统一的接口访问各种存储后端。 通过使用alluxiofs，PyArrow可以利用Alluxio的分布式缓存能力来提高数据访问速度并降低延迟。

示例1：

```
# Pass the initialized Alluxio filesystem to Pyarrow and read the data set from the example csv file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the file
dataset.count_rows()

# Display the schema derived from the file header record
dataset.schema

# Display the first record
dataset.take(0)
```

示例2：

```
from pyarrow.fs import PyFileSystem, FSSpecHandler

# Create a python-based PyArrow filesystem using FsspecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))

# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
    f.read()
```

## 高级初始化参数

### 连接Alluxio集群的参数

* **etcd\_hosts** (str, 必需): ETCD服务器主机地址列表，格式为 "host1:port1,host2:port2,..."。 ETCD用于动态发现Alluxio工作节点。
* **etcd\_port** (int, 可选): 每个ETCD服务器使用的端口号。默认为 2379。
* **options** (dict, 可选): 一个包含Alluxio配置选项的字典，其中键是属性名称，值是属性值。 这些选项用于配置Alluxio客户端的行为。

示例：配置 Alluxio fsspec。请注意，以下选项在 alluxiofs 和 Alluxio 集群之间必须保持一致。

* `alluxio.worker.page.store.page.size`（默认值 1MB）：Worker 分页块存储中每个页面的大小。对于大型 Parquet 文件，建议设置为 20MB。
* `alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker`（默认值 `2000`）：在一致性哈希算法中，每个 Worker 的虚拟节点数。在成员变更时，一致性哈希算法会重新分配部分虚拟节点，而不是重建整个哈希表，从而尽可能减少哈希表的变更。为了实现这一点，虚拟节点数应为集群中物理节点数的 X 倍，其中 X 是虚拟节点分配粒度和规模之间的平衡参数。建议将此值设置为 5。

```
alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "4MB",
}
```

（可选）初始化 `alluxio_client` 以进行分布式加载作业:

```
alluxio_client = AlluxioClient(etcd_hosts="host1,host2,host3", etcd_port=8888, )
```

初始化 `alluxio_fs` 以进行 fsspec 文件系统操作:

```
alluxio_fs = fsspec.filesystem("alluxiofs", etcd_hosts="localhost", target_protocol="s3", options=alluxio_options)
```

配置 `alluxio_fs` 的logger

```
import logging
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

alluxio_fs = fsspec.filesystem("alluxiofs", etcd_hosts="localhost", logger=setup_logger("./", level=logging.DEBUG))
```

### 底层存储的参数

参数：

* **target\_protocol** (str, 可选): 指定要创建底层存储文件系统对象的底层存储协议。 常见的示例包括`s3`用于Amazon S3，`hdfs` 用于Hadoop分布式文件系统等。
* **target\_options** (dict, 可选): 提供一组与 `target_protocol` 相关的配置选项。 这些选项可能包括凭证、终端节点URL和其他与底层存储系统成功交互所需的特定协议的设置。
* **fs** (object, 可选): 直接提供用于访问Alluxio底层存储的文件系统对象实例。
* **logger**(object，可选): 配置日志文件的存储路径和日志级别，路径默认为当前路径，日志级别默认为 `logging.INFO`。

#### 示例: 连接到 S3

要连接到S3，可以按照以下步骤进行：

1. 查看S3 fsspec文档：参考[s3fs文档](https://s3fs.readthedocs.io/en/latest/) 以了解连接到S3所支持的参数。典型的参数包括：

* **anon** bool (False): 是否使用匿名连接（仅限公共存储桶）。 如果为False，则使用给定的key/secret，或boto的凭证解析器；client\_kwargs、环境变量、配置文件、EC2 IAM服务器，依次进行。
* **endpoint\_url** string (None): 使用此endpoint\_url（如指定）。连接到非AWS S3存储桶时会需要用到。优先于client\_kwargs中的 `endpoint_url`。
* **key** string (None): 非匿名的情况下，请使用此访问密钥ID（如指定）。优先于client\_kwargs中的 `aws_access_key_id`。
* **secret** string (None): 非匿名的情况下，请使用此密钥访问密钥（如指定）。优先于client\_kwargs中的 `aws_secret_access_key`。
* **token** string (None): 非匿名的情况下，请使用此安全 token（如指定）。

2. 将支持的参数作为target\_options传递给Alluxio：然后，您可以使用这些参数使用fsspec创建Alluxio文件系统对象。

以下是如何使用fsspec创建连接到S3的Alluxio文件系统对象的示例：

```
import fsspec

# Example configuration for connecting to S3
s3_options = {
    "key": "your-aws-access-key",
    "secret": "your-aws-secret-key",
    "endpoint_url": "https://s3.your-endpoint.com"
}

# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    etcd_hosts="localhost",
    target_protocol="s3",
    target_options=s3_options
)

# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3
```

在此示例中：

* 将`your-aws-access-key` 和 `your-aws-secret-key` 替换为您实际的AWS凭证。
* 如果需要，将 `https://s3.your-endpoint.com` 替换为合适的且与您的S3兼容的服务端点URL。

按照这些步骤，您可以使用fsspec有效地连接到使用S3作为底层存储的Alluxio集群。

## 监控指标

### 监控系统搭建

1. **Prometheus 安装与配置**\
   修改 `prometheus.yml` 配置文件，然后启动 Prometheus：

   ```bash
   nohup ./prometheus --web.enable-admin-api --config.file=./prometheus.yml >./prometheus.log 2>&1 &
   ```
2. **Grafana 安装与配置**\
   启动 Grafana：

   ```bash
   nohup ./bin/grafana-server --homepath . web >./grafana.log 2>&1 &
   ```

详细的指标监控系统搭建可以参考 [Alluxio 官方文档](https://documentation.alluxio.io/ee-ai-cn/ai-3.6/start/monitoring-and-metrics)。

### 监控指标的说明

| **指标名称**                                   | **描述**                                             | **标签**              | **单位**     | **实现代码**           |
| ------------------------------------------ | -------------------------------------------------- | ------------------- | ---------- | ------------------ |
| `alluxio_http_server_call_latency_ms`      | HTTP 服务调用延迟的直方图（Bucket 边界: \[10, 40, 160, 640] 毫秒） | `method`, `success` | 毫秒 (ms)    | `HistogramWrapper` |
| `alluxio_http_server_result_total`         | HTTP 服务结果的总计数                                      | `method`, `state`   | 次数 (count) | `CounterWrapper`   |
| `alluxio_http_server_call_latency_ms_sum`  | HTTP 服务调用延迟总和                                      | `method`, `success` | 毫秒 (ms)    | `HistogramWrapper` |
| `aluxio_http_server_call_latency_ms_count` | HTTP 服务调用次数                                        | `method`, `success` | 次数 (count) | `HistogramWrapper` |
