通过 PythonSDK/FSSpec 访问
Last updated
Last updated
实验性功能
Alluxio Python SDK (alluxiofs
) 是基于的,允许应用程序通过统一的Python文件系统接口无缝地与各种存储后端进行交互。它利用高性能的分布式缓存层,即Alluxio集群,显著提高数据访问的速度,降低延迟,这对于数据密集型应用程序和工作流非常有好处,特别是针对AI训练场景,在模型的训练过程中需要快速且重复地访问大型数据集,加速效果异常明显。
正在运行的Alluxio集群
Python版本 >= 3.8
示例:将S3作为底层数据湖存储,
启动 Alluxio 集群,至少包含 1 个 coordinator 和 1 个 worker
如果您希望在物理裸机上启动集群,可以使用 ./bin/alluxio process start
命令行工具。
使用 UFS 的凭证配置 Alluxio
如果数据已经加载到Alluxio集群中,请跳过此步骤。
向Alluxio集群提交分布式加载作业:
这将异步触发加载作业。您可以等待加载完成或使用以下命令检查此加载的进度:
取消分布式加载作业:
以下是一个简单的文件系统处理程序创建过程,用于连接到在 Kubernetes 中使用 ETCD 进行集群成员管理的 Alluxio 集群。该服务名为 alluxio-etcd.alluxio-ai
,集群名称为 alluxio
,位于 alluxio-ai
命名空间,并使用 S3 作为底层存储。
以下是 alluxio_fs
支持的文件和目录常用操作:
ls
列出目录内容
参数
• path (str)
: 目标目录路径
• detail (bool, 可选)
: 为True时返回详细条目信息,默认为False
返回值
• list[dict]
: 包含JSON格式信息的字典列表
示例
info
获取文件/目录信息
参数
• path (str)
: 目标路径
返回值
• dict
: JSON格式的文件/目录信息
示例
isdir
检查路径是否为目录
参数
• path (str)
: 目标路径
返回值
• bool
: 是否为目录
示例
_open
打开文件进行读写操作
参数
• path (str)
: 文件路径
• mode (str, 可选)
: 打开模式,默认为"rb"
• block_size (int, 可选)
: 读取块大小
• autocommit (bool, 可选)
: 是否自动提交更改
• cache_options (dict, 可选)
: 缓存选项
• **kwargs
: 其他关键字参数
返回值
• AlluxioFile
: 支持read()
/write()
等操作的文件对象
示例
cat_file
读取文件指定字节范围(适用于<10MB的小文件)
参数
• path (str)
: 文件路径
• start (int, 可选)
: 起始字节,默认为0
• end (int, 可选)
: 结束字节,默认为None
返回值
• bytes
: 读取的字节数据
示例
mkdir
创建新目录
参数
• path (str)
: 新目录路径
返回值
• bool
: 是否创建成功
示例
rm
删除文件或目录
参数: 多个控制选项标志
返回值
• bool
: 是否操作成功
示例
touch
创建空文件
参数
• path (str)
: 文件路径
返回值
• bool
: 是否创建成功
示例
head
读取文件开头部分
参数
• path (str)
: 文件路径
• num_of_bytes (int)
: 读取字节数
返回值
• bytes
: 读取的字节数据
示例
tail
读取文件末尾部分
参数
• path (str)
: 文件路径
• num_of_bytes (int)
: 读取字节数
返回值
• bytes
: 读取的字节数据
示例
mv
移动或重命名文件/目录
参数
• path1 (str)
: 源路径
• path2 (str)
: 目标路径
返回值
• bool
: 是否操作成功
示例
copy
/ cp_file
复制文件或目录
参数: 递归/强制/线程等多种选项
返回值
• bool
: 是否复制成功
示例
read
读取整个文件(适用于<10MB的小文件)
参数
• path (str)
: 文件路径
返回值
• bytes
: 文件完整内容
示例
14. rename
/ move
rename
是 mv
的别名,用于重命名或移动文件或目录。
参数
path1 (str)
:源路径
path2 (str)
:目标路径
返回值
bool
:操作是否成功
示例
15. cp_file
cp_file
是 copy
的别名,用于复制文件或目录。
参数
path1 (str)
:源路径
path2 (str)
:目标路径
recursive (bool)
:是否递归复制目录(默认 False
)
返回值
bool
:复制操作是否成功
示例
upload
从本地文件系统上传大文件到Alluxio(根据WriteType可能同时写入UFS)。该方法采用分块读取和流式写入方式。
参数
• lpath (str)
: 本地文件系统源路径
• rpath (str)
: Alluxio/UFS目标路径
返回值
• bool
: 是否上传成功
示例
17. upload_data
使用字节流将大型文件上传到 Alluxio。与从磁盘读取的 upload
方法不同,upload_data
直接接收字节内容。
参数
lpath (str)
:Alluxio/UFS 中的目标路径
data (bytes)
:要上传的字节内容
返回值
bool
:上传任务是否成功完成
示例
download
将 Alluxio 中的大文件下载到本地文件系统中。
参数
lpath (str)
:本地文件系统中的目标路径。
rpath (str)
:Alluxio 中的源路径。
返回值
bool
:下载任务是否成功完成。
示例
download_data
将 Alluxio 中的文件以内存字节流的方式下载返回。
参数
lpath (str)
:Alluxio 文件路径。
返回值
io.BytesIO
:包含文件内容的字节流对象。
示例
write
像PyTorch这样的训练器会在每个时期重复读取相同的数据集。 对于PyTorch来说,在每个 epoch 获取大量的数据集已经成为训练的瓶颈。 通过利用Alluxio的高性能分布式缓存,Ray上的训练器可以减少总训练时间,提高GPU利用率,并缩短端到端的模型生命周期。
先决条件: Ray版本 >= 2.8.2
PyArrow通过提供高性能的内存列存储格式,可以让应用程序和数据无缝连接。 它实现了不同数据处理系统之间的高效数据交换。 通过将其存储接口委托给fsspec,PyArrow可以通过统一的接口访问各种存储后端。 通过使用alluxiofs,PyArrow可以利用Alluxio的分布式缓存能力来提高数据访问速度并降低延迟。
示例1:
示例2:
etcd_hosts (str, 必需): ETCD服务器主机地址列表,格式为 "host1:port1,host2:port2,..."。 ETCD用于动态发现Alluxio工作节点。
etcd_port (int, 可选): 每个ETCD服务器使用的端口号。默认为 2379。
options (dict, 可选): 一个包含Alluxio配置选项的字典,其中键是属性名称,值是属性值。 这些选项用于配置Alluxio客户端的行为。
示例:配置 Alluxio fsspec。请注意,以下选项在 alluxiofs 和 Alluxio 集群之间必须保持一致。
alluxio.worker.page.store.page.size
(默认值 1MB):Worker 分页块存储中每个页面的大小。对于大型 Parquet 文件,建议设置为 20MB。
alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker
(默认值 2000
):在一致性哈希算法中,每个 Worker 的虚拟节点数。在成员变更时,一致性哈希算法会重新分配部分虚拟节点,而不是重建整个哈希表,从而尽可能减少哈希表的变更。为了实现这一点,虚拟节点数应为集群中物理节点数的 X 倍,其中 X 是虚拟节点分配粒度和规模之间的平衡参数。建议将此值设置为 5。
(可选)初始化 alluxio_client
以进行分布式加载作业:
初始化 alluxio_fs
以进行 fsspec 文件系统操作:
配置 alluxio_fs
的logger
参数:
target_protocol (str, 可选): 指定要创建底层存储文件系统对象的底层存储协议。 常见的示例包括s3
用于Amazon S3,hdfs
用于Hadoop分布式文件系统等。
target_options (dict, 可选): 提供一组与 target_protocol
相关的配置选项。 这些选项可能包括凭证、终端节点URL和其他与底层存储系统成功交互所需的特定协议的设置。
fs (object, 可选): 直接提供用于访问Alluxio底层存储的文件系统对象实例。
logger(object,可选): 配置日志文件的存储路径和日志级别,路径默认为当前路径,日志级别默认为 logging.INFO
。
要连接到S3,可以按照以下步骤进行:
anon bool (False): 是否使用匿名连接(仅限公共存储桶)。 如果为False,则使用给定的key/secret,或boto的凭证解析器;client_kwargs、环境变量、配置文件、EC2 IAM服务器,依次进行。
endpoint_url string (None): 使用此endpoint_url(如指定)。连接到非AWS S3存储桶时会需要用到。优先于client_kwargs中的 endpoint_url
。
key string (None): 非匿名的情况下,请使用此访问密钥ID(如指定)。优先于client_kwargs中的 aws_access_key_id
。
secret string (None): 非匿名的情况下,请使用此密钥访问密钥(如指定)。优先于client_kwargs中的 aws_secret_access_key
。
token string (None): 非匿名的情况下,请使用此安全 token(如指定)。
将支持的参数作为target_options传递给Alluxio:然后,您可以使用这些参数使用fsspec创建Alluxio文件系统对象。
以下是如何使用fsspec创建连接到S3的Alluxio文件系统对象的示例:
在此示例中:
将your-aws-access-key
和 your-aws-secret-key
替换为您实际的AWS凭证。
如果需要,将 https://s3.your-endpoint.com
替换为合适的且与您的S3兼容的服务端点URL。
按照这些步骤,您可以使用fsspec有效地连接到使用S3作为底层存储的Alluxio集群。
Prometheus 安装与配置
修改 prometheus.yml
配置文件,然后启动 Prometheus:
Grafana 安装与配置 启动 Grafana:
指标名称
描述
标签
单位
实现代码
alluxio_http_server_call_latency_ms
HTTP 服务调用延迟的直方图(Bucket 边界: [10, 40, 160, 640] 毫秒)
method
, success
毫秒 (ms)
HistogramWrapper
alluxio_http_server_result_total
HTTP 服务结果的总计数
method
, state
次数 (count)
CounterWrapper
alluxio_http_server_call_latency_ms_sum
HTTP 服务调用延迟总和
method
, success
毫秒 (ms)
HistogramWrapper
aluxio_http_server_call_latency_ms_count
HTTP 服务调用次数
method
, success
次数 (count)
HistogramWrapper
如果您希望在 Kubernetes 中启动集群,请参考。
使用 FSSpec 与 Alluxio 交互 关于具体的细节,参考和
下面是一个简单的示例,该例子创建了一个客户端,用于连接到在 Kubernetes 中使用 ETCD 进行集群成员管理的 Alluxio 集群。集群的服务名称为 alluxio-etcd.alluxio-ai
,集群名称为 alluxio
,位于 alluxio-ai
命名空间,并使用 S3 作为底层存储。有关更多配置设置,请参见。
有关Alluxio集群和/或存储系统连接的高级参数设置,请参见。
更多关于fsspec的Python文件系统操作示例,可以参见。
是一个快速且简单的框架,用于构建和运行分布式应用程序。 在Ray之上运行的PyTorch,TensorFlow和XGBoost训练器可以利用Ray的高级功能,如创建异构集群,用CPU机器来进行数据加载和预处理,用GPU机器来进行训练。 可以使用Ray Data并行化进行数据加载、预处理和训练。
查看S3 fsspec文档:参考 以了解连接到S3所支持的参数。典型的参数包括:
详细的指标监控系统搭建可以参考 。