通过 FSSpec 的 Python API
此功能为实验性功能。
Alluxio Python SDK (alluxiofs
) 基于 FSSpec,它允许应用程序使用统一的 Python 文件系统接口与各种存储后端无缝交互。它利用高性能的分布式缓存层,即 Alluxio 集群,显著提高数据访问速度并减少延迟。这对于数据密集型应用程序和工作流,尤其是需要快速、重复访问大型数据集的 AI 训练工作负载,尤其有益。
先决条件
一个正在运行的 Alluxio 集群
Python 版本 >= 3.8
安装
安装依赖
安装底层数据湖存储(例如 s3、oss)
示例:将 S3 部署为底层数据湖存储 安装第三方 S3 存储
pip install s3fs
安装 alluxiofs
pip install alluxiofs
环境设置
启动至少有 1 个协调器和 1 个 worker 的 Alluxio 集群
如果要在 K8s 中启动集群,请参阅 Alluxio 官方文档。
如果要在裸机上启动集群,可以使用
./bin/alluxio process start
cli。
使用 UFS 的凭据配置 Alluxio
#s3 相关
s3a.accessKeyId=your-s3a-accessKeyId
s3a.secretKey=your-s3a-secretKey
#oss 相关
fs.oss.accessKeyId=your-oss-keyid
fs.oss.accessKeySecret=your-oss-secert
fs.oss.endpoint=your-oss-endpoint
使用 alluxiofs 与 alluxio 交互
有关详细信息,请参阅创建 alluxiofs 实例和alluxiofs 基本文件操作。
将数据加载到 Alluxio 集群
如果数据已加载到 Alluxio 集群中,请跳过此步骤。
这个简单的示例创建了一个客户端,该客户端连接到 Kubernetes 中具有 ETCD 成员资格的 Alluxio 集群,集群名称为 alluxio
,命名空间为 ai
,并使用 S3 作为底层存储。 有关更多配置设置,请参阅连接到 Alluxio 服务器的高级参数。
向 Alluxio 集群提交分布式加载作业:
from alluxiofs.client import AlluxioClient
alluxio_client = AlluxioClient(cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379)
alluxio_client.submit_load("s3://bucket/path/to/dataset/")
这将异步触发加载作业。您可以使用以下命令等待加载完成或检查此加载过程的进度:
alluxio_client.load_progress("s3://bucket/path/to/dataset/")
要取消分布式加载作业:
alluxio_client.stop_load("s3://bucket/path/to/dataset/")
创建 alluxiofs 实例
这个简单的示例创建了一个文件系统处理程序,该处理程序连接到 Kubernetes 中具有 ETCD 成员资格的 Alluxio 集群,集群名称为 alluxio
,命名空间为 ai
,并使用 S3 作为底层存储。
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger
# Register Alluxio to fsspec
fsspec.register_implementation("alluxiofs", AlluxioFileSystem, clobber=True)
# Create Alluxio filesystem
alluxio_options = {
"alluxio.worker.page.store.page.size": "4MB",
}
alluxio_fs = fsspec.filesystem(
"alluxiofs",
cluster_name="ai-alluxio",
etcd_hosts="127.0.0.1", # the ip address of etcd
etcd_port=2379, # the port of etcd
target_protocol="s3",
options=alluxio_options
logger=setup_logger("./", level=logging.DEBUG),
)
有关为 Alluxio 集群和/或存储系统连接设置高级参数,请参阅高级初始化参数。
alluxiofs 的基本文件操作
以下是 alluxio_fs
支持的用于与文件和目录交互的常见操作:
1. ls
ls
ls(self, path: str, detail: bool=False) -> list[dict]
列出目录的内容。
参数
path (str)
: 要列出的目录的路径。detail (bool, optional)
: 如果为 True,则返回有关每个条目的详细信息。默认为 False。
返回
list[dict]
: 具有类 JSON 结构的字典列表。
示例
contents = alluxio_fs.ls("/path/to/directory", False)
print(contents)
2. info
info
info(self, path: str) -> dict
检索有关文件或目录的信息。
参数
path (str)
: 文件或目录的路径。
返回
dict
: 具有文件或目录信息的类 JSON 结构。
示例
file_info = alluxio_fs.info("/path/to/file")
print(file_info)
3. isdir
isdir
isdir(self, path: str) -> bool
检查路径是否为目录。
参数
path (str)
: 要检查的路径。
返回
bool
: 路径是否为目录。
示例
is_directory = alluxio_fs.isdir("/path/to/directory")
print(is_directory)
4. _open
_open
_open(self, path: str, mode: str="rb", block_size: int=None, autocommit: bool=True, cache_options: dict=None, **kwargs) -> AlluxioFile
打开文件以进行读取或写入。
参数
path (str)
: 要打开的文件的路径。mode (str, optional)
: 打开文件的模式。默认为 "rb"。block_size (int, optional)
: 读取的块大小。autocommit (bool, optional)
: 如果为 True,则自动提交更改。cache_options (dict, optional)
: 缓存选项。**kwargs
: 其他关键字参数。
返回
AlluxioFile
: 支持read()
、write()
等的对象。
示例
with alluxio_fs._open("/path/to/file", mode="rb") as f:
data = f.read()
print(data)
with alluxio_fs._open("/path/to/file", mode="wb") as f:
f.write(data)
5. cat_file
cat_file
cat_file(self, path: str, start: int=0, end: int=None) -> bytes
从文件中读取一定范围的字节(适用于小于 10MB 的小文件)。
参数
path (str)
: 要读取的文件的路径。start (int, optional)
: 起始字节。默认为 0。end (int, optional)
: 结束字节。默认为 None。
返回
bytes
: 读取的字节。
示例
data = alluxio_fs.cat_file("/path/to/file", start=0, end=100)
print(data)
6. mkdir
mkdir
mkdir(self, path: str) -> bool
创建目录。
参数
path (str)
: 新目录的路径。
返回
bool
: 目录是否成功创建。
示例
alluxio_fs.mkdir("/path/to/new_directory")
7. rm
rm
rm(self, path: str, recursive: bool=False, recursiveAlias: bool=False, removeAlluxioOnly: bool=False, deleteMountPoint: bool=False, syncParentNextTime: bool=False, removeUncheckedOptionChar: bool=False) -> bool
删除文件或目录。
参数: 用于控制的多个可选标志。
返回
bool
: 操作是否成功。
示例
alluxio_fs.rm("/path/to/file_or_directory", recursive=True)
8. touch
touch
touch(self, path: str) -> bool
创建空文件。
参数
path (str)
: 文件路径。
返回
bool
: 文件是否已创建。
示例
alluxio_fs.touch("/path/to/new_file")
9. head
head
head(self, path: str, num_of_bytes: int=1024) -> bytes
读取文件的前几个字节。
参数
path (str)
: 文件路径。num_of_bytes (int)
: 字节数。
返回
bytes
: 读取的字节。
示例
data = alluxio_fs.head("/path/to/file", num_of_bytes=1024)
print(data)
10. tail
tail
tail(self, path: str, num_of_bytes: int=1024) -> bytes
读取文件的最后几个字节。
参数
path (str)
: 文件路径。num_of_bytes (int)
: 字节数。
返回
bytes
: 读取的字节。
示例
data = alluxio_fs.tail("/path/to/file", num_of_bytes=1024)
print(data)
11. mv
mv
mv(self, path1: str, path2: str) -> bool
移动或重命名文件或目录。
参数
path1 (str)
: 源路径。path2 (str)
: 目标路径。
返回
bool
: 移动/重命名是否成功。
示例
alluxio_fs.mv("/path/to/source", "/path/to/destination")
12. copy
/ cp_file
copy
/ cp_file
copy(self, path1: str, path2: str, recursive: bool=False, recursiveAlias: bool=False, force: bool=False, thread: int=None, bufferSize: int=None, preserve: bool=None) -> bool
复制文件或目录。
参数: 用于递归、强制、线程等的各种选项。
返回
bool
: 复制是否成功。
示例
alluxio_fs.copy("/path/to/source", "/path/to/destination", recursive=True)
13. read
read
read(self, path: str) -> bytes
读取整个文件(适用于小于 10MB 的小文件)。
参数
path (str)
: 文件路径。
返回
bytes
: 读取的内容。
示例
data = alluxio_fs.read("/path/to/file")
print(data)
14. rename
/ move
rename
/ move
rename(self, path1: str, path2: str) -> bool
mv
的别名。重命名或移动文件或目录。
参数
path1 (str)
: 源路径。path2 (str)
: 目标路径。
返回
bool
: 操作是否成功。
示例
alluxio_fs.rename("/path/old", "/path/new")
15. cp_file
cp_file
cp_file(self, path1: str, path2: str, recursive: bool=False) -> bool
copy
的别名。复制文件或目录。
参数
path1 (str)
: 源路径。path2 (str)
: 目标路径。recursive (bool)
: 是否为目录递归复制。
返回
bool
: 复制是否成功。
示例
alluxio_fs.cp_file("/path/source", "/path/destination", recursive=True)
16. upload
upload
upload(self, lpath: str, rpath: str) -> bool
将大文件从本地操作系统文件系统上传到 Alluxio(以及 UFS,取决于 WriteType)。
参数
lpath (str)
: 本地操作系统中文件的源路径。rpath (str)
: Alluxio/UFS 中文件的目标路径。
返回
bool
: 上传任务是否成功完成。
示例
# 将大文件上传到 Alluxio
alluxio_fs.upload("/path/in/local", "/path/in/UFS")
17. upload_data
upload_data
upload_data(self, lpath: str, data: bytes) -> bool
使用字节流将大文件上传到 Alluxio。与从磁盘读取的 upload
不同,upload_data
直接接受字节。
参数
lpath (str)
: Alluxio/UFS 中的目标路径。data (bytes)
: 要上传的字节内容。
返回
bool
: 上传任务是否成功完成。
示例
# upload data as byte stream
alluxio_fs.upload_data("/path/in/UFS", b"binary data")
18. download
download
download(self, lpath: str, rpath: str) -> bool
将大文件从 Alluxio 下载到本地操作系统文件系统。
参数
lpath (str)
: 本地文件系统中的目标路径。rpath (str)
: Alluxio 中的源路径。
返回
bool
: 下载任务是否成功完成。
示例
# download a large file from Alluxio
alluxio_fs.download("/local/path", "/alluxio/path")
19. download_data
download_data
download_data(self, lpath: str) -> io.BytesIO
从 Alluxio 下载文件并将其作为内存中的字节流返回。
参数
lpath (str)
: Alluxio 中文件的路径。
返回
io.BytesIO
: 包含文件内容的字节流。
示例
# download as byte stream
byte_stream = alluxio_fs.download_data("/path/in/UFS")
data = byte_stream.read()
20. write
write
write(self, path: str, value: bytes) -> bool
将字节数据写入 Alluxio 中的文件。等同于 upload_data
。
参数
path (str)
: 要写入数据的路径。value (bytes)
: 字节内容。
返回
bool
: 写入是否成功。
示例
alluxio_fs.write("/path/to/file", b"data to write")
更多 Python 文件系统操作示例可以在这里找到。
与其他框架集成
示例:Ray
Ray 是一个用于构建和运行分布式应用程序的快速简单的框架。在 Ray 上运行的 PyTorch、TensorFlow 和 XGBoost 训练器可以利用 Ray 的高级功能,例如创建由用于数据加载和预处理的 CPU 机器和用于训练的 GPU 机器组成的异构集群。数据加载、预处理和训练可以使用 Ray Data 进行并行化。
像 PyTorch 这样的训练器会在每个 epoch 中一遍又一遍地读取相同的数据集。在每个 epoch 中为 PyTorch 获取大型数据集成为训练瓶颈。通过利用 Alluxio 的高性能分布式缓存,Ray 上的训练器可以减少总训练时间,提高 GPU 利用率,并加速端到端模型生命周期。
先决条件:Ray 版本 >= 2.8.2
# Pass the initialized Alluxio filesystem to Ray and read the dataset
ds = ray.data.read_parquet("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)
# Get a count of the number of records in the single file
ds.count()
# Display the schema derived from the file header record
ds.schema()
# Display the header record
ds.take(1)
# Display the first data record
ds.take(2)
# Read multiple files:
ds2 = ray.data.read_parquet("s3://bucket/path/to/dataset/", filesystem=alluxio_fs)
# Get a count of the number of records in the files
ds2.count()
示例:PyArrow
PyArrow 通过提供高性能的内存中列式存储格式,允许应用程序和数据无缝连接。它实现了不同数据处理系统之间的高效数据交换。通过将其存储接口委托给 fsspec,PyArrow 可以通过统一的接口访问各种存储后端。通过使用 alluxiofs,PyArrow 可以利用 Alluxio 的分布式缓存功能来提高数据访问速度并减少延迟。
示例 1:
# Pass the initialized Alluxio filesystem to Pyarrow and read the data set from the example csv file
import pyarrow.dataset as ds
dataset = ds.dataset("s3://bucket/path/to/dataset/file1.parquet", filesystem=alluxio_fs)
# Get a count of the number of records in the file
dataset.count_rows()
# Display the schema derived from the file header record
dataset.schema
# Display the first record
dataset.take(0)
示例 2:
from pyarrow.fs import PyFileSystem, FSSpecHandler
# Create a python-based PyArrow filesystem using FsspecHandler
py_fs = PyFileSystem(FSSpecHandler(alluxio_fs))
# Read the data by using the Pyarrow filesystem interface
with py_fs.open_input_file("s3://bucket/path/to/dataset/file1.parquet") as f:
f.read()
高级初始化参数
连接到 Alluxio 集群的参数
etcd_hosts (str, required): ETCD 服务器主机的逗号分隔列表,格式为 "host1:port1,host2:port2,..."。ETCD 用于动态发现 Alluxio worker。
etcd_port (int, optional): 每个 ETCD 服务器使用的端口号。默认为
2379
。options (dict, optional): Alluxio 配置选项的字典,其中键是属性名称,值是属性值。这些选项配置 Alluxio 客户端行为。
示例:配置 Alluxio fsspec。请注意,以下选项在 alluxiofs 和 alluxio 集群之间必须相同
alluxio.worker.page.store.page.size
(默认1MB
): worker 分页块存储中每个页面的大小。对于大型 parquet 文件,建议设置为20MB
。alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker
(默认2000
): 这是一致性哈希算法中一个 worker 的虚拟节点数。在一致性哈希算法中,当成员资格发生变化时,会重新分配一些虚拟节点,而不是重建整个哈希表。这保证了哈希表仅发生最小程度的更改。为了实现这一点,虚拟节点的数量应该是集群中物理节点的 X 倍,其中 X 是重新分配粒度和大小之间的平衡。建议设置为5
。
alluxio_options = {
"alluxio.user.worker.selection.policy.consistent.hash.virtual.node.count.per.worker": "2000",
"alluxio.worker.page.store.page.size": "4MB",
}
(可选)为分布式加载操作初始化 alluxio_client
:
alluxio_client = AlluxioClient(etcd_hosts="host1,host2,host3", etcd_port=8888, options=alluxio_options)
为 fsspec 文件系统操作初始化 alluxio_fs
:
alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, target_protocol="s3", options=alluxio_options)
配置 alluxio_fs
的记录器
import logging
import fsspec
from alluxiofs import AlluxioFileSystem, setup_logger
alluxio_fs = fsspec.filesystem("alluxiofs", cluster_name="ai-alluxio", etcd_hosts="127.0.0.1", etcd_port=2379, logger=setup_logger("./", level=logging.DEBUG))
存储后端的参数
参数:
target_protocol (str, optional): 指定用于创建底层存储文件系统对象的底层存储协议。常见示例包括用于 Amazon S3 的
s3
、用于 Hadoop 分布式文件系统的hdfs
等。target_options (dict, optional): 提供与
target_protocol
相关的一组配置选项。这些选项可能包括凭据、端点 URL 以及与底层存储系统成功交互所需的其他协议特定设置。fs (object, optional): 直接提供用于访问 Alluxio 底层存储的文件系统对象实例
logger(object, optional): 配置存储日志文件的路径和记录器的级别,路径默认为当前路径,级别默认为 logging.INFO。
示例:连接到 S3
要连接到 S3,您可以按照以下步骤操作:
查看 S3 fsspec 文档:请参阅 s3fs 文档 以了解连接到 S3 支持的参数。典型参数包括:
anon bool (False): 是否使用匿名连接(仅限公共存储桶)。如果为 False,则使用给定的密钥/秘密,或 boto 的凭据解析器;client_kwargs、环境变量、配置文件、EC2 IAM 服务器,按此顺序
endpoint_url string (None): 如果指定,则使用此 endpoint_url。连接到非 AWS S3 存储桶时需要。优先于 client_kwargs 中的
endpoint_url
。key string (None): 如果不是匿名,则使用此访问密钥 ID(如果指定)。优先于 client_kwargs 中的
aws_access_key_id
。secret string (None): 如果不是匿名,则使用此秘密访问密钥(如果指定)。优先于 client_kwargs 中的
aws_secret_access_key
。token string (None): 如果不是匿名,则使用此安全令牌(如果指定)
将支持的参数作为 target_options 传递给 Alluxio:然后,您可以使用这些参数使用 fsspec 创建 Alluxio 文件系统对象。
以下是使用 fsspec 创建连接到 S3 的 Alluxio 文件系统对象的方法:
import fsspec
# Example configuration for connecting to S3
s3_options = {
"key": "your-aws-access-key",
"secret": "your-aws-secret-key",
"endpoint_url": "https://s3.your-endpoint.com"
}
# Creating the Alluxio file system object
alluxio_fs = fsspec.filesystem(
"alluxiofs",
cluster_name="ai-alluxio",
etcd_hosts="127.0.0.1",
etcd_port=2379,
target_protocol="s3",
target_options=s3_options
)
# Now you can use alluxio_fs to interact with the Alluxio file system backed by S3
在此示例中:
将
your-aws-access-key
和your-aws-secret-key
替换为您的实际 AWS 凭据。如果需要,将
https://s3.your-endpoint.com
替换为您的 S3 兼容服务的相应端点 URL。
通过执行这些步骤,您可以使用 fsspec 有效地连接到具有 S3 后端的 Alluxio。
监控指标
监控系统设置
Prometheus 安装和配置 编辑
prometheus.yml
配置文件,然后启动 Prometheus:nohup ./prometheus --web.enable-admin-api --config.file=./prometheus.yml >./prometheus.log 2>&1 &
Grafana 安装和配置 启动 Grafana:
nohup ./bin/grafana-server --homepath . web >./grafana.log 2>&1 &
有关设置监控系统的更详细说明,请参阅 Alluxio 官方文档。
监控指标说明
指标名称
描述
标签
单位
实现代码
alluxio_http_server_call_latency_ms
HTTP 服务调用延迟的直方图(桶边界:[10, 40, 160, 640] ms)
method
, success
毫秒 (ms)
HistogramWrapper
alluxio_http_server_result_total
HTTP 服务结果总数
method
, state
计数
CounterWrapper
alluxio_http_server_call_latency_ms_sum
HTTP 服务调用总延迟
method
, success
毫秒 (ms)
HistogramWrapper
alluxio_http_server_call_latency_ms_count
HTTP 服务调用计数
method
, success
计数
HistogramWrapper
Last updated