通过 FSSpec 的 Python API

Alluxio Python SDK (alluxiofs) 基于 FSSpec,它允许应用程序使用统一的 Python 文件系统接口与各种存储后端无缝交互。它利用高性能的分布式缓存层,即 Alluxio 集群,显著提高数据访问速度并减少延迟。这对于数据密集型应用程序和工作流,尤其是需要快速、重复访问大型数据集的 AI 训练工作负载,尤其有益。

先决条件

  • 一个正在运行的 Alluxio 集群

  • Python 版本 >= 3.8

安装

安装依赖

安装底层数据湖存储(例如 s3、oss)

示例:将 S3 部署为底层数据湖存储 安装第三方 S3 存储

pip install s3fs

安装 alluxiofs

pip install alluxiofs

环境设置

  1. 启动至少有 1 个协调器和 1 个 worker 的 Alluxio 集群

    • 如果要在 K8s 中启动集群,请参阅 Alluxio 官方文档

    • 如果要在裸机上启动集群,可以使用 ./bin/alluxio process start cli。

  2. 使用 UFS 的凭据配置 Alluxio

#s3 相关
s3a.accessKeyId=your-s3a-accessKeyId
s3a.secretKey=your-s3a-secretKey


#oss 相关
fs.oss.accessKeyId=your-oss-keyid
fs.oss.accessKeySecret=your-oss-secert
fs.oss.endpoint=your-oss-endpoint
  1. 使用 alluxiofs 与 alluxio 交互

有关详细信息,请参阅创建 alluxiofs 实例alluxiofs 基本文件操作

将数据加载到 Alluxio 集群

如果数据已加载到 Alluxio 集群中,请跳过此步骤。

这个简单的示例创建了一个客户端,该客户端连接到 Kubernetes 中具有 ETCD 成员资格的 Alluxio 集群,集群名称为 alluxio,命名空间为 ai,并使用 S3 作为底层存储。 有关更多配置设置,请参阅连接到 Alluxio 服务器的高级参数

向 Alluxio 集群提交分布式加载作业:

from alluxiofs.client import AlluxioClient

alluxio_client = AlluxioClient(cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379)
alluxio_client.submit_load("s3://bucket/path/to/dataset/")

这将异步触发加载作业。您可以使用以下命令等待加载完成或检查此加载过程的进度:

alluxio_client.load_progress("s3://bucket/path/to/dataset/")

要取消分布式加载作业:

alluxio_client.stop_load("s3://bucket/path/to/dataset/")

创建 alluxiofs 实例

这个简单的示例创建了一个文件系统处理程序,该处理程序连接到 Kubernetes 中具有 ETCD 成员资格的 Alluxio 集群,集群名称为 alluxio,命名空间为 ai,并使用 S3 作为底层存储。

import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_options = {
    "alluxio.worker.page.store.page.size": "4MB",
}
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="ai-alluxio",
    etcd_hosts="127.0.0.1", # the ip address of etcd
    etcd_port=2379, # the port of etcd
    target_protocol="s3", 
    options=alluxio_options
    logger=setup_logger("./", level=logging.DEBUG),
)

有关为 Alluxio 集群和/或存储系统连接设置高级参数,请参阅高级初始化参数

alluxiofs 的基本文件操作

以下是 alluxio_fs 支持的用于与文件和目录交互的常见操作:

1. ls

ls(self, path: str, detail: bool=False) -> list[dict]

列出目录的内容。

参数

  • path (str): 要列出的目录的路径。

  • detail (bool, optional): 如果为 True,则返回有关每个条目的详细信息。默认为 False。

返回

  • list[dict]: 具有类 JSON 结构的字典列表。

示例

contents = alluxio_fs.ls("/path/to/directory", False)
print(contents)

2. info

info(self, path: str) -> dict

检索有关文件或目录的信息。

参数

  • path (str): 文件或目录的路径。

返回

  • dict: 具有文件或目录信息的类 JSON 结构。

示例

file_info = alluxio_fs.info("/path/to/file")
print(file_info)

3. isdir

isdir(self, path: str) -> bool

检查路径是否为目录。

参数

  • path (str): 要检查的路径。

返回

  • bool: 路径是否为目录。

示例

is_directory = alluxio_fs.isdir("/path/to/directory")
print(is_directory)

4. _open

_open(self, path: str, mode: str="rb", block_size: int=None, autocommit: bool=True, cache_options: dict=None, **kwargs) -> AlluxioFile

打开文件以进行读取或写入。

参数

  • path (str): 要打开的文件的路径。

  • mode (str, optional): 打开文件的模式。默认为 "rb"。

  • block_size (int, optional): 读取的块大小。

  • autocommit (bool, optional): 如果为 True,则自动提交更改。

  • cache_options (dict, optional): 缓存选项。

  • **kwargs: 其他关键字参数。

返回

  • AlluxioFile: 支持 read()write() 等的对象。

示例

with alluxio_fs._open("/path/to/file", mode="rb") as f:
    data = f.read()
    print(data)

with alluxio_fs._open("/path/to/file", mode="wb") as f:
    f.write(data)

5. cat_file

cat_file(self, path: str, start: int=0, end: int=None) -> bytes

从文件中读取一定范围的字节(适用于小于 10MB 的小文件)。

参数

  • path (str): 要读取的文件的路径。

  • start (int, optional): 起始字节。默认为 0。

  • end (int, optional): 结束字节。默认为 None。

返回

  • bytes: 读取的字节。

示例

data = alluxio_fs.cat_file("/path/to/file", start=0, end=100)
print(data)

6. mkdir

mkdir(self, path: str) -> bool

创建目录。

参数

  • path (str): 新目录的路径。

返回

  • bool: 目录是否成功创建。

示例

alluxio_fs.mkdir("/path/to/new_directory")

7. rm

rm(self, path: str, recursive: bool=False, recursiveAlias: bool=False, removeAlluxioOnly: bool=False, deleteMountPoint: bool=False, syncParentNextTime: bool=False, removeUncheckedOptionChar: bool=False) -> bool

删除文件或目录。

参数: 用于控制的多个可选标志。

返回

  • bool: 操作是否成功。

示例

alluxio_fs.rm("/path/to/file_or_directory", recursive=True)

8. touch

touch(self, path: str) -> bool

创建空文件。

参数

  • path (str): 文件路径。

返回

  • bool: 文件是否已创建。

示例

alluxio_fs.touch("/path/to/new_file")

9. head

head(self, path: str, num_of_bytes: int=1024) -> bytes

读取文件的前几个字节。

参数

  • path (str): 文件路径。

  • num_of_bytes (int): 字节数。

返回

  • bytes: 读取的字节。

示例

data = alluxio_fs.head("/path/to/file", num_of_bytes=1024)
print(data)

10. tail

tail(self, path: str, num_of_bytes: int=1024) -> bytes

读取文件的最后几个字节。

参数

  • path (str): 文件路径。

  • num_of_bytes (int): 字节数。

返回

  • bytes: 读取的字节。

示例

data = alluxio_fs.tail("/path/to/file", num_of_bytes=1024)
print(data)

11. mv

mv(self, path1: str, path2: str) -> bool

移动或重命名文件或目录。

参数

  • path1 (str): 源路径。

  • path2 (str): 目标路径。

返回

  • bool: 移动/重命名是否成功。

示例

alluxio_fs.mv("/path/to/source", "/path/to/destination")

12. copy / cp_file

copy(self, path1: str, path2: str, recursive: bool=False, recursiveAlias: bool=False, force: bool=False, thread: int=None, bufferSize: int=None, preserve: bool=None) -> bool

复制文件或目录。

参数: 用于递归、强制、线程等的各种选项。

返回

  • bool: 复制是否成功。

示例

alluxio_fs.copy("/path/to/source", "/path/to/destination", recursive=True)

13. read

read(self, path: str) -> bytes

读取整个文件(适用于小于 10MB 的小文件)。

参数

  • path (str): 文件路径。

返回

  • bytes: 读取的内容。

示例

data = alluxio_fs.read("/path/to/file")
print(data)

14. rename / move

rename(self, path1: str, path2: str) -> bool

mv 的别名。重命名或移动文件或目录。

参数

  • path1 (str): 源路径。

  • path2 (str): 目标路径。

返回

  • bool: 操作是否成功。

示例

alluxio_fs.rename("/path/old", "/path/new")

15. cp_file

cp_file(self, path1: str, path2: str, recursive: bool=False) -> bool

copy 的别名。复制文件或目录。

参数

  • path1 (str): 源路径。

  • path2 (str): 目标路径。

  • recursive (bool): 是否为目录递归复制。

返回

  • bool: 复制是否成功。

示例

alluxio_fs.cp_file("/path/source", "/path/destination", recursive=True)

16. upload

upload(self, lpath: str, rpath: str) -> bool

将大文件从本地操作系统文件系统上传到 Alluxio(以及 UFS,取决于 WriteType)。

参数

  • lpath (str): 本地操作系统中文件的源路径。

  • rpath (str): Alluxio/UFS 中文件的目标路径。

返回

  • bool: 上传任务是否成功完成。

示例

# 将大文件上传到 Alluxio
alluxio_fs.upload("/path/in/local", "/path/in/UFS")

17. upload_data

upload_data(self, lpath: str, data: bytes) -> bool

使用字节流将大文件上传到 Alluxio。与从磁盘读取的 upload 不同,upload_data 直接接受字节。

参数

  • lpath (str): Alluxio/UFS 中的目标路径。

  • data (bytes): 要上传的字节内容。

返回

  • bool: 上传任务是否成功完成。

示例

# upload data as byte stream
alluxio_fs.upload_data("/path/in/UFS", b"binary data")

18. download

download(self, lpath: str, rpath: str) -> bool

将大文件从 Alluxio 下载到本地操作系统文件系统。

参数

  • lpath (str): 本地文件系统中的目标路径。

  • rpath (str): Alluxio 中的源路径。

返回

  • bool: 下载任务是否成功完成。

示例

# download a large file from Alluxio
alluxio_fs.download("/local/path", "/alluxio/path")

19. download_data

download_data(self, lpath: str) -> io.BytesIO

从 Alluxio 下载文件并将其作为内存中的字节流返回。

参数

  • lpath (str): Alluxio 中文件的路径。

返回

  • io.BytesIO: 包含文件内容的字节流。

示例

# download as byte stream
byte_stream = alluxio_fs.download_data("/path/in/UFS")
data = byte_stream.read()

20. write

write(self, path: str, value: bytes) -> bool

将字节数据写入 Alluxio 中的文件。等同于 upload_data

参数

  • path (str): 要写入数据的路径。

  • value (bytes): 字节内容。

返回

  • bool: 写入是否成功。

示例

alluxio_fs.write("/path/to/file", b"data to write")

更多 Python 文件系统操作示例可以在这里找到。

与其他框架集成

示例:Ray

Ray 是一个用于构建和运行分布式应用程序的快速简单的框架。在 Ray 上运行的 PyTorch、TensorFlow 和 XGBoost 训练器可以利用 Ray 的高级功能,例如创建由用于数据加载和预处理的 CPU 机器和用于训练的 GPU 机器组成的异构集群。数据加载、预处理和训练可以使用 Ray Data 进行并行化。

像 PyTorch 这样的训练器会在每个 epoch 中一遍又一遍地读取相同的数据集。在每个 epoch 中为 PyTorch 获取大型数据集成为训练瓶颈。通过利用 Alluxio 的高性能分布式缓存,Ray 上的训练器可以减少总训练时间,提高 GPU 利用率,并加速端到端模型生命周期。

先决条件:Ray 版本 >= 2.8.2

# Pass the initialized Alluxio filesystem to Ray and read the dataset
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the single file
ds.count()

# Display the schema derived from the file header record
ds.schema()

# Display the header record
ds.take(1)

# Display the first data record
ds.take(2)

# Read multiple files:
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)

# Get a count of the number of records in the files
ds2.count()

示例:PyArrow

PyArrow 通过提供高性能的内存中列式存储格式,允许应用程序和数据无缝连接。它实现了不同数据处理系统之间的高效数据交换。通过将其存储接口委托给 fsspec,PyArrow 可以通过统一的接口访问各种存储后端。通过使用 alluxiofs,PyArrow 可以利用 Alluxio 的分布式缓存功能来提高数据访问速度并减少延迟。

示例 1:

# Pass the initialized Alluxio filesystem to Pyarrow and read the data set from the example csv file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the file
dataset.count_rows()

# Display the schema derived from the file header record
dataset.schema

# Display the first record
dataset.take(0)

示例 2:

from pyarrow.fs import PyFileSystem, FSSpecHandler

# Create a python-based PyArrow filesystem using FsspecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))

# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
    f.read()

高级初始化参数

连接到 Alluxio 集群的参数

  • etcd_hosts (str, required): ETCD 服务器主机的逗号分隔列表,格式为 "host1:port1,host2:port2,..."。ETCD 用于动态发现 Alluxio worker。

  • etcd_port (int, optional): 每个 ETCD 服务器使用的端口号。默认为 2379

  • options (dict, optional): Alluxio 配置选项的字典,其中键是属性名称,值是属性值。这些选项配置 Alluxio 客户端行为。

示例:配置 Alluxio fsspec。请注意,以下选项在 alluxiofs 和 alluxio 集群之间必须相同

  • alluxio.worker.page.store.page.size (默认 1MB): worker 分页块存储中每个页面的大小。对于大型 parquet 文件,建议设置为 20MB

  • alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker (默认 2000): 这是一致性哈希算法中一个 worker 的虚拟节点数。在一致性哈希算法中,当成员资格发生变化时,会重新分配一些虚拟节点,而不是重建整个哈希表。这保证了哈希表仅发生最小程度的更改。为了实现这一点,虚拟节点的数量应该是集群中物理节点的 X 倍,其中 X 是重新分配粒度和大小之间的平衡。建议设置为 5

alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "4MB",
}

(可选)为分布式加载操作初始化 alluxio_client

alluxio_client = AlluxioClient(etcd_hosts="host1,host2,host3", etcd_port=8888, options=alluxio_options)

为 fsspec 文件系统操作初始化 alluxio_fs

alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, target_protocol="s3", options=alluxio_options)

配置 alluxio_fs 的记录器

import logging
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, logger=setup_logger("./", level=logging.DEBUG))

存储后端的参数

参数:

  • target_protocol (str, optional): 指定用于创建底层存储文件系统对象的底层存储协议。常见示例包括用于 Amazon S3 的 s3、用于 Hadoop 分布式文件系统的 hdfs 等。

  • target_options (dict, optional): 提供与 target_protocol 相关的一组配置选项。这些选项可能包括凭据、端点 URL 以及与底层存储系统成功交互所需的其他协议特定设置。

  • fs (object, optional): 直接提供用于访问 Alluxio 底层存储的文件系统对象实例

  • logger(object, optional): 配置存储日志文件的路径和记录器的级别,路径默认为当前路径,级别默认为 logging.INFO。

示例:连接到 S3

要连接到 S3,您可以按照以下步骤操作:

  1. 查看 S3 fsspec 文档:请参阅 s3fs 文档 以了解连接到 S3 支持的参数。典型参数包括:

    • anon bool (False): 是否使用匿名连接(仅限公共存储桶)。如果为 False,则使用给定的密钥/秘密,或 boto 的凭据解析器;client_kwargs、环境变量、配置文件、EC2 IAM 服务器,按此顺序

    • endpoint_url string (None): 如果指定,则使用此 endpoint_url。连接到非 AWS S3 存储桶时需要。优先于 client_kwargs 中的 endpoint_url

    • key string (None): 如果不是匿名,则使用此访问密钥 ID(如果指定)。优先于 client_kwargs 中的 aws_access_key_id

    • secret string (None): 如果不是匿名,则使用此秘密访问密钥(如果指定)。优先于 client_kwargs 中的 aws_secret_access_key

    • token string (None): 如果不是匿名,则使用此安全令牌(如果指定)

  2. 将支持的参数作为 target_options 传递给 Alluxio:然后,您可以使用这些参数使用 fsspec 创建 Alluxio 文件系统对象。

以下是使用 fsspec 创建连接到 S3 的 Alluxio 文件系统对象的方法:

import fsspec

# Example configuration for connecting to S3
s3_options = {
    "key": "your-aws-access-key",
    "secret": "your-aws-secret-key",
    "endpoint_url": "https://s3.your-endpoint.com"
}

# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="ai-alluxio", 
    etcd_hosts="127.0.0.1", 
    etcd_port=2379,
    target_protocol="s3",
    target_options=s3_options
)

# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3

在此示例中:

  • your-aws-access-keyyour-aws-secret-key 替换为您的实际 AWS 凭据。

  • 如果需要,将 https://s3.your-endpoint.com 替换为您的 S3 兼容服务的相应端点 URL。

通过执行这些步骤,您可以使用 fsspec 有效地连接到具有 S3 后端的 Alluxio。

监控指标

监控系统设置

  1. Prometheus 安装和配置 编辑 prometheus.yml 配置文件,然后启动 Prometheus:

    nohup ./prometheus --web.enable-admin-api --config.file=./prometheus.yml >./prometheus.log 2>&1 &
  2. Grafana 安装和配置 启动 Grafana:

    nohup ./bin/grafana-server --homepath . web >./grafana.log 2>&1 &

有关设置监控系统的更详细说明,请参阅 Alluxio 官方文档

监控指标说明

指标名称

描述

标签

单位

实现代码

alluxio_http_server_call_latency_ms

HTTP 服务调用延迟的直方图(桶边界:[10, 40, 160, 640] ms)

method, success

毫秒 (ms)

HistogramWrapper

alluxio_http_server_result_total

HTTP 服务结果总数

method, state

计数

CounterWrapper

alluxio_http_server_call_latency_ms_sum

HTTP 服务调用总延迟

method, success

毫秒 (ms)

HistogramWrapper

alluxio_http_server_call_latency_ms_count

HTTP 服务调用计数

method, success

计数

HistogramWrapper

Last updated