Alluxio
ProductsLanguageHome
AI-3.6 (stable)
AI-3.6 (stable)
  • 概览
    • Alluxio 命名空间和底层文件系统
    • worker管理与一致性哈希
    • 多租户和统一管理
    • I/O弹性
  • 部署Alluxio
    • 资源需求和兼容性
    • 安装
      • 在Kubernetes上安装Alluxio
      • 镜像管理
      • 高级配置
      • 许可证
    • 监控和指标
    • 管理控制台
      • 部署
      • 导航控制台
      • 用户角色与访问控制
    • 集群管理
    • 系统健康检查和快速恢复
    • 诊断快照
  • 底层存储系统
    • Amazon AWS S3
    • Azure Blob Store
    • HDFS
    • 阿里云 OSS
    • 腾讯 COS
    • 火山引擎 TOS
    • 谷歌云 GCS
    • 百度智能云对象存储 BOS
    • 网络附加存储 NAS
  • 数据访问
    • 通过 FUSE( POSIX API)访问
      • Client 写回
      • 客户端虚拟路径映射
    • 通过S3 API访问
    • 通过 PythonSDK/FSSpec 访问
    • UFS 带宽限制器
    • 高可用性数据访问
      • 多副本
      • 多可用区(AZ)
    • 性能优化
      • 文件读取
      • 写入文件
      • 元数据列表
  • 缓存管理
    • 缓存加载
    • 缓存过滤策略
    • 缓存驱逐
      • 通过TTL (有效时间)策略自动驱逐缓存
      • 优先级规则
      • 通过Free命令手动驱逐
    • 过期缓存清理
    • 缓存配额
  • 性能基准测试
    • Fio (POSIX)基准
    • MLPerf Storage 基准测试
    • COSBench (S3) 性能基准测试
  • 安全
    • TLS 支持
  • 参考
    • 用户命令行接口
    • 指标
    • REST API
    • S3 API 的使用
    • 第三方授权
  • 版本发布说明
Powered by GitBook
On this page
  • 先决条件
  • 安装
  • 安装依赖
  • 环境准备
  • 将数据加载到Alluxio
  • 创建alluxiofs实例
  • alluxiofs基本的文件操作
  • 1. ls
  • 2. info
  • 3. isdir
  • 4. _open
  • 5. cat_file
  • 6. mkdir
  • 7. rm
  • 8. touch
  • 9. head
  • 10. tail
  • 11. mv
  • 12. copy / cp_file
  • 13. read
  • 16. upload
  • 18. download
  • 19. download_data
  • 20. write
  • 与其他框架的集成
  • 举例:Ray
  • PyArrow
  • 高级初始化参数
  • 连接Alluxio集群的参数
  • 底层存储的参数
  • 监控指标
  • 监控系统搭建
  • 监控指标的说明
  1. 数据访问

通过 PythonSDK/FSSpec 访问

Last updated 1 day ago

实验性功能

Alluxio Python SDK (alluxiofs) 是基于的,允许应用程序通过统一的Python文件系统接口无缝地与各种存储后端进行交互。它利用高性能的分布式缓存层,即Alluxio集群,显著提高数据访问的速度,降低延迟,这对于数据密集型应用程序和工作流非常有好处,特别是针对AI训练场景,在模型的训练过程中需要快速且重复地访问大型数据集,加速效果异常明显。

先决条件

  • 正在运行的Alluxio集群

  • Python版本 >= 3.8

安装

安装依赖

安装底层数据湖存储(如s3,oss)

示例:将S3作为底层数据湖存储,

pip install s3fs

安装 alluxiofs

pip install alluxiofs

环境准备

  1. 启动 Alluxio 集群,至少包含 1 个 coordinator 和 1 个 worker

  • 如果您希望在物理裸机上启动集群,可以使用 ./bin/alluxio process start 命令行工具。

  1. 使用 UFS 的凭证配置 Alluxio

# S3 相关配置
s3a.accessKeyId=your-s3a-accessKeyId
s3a.secretKey=your-s3a-secretKey

# OSS 相关配置
fs.oss.accessKeyId=your-oss-keyid
fs.oss.accessKeySecret=your-oss-secert
fs.oss.endpoint=your-oss-endpoint

将数据加载到Alluxio

如果数据已经加载到Alluxio集群中,请跳过此步骤。

向Alluxio集群提交分布式加载作业:

from alluxiofs.client import AlluxioClient

alluxio_client = AlluxioClient(cluster_name="alluxio", etcd_hosts="alluxio-etcd.alluxio-ai")
alluxio_client.submit_load("s3://bucket/path/to/dataset/")

这将异步触发加载作业。您可以等待加载完成或使用以下命令检查此加载的进度:

alluxio_client.load_progress("s3://bucket/path/to/dataset/")

取消分布式加载作业:

alluxio_client.stop_load("s3://bucket/path/to/dataset/")

创建alluxiofs实例

以下是一个简单的文件系统处理程序创建过程,用于连接到在 Kubernetes 中使用 ETCD 进行集群成员管理的 Alluxio 集群。该服务名为 alluxio-etcd.alluxio-ai,集群名称为 alluxio,位于 alluxio-ai 命名空间,并使用 S3 作为底层存储。

import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)

# Create Alluxio filesystem
alluxio_options = {
    "alluxio.worker.page.store.page.size": "4MB",
}
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    cluster_name="alluxio",
    etcd_hosts="alluxio-etcd.alluxio-ai",
    target_protocol="s3",
    options=alluxio_options
    logger=setup_logger("./", level=logging.DEBUG),
)

alluxiofs基本的文件操作

以下是 alluxio_fs 支持的文件和目录常用操作:

1. ls

ls(self, path: str, detail: bool=False) -> list[dict]

列出目录内容

参数

• path (str): 目标目录路径 • detail (bool, 可选): 为True时返回详细条目信息,默认为False

返回值

• list[dict]: 包含JSON格式信息的字典列表

示例

contents = alluxio_fs.ls("/目录路径", False)
print(contents)

2. info

info(self, path: str) -> dict

获取文件/目录信息

参数

• path (str): 目标路径

返回值

• dict: JSON格式的文件/目录信息

示例

file_info = alluxio_fs.info("/文件路径")
print(file_info)

3. isdir

isdir(self, path: str) -> bool

检查路径是否为目录

参数

• path (str): 目标路径

返回值

• bool: 是否为目录

示例

is_directory = alluxio_fs.isdir("/目录路径")
print(is_directory)

4. _open

_open(self, path: str, mode: str="rb", block_size: int=None, autocommit: bool=True, cache_options: dict=None, **kwargs) -> AlluxioFile

打开文件进行读写操作

参数

• path (str): 文件路径 • mode (str, 可选): 打开模式,默认为"rb" • block_size (int, 可选): 读取块大小 • autocommit (bool, 可选): 是否自动提交更改 • cache_options (dict, 可选): 缓存选项 • **kwargs: 其他关键字参数

返回值

• AlluxioFile: 支持read()/write()等操作的文件对象

示例

with alluxio_fs._open("/文件路径", mode="rb") as f:
    data = f.read()
    print(data)

with alluxio_fs._open("/文件路径", mode="wb") as f:
    f.write(data)

5. cat_file

cat_file(self, path: str, start: int=0, end: int=None) -> bytes

读取文件指定字节范围(适用于<10MB的小文件)

参数

• path (str): 文件路径 • start (int, 可选): 起始字节,默认为0 • end (int, 可选): 结束字节,默认为None

返回值

• bytes: 读取的字节数据

示例

data = alluxio_fs.cat_file("/文件路径", start=0, end=100)
print(data)

6. mkdir

mkdir(self, path: str) -> bool

创建新目录

参数

• path (str): 新目录路径

返回值

• bool: 是否创建成功

示例

alluxio_fs.mkdir("/新目录路径")

7. rm

rm(self, path: str, recursive: bool=False, recursiveAlias: bool=False, removeAlluxioOnly: bool=False, deleteMountPoint: bool=False, syncParentNextTime: bool=False, removeUncheckedOptionChar: bool=False) -> bool

删除文件或目录

参数: 多个控制选项标志

返回值

• bool: 是否操作成功

示例

alluxio_fs.rm("/目标路径", recursive=True)

8. touch

touch(self, path: str) -> bool

创建空文件

参数

• path (str): 文件路径

返回值

• bool: 是否创建成功

示例

alluxio_fs.touch("/新文件路径")

9. head

head(self, path: str, num_of_bytes: int=1024) -> bytes

读取文件开头部分

参数

• path (str): 文件路径 • num_of_bytes (int): 读取字节数

返回值

• bytes: 读取的字节数据

示例

data = alluxio_fs.head("/文件路径", num_of_bytes=1024)
print(data)

10. tail

tail(self, path: str, num_of_bytes: int=1024) -> bytes

读取文件末尾部分

参数

• path (str): 文件路径 • num_of_bytes (int): 读取字节数

返回值

• bytes: 读取的字节数据

示例

data = alluxio_fs.tail("/文件路径", num_of_bytes=1024)
print(data)

11. mv

mv(self, path1: str, path2: str) -> bool

移动或重命名文件/目录

参数

• path1 (str): 源路径 • path2 (str): 目标路径

返回值

• bool: 是否操作成功

示例

alluxio_fs.mv("/源路径", "/目标路径")

12. copy / cp_file

copy(self, path1: str, path2: str, recursive: bool=False, recursiveAlias: bool=False, force: bool=False, thread: int=None, bufferSize: int=None, preserve: bool=None) -> bool

复制文件或目录

参数: 递归/强制/线程等多种选项

返回值

• bool: 是否复制成功

示例

alluxio_fs.copy("/源路径", "/目标路径", recursive=True)

13. read

read(self, path: str) -> bytes

读取整个文件(适用于<10MB的小文件)

参数

• path (str): 文件路径

返回值

• bytes: 文件完整内容

示例

data = alluxio_fs.read("/文件路径")
print(data)

14. rename / move

rename(self, path1: str, path2: str) -> bool

rename 是 mv 的别名,用于重命名或移动文件或目录。

  • 参数

    • path1 (str):源路径

    • path2 (str):目标路径

  • 返回值

    • bool:操作是否成功

  • 示例

    python复制编辑alluxio_fs.rename("/path/old", "/path/new")

15. cp_file

cp_file(self, path1: str, path2: str, recursive: bool=Fa

cp_file 是 copy 的别名,用于复制文件或目录。

  • 参数

    • path1 (str):源路径

    • path2 (str):目标路径

    • recursive (bool):是否递归复制目录(默认 False)

  • 返回值

    • bool:复制操作是否成功

  • 示例

    python复制编辑alluxio_fs.cp_file("/path/source", "/path/destination", recursive=True)

16. upload

upload(self, lpath: str, rpath: str) -> bool

从本地文件系统上传大文件到Alluxio(根据WriteType可能同时写入UFS)。该方法采用分块读取和流式写入方式。

参数

• lpath (str): 本地文件系统源路径 • rpath (str): Alluxio/UFS目标路径

返回值

• bool: 是否上传成功

示例

# 上传大文件到Alluxio
alluxio_fs.upload("/本地路径", "/UFS路径")

17. upload_data

upload_data(self, lpath: str, data: bytes) -> bool

使用字节流将大型文件上传到 Alluxio。与从磁盘读取的 upload 方法不同,upload_data 直接接收字节内容。

  • 参数

    • lpath (str):Alluxio/UFS 中的目标路径

    • data (bytes):要上传的字节内容

  • 返回值

    • bool:上传任务是否成功完成

  • 示例

    python复制编辑# 以字节流方式上传数据
    alluxio_fs.upload_data("/path/in/UFS", b"binary data")

18. download

download(self, lpath: str, rpath: str) -> bool

将 Alluxio 中的大文件下载到本地文件系统中。

参数

  • lpath (str):本地文件系统中的目标路径。

  • rpath (str):Alluxio 中的源路径。

返回值

  • bool:下载任务是否成功完成。

示例

# 从 Alluxio 下载大文件
alluxio_fs.download("/local/path", "/alluxio/path")

19. download_data

download_data(self, lpath: str) -> io.BytesIO

将 Alluxio 中的文件以内存字节流的方式下载返回。

参数

  • lpath (str):Alluxio 文件路径。

返回值

  • io.BytesIO:包含文件内容的字节流对象。

示例

# 以字节流方式下载文件
byte_stream = alluxio_fs.download_data("/path/in/UFS")
data = byte_stream.read()

20. write

write(self, path: str, value: bytes) -> bool

与其他框架的集成

举例:Ray

像PyTorch这样的训练器会在每个时期重复读取相同的数据集。 对于PyTorch来说,在每个 epoch 获取大量的数据集已经成为训练的瓶颈。 通过利用Alluxio的高性能分布式缓存,Ray上的训练器可以减少总训练时间,提高GPU利用率,并缩短端到端的模型生命周期。

先决条件: Ray版本 >= 2.8.2

# Pass the initialized Alluxio filesystem to Ray and read the dataset
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the single file
ds.count()

# Display the schema derived from the file header record
ds.schema()

# Display the header record
ds.take(1)

# Display the first data record
ds.take(2)

# Read multiple files:
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)

# Get a count of the number of records in the files
ds2.count()

PyArrow

PyArrow通过提供高性能的内存列存储格式,可以让应用程序和数据无缝连接。 它实现了不同数据处理系统之间的高效数据交换。 通过将其存储接口委托给fsspec,PyArrow可以通过统一的接口访问各种存储后端。 通过使用alluxiofs,PyArrow可以利用Alluxio的分布式缓存能力来提高数据访问速度并降低延迟。

示例1:

# Pass the initialized Alluxio filesystem to Pyarrow and read the data set from the example csv file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)

# Get a count of the number of records in the file
dataset.count_rows()

# Display the schema derived from the file header record
dataset.schema

# Display the first record
dataset.take(0)

示例2:

from pyarrow.fs import PyFileSystem, FSSpecHandler

# Create a python-based PyArrow filesystem using FsspecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))

# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
    f.read()

高级初始化参数

连接Alluxio集群的参数

  • etcd_hosts (str, 必需): ETCD服务器主机地址列表,格式为 "host1:port1,host2:port2,..."。 ETCD用于动态发现Alluxio工作节点。

  • etcd_port (int, 可选): 每个ETCD服务器使用的端口号。默认为 2379。

  • options (dict, 可选): 一个包含Alluxio配置选项的字典,其中键是属性名称,值是属性值。 这些选项用于配置Alluxio客户端的行为。

示例:配置 Alluxio fsspec。请注意,以下选项在 alluxiofs 和 Alluxio 集群之间必须保持一致。

  • alluxio.worker.page.store.page.size(默认值 1MB):Worker 分页块存储中每个页面的大小。对于大型 Parquet 文件,建议设置为 20MB。

  • alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker(默认值 2000):在一致性哈希算法中,每个 Worker 的虚拟节点数。在成员变更时,一致性哈希算法会重新分配部分虚拟节点,而不是重建整个哈希表,从而尽可能减少哈希表的变更。为了实现这一点,虚拟节点数应为集群中物理节点数的 X 倍,其中 X 是虚拟节点分配粒度和规模之间的平衡参数。建议将此值设置为 5。

alluxio_options = {
    "alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
    "alluxio.worker.page.store.page.size": "4MB",
}

(可选)初始化 alluxio_client 以进行分布式加载作业:

alluxio_client = AlluxioClient(etcd_hosts="host1,host2,host3", etcd_port=8888, )

初始化 alluxio_fs 以进行 fsspec 文件系统操作:

alluxio_fs = fsspec.filesystem("alluxiofs", etcd_hosts="localhost", target_protocol="s3", options=alluxio_options)

配置 alluxio_fs 的logger

import logging
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger

alluxio_fs = fsspec.filesystem("alluxiofs", etcd_hosts="localhost", logger=setup_logger("./", level=logging.DEBUG))

底层存储的参数

参数:

  • target_protocol (str, 可选): 指定要创建底层存储文件系统对象的底层存储协议。 常见的示例包括s3用于Amazon S3,hdfs 用于Hadoop分布式文件系统等。

  • target_options (dict, 可选): 提供一组与 target_protocol 相关的配置选项。 这些选项可能包括凭证、终端节点URL和其他与底层存储系统成功交互所需的特定协议的设置。

  • fs (object, 可选): 直接提供用于访问Alluxio底层存储的文件系统对象实例。

  • logger(object,可选): 配置日志文件的存储路径和日志级别,路径默认为当前路径,日志级别默认为 logging.INFO。

示例: 连接到 S3

要连接到S3,可以按照以下步骤进行:

  • anon bool (False): 是否使用匿名连接(仅限公共存储桶)。 如果为False,则使用给定的key/secret,或boto的凭证解析器;client_kwargs、环境变量、配置文件、EC2 IAM服务器,依次进行。

  • endpoint_url string (None): 使用此endpoint_url(如指定)。连接到非AWS S3存储桶时会需要用到。优先于client_kwargs中的 endpoint_url。

  • key string (None): 非匿名的情况下,请使用此访问密钥ID(如指定)。优先于client_kwargs中的 aws_access_key_id。

  • secret string (None): 非匿名的情况下,请使用此密钥访问密钥(如指定)。优先于client_kwargs中的 aws_secret_access_key。

  • token string (None): 非匿名的情况下,请使用此安全 token(如指定)。

  1. 将支持的参数作为target_options传递给Alluxio:然后,您可以使用这些参数使用fsspec创建Alluxio文件系统对象。

以下是如何使用fsspec创建连接到S3的Alluxio文件系统对象的示例:

import fsspec

# Example configuration for connecting to S3
s3_options = {
    "key": "your-aws-access-key",
    "secret": "your-aws-secret-key",
    "endpoint_url": "https://s3.your-endpoint.com"
}

# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
    "alluxiofs",
    etcd_hosts="localhost",
    target_protocol="s3",
    target_options=s3_options
)

# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3

在此示例中:

  • 将your-aws-access-key 和 your-aws-secret-key 替换为您实际的AWS凭证。

  • 如果需要,将 https://s3.your-endpoint.com 替换为合适的且与您的S3兼容的服务端点URL。

按照这些步骤,您可以使用fsspec有效地连接到使用S3作为底层存储的Alluxio集群。

监控指标

监控系统搭建

  1. Prometheus 安装与配置 修改 prometheus.yml 配置文件,然后启动 Prometheus:

    nohup ./prometheus --web.enable-admin-api --config.file=./prometheus.yml >./prometheus.log 2>&1 &
  2. Grafana 安装与配置 启动 Grafana:

    nohup ./bin/grafana-server --homepath . web >./grafana.log 2>&1 &

监控指标的说明

指标名称

描述

标签

单位

实现代码

alluxio_http_server_call_latency_ms

HTTP 服务调用延迟的直方图(Bucket 边界: [10, 40, 160, 640] 毫秒)

method, success

毫秒 (ms)

HistogramWrapper

alluxio_http_server_result_total

HTTP 服务结果的总计数

method, state

次数 (count)

CounterWrapper

alluxio_http_server_call_latency_ms_sum

HTTP 服务调用延迟总和

method, success

毫秒 (ms)

HistogramWrapper

aluxio_http_server_call_latency_ms_count

HTTP 服务调用次数

method, success

次数 (count)

HistogramWrapper

如果您希望在 Kubernetes 中启动集群,请参考。

使用 FSSpec 与 Alluxio 交互 关于具体的细节,参考和

下面是一个简单的示例,该例子创建了一个客户端,用于连接到在 Kubernetes 中使用 ETCD 进行集群成员管理的 Alluxio 集群。集群的服务名称为 alluxio-etcd.alluxio-ai,集群名称为 alluxio,位于 alluxio-ai 命名空间,并使用 S3 作为底层存储。有关更多配置设置,请参见。

有关Alluxio集群和/或存储系统连接的高级参数设置,请参见。

更多关于fsspec的Python文件系统操作示例,可以参见。

是一个快速且简单的框架,用于构建和运行分布式应用程序。 在Ray之上运行的PyTorch,TensorFlow和XGBoost训练器可以利用Ray的高级功能,如创建异构集群,用CPU机器来进行数据加载和预处理,用GPU机器来进行训练。 可以使用Ray Data并行化进行数据加载、预处理和训练。

查看S3 fsspec文档:参考 以了解连接到S3所支持的参数。典型的参数包括:

详细的指标监控系统搭建可以参考 。

FSSpec
安装第三方S3存储
Alluxio 官方文档
创建alluxiofs实例
alluxiofs基本的文件操作
连接Alluxio集群的参数
高级初始化参数
fsspec官方文档
Ray
s3fs文档
Alluxio 官方文档