本指南介绍如何配置 Apache Spark 来访问 Alluxio。
应用程序如果使用 Spark 1.1 或更高版本,可通过 Alluxio 的 HDFS 兼容接口来访问 Alluxio。当使用 Alluxio 作为数据访问层后, Spark 应用可以透明地访问各种持久化存储服务中的数据。特别是当 Spark 部署远离数据时, 应用可主动抓取数据或将其透明地缓存到 Alluxio 中, 从而加速 I/O 性能。此外,Alluxio 可以通过解耦计算与物理存储来简化架构。当持久化存储中的数据路径对 Spark 隐藏时,对底层存储的更改可以独立于应用程序逻辑; 同时,Alluxio 作为近计算的缓存层,仍然可以为计算框架提供数据本地性支持。
本指南介绍了如何在 Kubernetes 环境下将 Apache Spark 与 Alluxio 集成。
先决条件
本指南假定 Alluxio集群已部署在Kubernetes上 。
此外,还需使用docker
来构建自定义的 Spark 镜像。
准备镜像
要与 Spark 集成,须在 Spark 镜像中添加 Alluxio 的 jar 文件和配置文件。 Spark 镜像修改后需启动 Spark 容器才能连接到 Alluxio 集群。
在Alluxio安装说明 中列出的下载文件中, 找到名为alluxio-enterprise-DA-3.2-8.0.0-release.tar.gz
的压缩包。从压缩包解压以下 Alluxio jar文件:
client/alluxio-DA-3.2-8.0.0-client.jar
client/ufs/alluxio-underfs-s3a-shaded-DA-3.2-8.0.0.jar
(如果使用 S3 存储桶作为 UFS)
准备一个空目录作为构建镜像的工作目录。在此目录中,创建目录files/alluxio/
,并将前面提到的 jar 文件复制到其中。
创建一个Dockerfile
来修改基础 Spark 镜像。定义参数示例如下:
SPARK_VERSION=3.5.2
作为 Spark 的版本
UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-DA-3.2-8.0.0.jar
作为复制到 files/alluxio/
的 UFS jar 的路径
CLIENT_JAR=files/alluxio/alluxio-DA-3.2-8.0.0-client.jar
作为复制到 files/alluxio/
的 Alluxio 客户端 jar 的路径
Copy # 使用官方的 Spark 3.5.x(或更高版本)镜像作为基础镜像
ARG SPARK_VERSION= 3.5.2
ARG IMAGE=apache/spark: ${SPARK_VERSION}
FROM $IMAGE
ARG SPARK_UID= 185
ARG UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-{{site.ALLUXIO_VERSION_STRING}}.jar
ARG CLIENT_JAR=files/alluxio/alluxio-{{site.ALLUXIO_VERSION_STRING}}-client.jar
USER root
# 创建 /opt/alluxio 目录
RUN mkdir -p /opt/alluxio && \
mkdir -p /opt/alluxio/lib
# 将 Alluxio 客户端 JAR 文件复制到 /opt/alluxio 目录
COPY ${UFS_JAR} /opt/alluxio/lib/
COPY ${CLIENT_JAR} /opt/alluxio/
ENV SPARK_HOME=/opt/spark
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh
ENTRYPOINT [ "/opt/entrypoint.sh" ]
USER ${SPARK_UID}
运行以下命令来构建镜像,并将 <PRIVATE_REGISTRY>
替换为您的私有容器仓库的URL,将 <SPARK_VERSION>
替换为相应的 Spark 版本。 在以下示例中,我们将继续使用 3.5.2 作为 Spark 版本,以 <SPARK_VERSION>
表示。
Copy $ docker build --platform linux/amd64 -t < PRIVATE_REGISTRY > /alluxio/spark: < SPARK_VERSION > .
通过运行以下命令来上传镜像:
Copy $ docker push < PRIVATE_REGISTRY > /alluxio/spark: < SPARK_VERSION >
部署 Spark
在提交 Spark 作业之前,我们需要完成以下几项操作:
为 Alluxio 创建服务账户 (如果使用 IAM)
安装 Spark Operator
如果您使用 aws-samples/emr-on-eks-benchmark 来创建 EKS 集群, spark-operator
将会被安装在脚本中,因此无需再次安装。
以下说明来自 spark-operator
入门指南 。
在 Helm 中添加 spark-operator 仓库。
Copy $ helm repo add spark-operator https://kubeflow.github.io/spark-operator
$ helm repo update
要添加自定义配置,可以创建一个 spark-operator.yaml
文件。例如,以下示例将命名空间设置为 spark
(非必需,仅以此为例):
Copy spark :
jobNamespaces :
- spark
运行以下命令,来使用这些配置安装 spark-operator:
Copy $ helm install spark-operator spark-operator/spark-operator -f spark-operator.yaml \
--namespace spark-operator \
--create-namespace \
--set webhook.enable= true \
--set rbac.create= true
挂载 Alluxio 的 configMaps 需要进行webhook.enable
设置。
检查 spark-operator 的状态。如果状态为Running
,则可以提交作业。
Copy $ kubectl get pod -n spark-operator
NAME READY STATUS RESTARTS AGE
spark-operator-controller-5db84774f-2d72c 1/1 Running 0 28s
spark-operator-webhook-7c9fb4788-d98fw 1/1 Running 0 28s
完成 Spark 作业后,使用以下命令卸载 Spark operator 及其相关组件:
Copy $ helm uninstall spark-operator -n spark-operator
设置 Alluxio ConfigMap
这里的 configMap 是用于作为 Alluxio 客户端的Spark 作业,使其能够识别 Alluxio 配置。
该 configMap 可以从由 Alluxio operator 构建的现有 Alluxio 集群的 alluxio-site.properties
创建。要显示来自 Alluxio 集群 configMap 的 alluxio-site.properties
,运行以下命令:
Copy $ kubectl get configmap < ALLUXIO_NAMESPACE > - < ALLUXIO_CLUSTER_NAME > -conf -o yaml
如果按照在 Kubernetes 上安装 Alluxio 的说明进行操作, <ALLUXIO_NAMESPACE>-<ALLUXIO_CLUSTER_NAME>-conf
的值为default-alluxio-conf
。
使用以下命令将生成一个 alluxio-config.yaml
文件:
Copy $ cat << EOF > alluxio-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: alluxio-config
namespace: spark
data:
alluxio-site.properties: |-
$( kubectl -n ${alluxioNamespace} get configmap ${alluxioNamespace}-${alluxioClusterName}-conf -o json \
| jq -r '.data["alluxio-site.properties"]' \
| sed -e's/^/ /' )
EOF
注意:
${alluxioNamespace}
和 ${alluxioClusterName}
应与现有 Alluxio 集群的值匹配。如果按照在 Kubernetes 上安装 Alluxio 的说明进行操作,该值应分别为 default
和 alluxio
。
运行以下命令来创建 configMap:
Copy kubectl -n spark apply -f alluxio-config.yaml
为 Alluxio 创建服务账户
如果使用 IAM 进行身份验证/授权,则需使用 Alluxio 服务账户。
创建一个 spark-s3-access-sa.yaml
文件,内容如下:
Copy apiVersion : v1
kind : ServiceAccount
metadata :
name : alluxio-s3-access
namespace : spark
annotations :
eks.amazonaws.com/role-arn : arn:aws:iam::<YOUR_AWS_ACCOUNT_ID>:role/S3EKSServiceAccountRole
其中 <YOUR_AWS_ACCOUNT_ID>
应替换为您的 AWS 账户 ID。
使用以下命令创建服务账户:
Copy $ kubectl -n spark apply -f spark-s3-access-sa.yaml
提供 Alluxio 属性作为 Spark 配置值
要使 Spark 集群与 Alluxio 集群正常通信,须在 Alluxio 客户端和 Alluxio 服务器之间对齐某些属性。
尤其是 hadoopConf
的值应设置为与 Alluxio 部署的值匹配。请注意之前创建的 alluxio-config.yaml
文件中的 alluxio-site.properties
中的以下属性:
alluxio.k8s.env.deployment
alluxio.mount.table.source
alluxio.worker.membership.manager.type
在 Spark 应用程序 YAML 文件中添加以下内容以提交作业;有关alluxio-sparkApplication.yaml
的完整示例请参见下一节。
Copy sparkConf :
spark.hadoop.fs.s3a.aws.credentials.provider : com.amazonaws.auth.DefaultAWSCredentialsProviderChain
spark.hadoop.fs.s3a.access.key : <YOUR_S3_ACCESS_KEY>
spark.hadoop.fs.s3a.secret.key : <YOUR_S3_SECRET_KEY>
spark.hadoop.fs.s3a.impl : alluxio.hadoop.FileSystem
spark.hadoop.fs.s3.impl : alluxio.hadoop.FileSystem
spark.driver.extraClassPath : /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
spark.executor.extraClassPath : /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
hadoopConf :
alluxio.etcd.endpoints : http://alluxio-etcd.default:2379
alluxio.cluster.name : default-alluxio
alluxio.k8s.env.deployment : "true"
alluxio.mount.table.source : ETCD
alluxio.worker.membership.manager.type : ETCD
上述示例假设 Alluxio 是按照在 Kubernetes 上安装 Alluxio 的说明进行部署的。
示例
使用 Spark 来读写文件
本节通过一个示例来介绍如何使用 Spark 读写文件。在这一简单示例中,我们将统计输入文件中的单词数。
需要执行以下步骤:
创建一个包含任意文本的输入文件。我们将统计该文本文件中每个单词出现的次数。
创建一个包含单词计数代码的 Scala 程序。该程序将读取上述输入文件并将结果输出到指定位置。
打包 Scala 程序,以便 Spark 可以启动和执行。
创建输入文件
创建一个输入文件 input.txt
。输入文件可以包含任何文本内容。该 Scala 示例将统计一个单词在该文件中出现的次数。以下为示例内容,可完整复制粘贴到输入文件中:
Copy Sure! Here’s an example of a simple text file you can use as an input file for word counting.
This file contains a few lines of text with various words:
# Example Input File (input.txt)
Hello world
Spark is awesome
Alluxio is awesome
Count the words in this file
Spark is great for big data
Alluxio is great for caching and speed up Spark
# How to Create This File
Create the File Locally.
Open a text editor (e.g., Notepad, TextEdit, VSCode).
Copy and paste the entire text above into the editor.
Save the file with the name input.txt.
# Upload to S3:
Log in to your AWS Management Console.
Navigate to the S3 service.
Select or create a bucket where you want to upload the file.
Upload input.txt to your chosen bucket.
# Verify the File:
Make sure the file is accessible via the S3 path you specified in your Spark job,
such as s3://your-bucket/input.txt.
If you need help with any specific steps or further customization, let me know!
创建 Scala 文件
我们需要编写一个 Scala 文件并生成 JAR,然后将 JAR 上传到 S3 供 spark 作业调用。
创建一个 Scala 文件 spark-scala-demo.scala
,包含内容示例如下:
Copy import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
object S3WordCount {
def main (args: Array[String]) : Unit = {
// create SparkSession
val spark = SparkSession.builder
.appName( "AlluxioSparkExample" )
.master( "local[*]" )
.getOrCreate()
// create sparkContext
val sc = spark.sparkContext
// Define S3 paths
val inputPath = "s3://<MY_BUCKET>/test/input.txt" // Replace with your input path
val outputPath = "s3://<MY_BUCKET>/test/output" // Replace with your output path
// Read file from S3 into a DataFrame
val textDF = spark.read.text(inputPath)
// Perform word count transformation
val wordCounts = textDF
.select(explode(split(col( "value" ), "\\s+" )).as( "word" )) // Split and explode into individual words
.groupBy( "word" ) // Group by word
.agg(count( "*" ).as( "count" )) // Count occurrences of each word
// Save the result to S3 using overwrite mode
wordCounts.write.mode( "overwrite" ).format( "csv" ).save(outputPath)
// Stop SparkSession
spark.stop()
}
}
将 inputPath
更新为存放输入文件的 S3 路径,将 outputPath
更新为希望存放输出文件的 S3 路径。这两个路径需能通过提供的凭证访问。
打包 Scala 应用程序
首先创建一个包含 build.sbt
文件的文件夹,内容如下:
Copy scalaVersion := "2.12.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.1
在包含 Scala 文件的文件夹下使用 sbt
工具构建 JAR。如果尚未安装 sbt
,请运行 $ brew install sbt
在 ./target/scala-2.12/
目录下找到文件,注意其名称(即 <SPARK_JOB_JAR_FILE>.jar
)。将该文件上传到 S3, 如有需要,可以对其重命名。 在此示例中,我们将其命名为 alluxioread\_2.12-0.1.jar
:
Copy $ aws s3 cp ./target/scala-2.12/ < SPARK_JOB_JAR_FILE > .jar s3:// < BUCKET_NAME > / < S3_PATH > /alluxioread_2.12-0.1.jar
将 <BUCKET_NAME>/<S3_PATH>
替换为可访问的 S3 位置。
创建 Spark 应用
创建一个 alluxio-sparkApplication.yaml
文件,包含的内容如下所示:
Copy apiVersion : sparkoperator.k8s.io/v1beta2
kind : SparkApplication
metadata :
name : alluxio-word-count
namespace : spark
spec :
type : Scala
mode : cluster
image : <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION>
imagePullPolicy : Always
mainClass : S3WordCount
mainApplicationFile : s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar
sparkVersion : 3.5.2
driver :
javaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp -Daws.accessKeyId=<YOUR_S3_ACCESS_KEY> -Daws.secretKey=<YOUR_S3_SECRET_KEY>"
labels :
version : 3.5.2
cores : 1
coreLimit : 1200m
memory : 512m
serviceAccount : alluxio-s3-access
configMaps :
- name : alluxio-config
path : /opt/alluxio/conf/
executor :
labels :
version : 3.5.2
cores : 1
coreLimit : 1200m
memory : 512m
configMaps :
- name : alluxio-config
path : /opt/alluxio/conf/
deps :
repositories :
- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
packages :
- org.apache.hadoop:hadoop-aws:3.2.2
sparkConf :
spark.hadoop.fs.s3a.aws.credentials.provider : com.amazonaws.auth.DefaultAWSCredentialsProviderChain
spark.hadoop.fs.s3a.access.key : <YOUR_S3_ACCESS_KEY>
spark.hadoop.fs.s3a.secret.key : <YOUR_S3_SECRET_KEY>
spark.hadoop.fs.s3a.impl : alluxio.hadoop.FileSystem
spark.hadoop.fs.s3.impl : alluxio.hadoop.FileSystem
spark.driver.extraClassPath : /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
spark.executor.extraClassPath : /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
hadoopConf :
alluxio.etcd.endpoints : http://alluxio-etcd.default:2379
alluxio.mount.table.source : ETCD
alluxio.worker.membership.manager.type : ETCD
alluxio.k8s.env.deployment : "true"
alluxio.cluster.name : default-alluxio
注意以下自定义设置:
在 spec.image
下,指定自定义 Spark 镜像的位置。
在 spec.mainApplicationFile
中设置上传 jar 的 S3 路径,以替代 s3a://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar
在以下位置设置 S3 访问凭证:
sparkConf
中的 spark.hadoop.fs.s3a.*
属性
提交 Spark 作业并查看结果
运行以下命令来部署 spark 应用:
Copy $ kubectl create -n spark -f alluxio-sparkApplication.yaml
完成后,您将在输出路径中看到结果,在我们的示例中为:
Copy $ aws s3 ls --recursive s3://<MY_BUCKET>/test/output
将输出复制到本地并查看.csv
文件。应该会在 .csv
文件中看到单词和计数对。
Copy $ aws s3 cp --recursive s3://<MY_BUCKET>/test/output ./output
注意:
如果要重新运行作业,可能需要删除输出目录。这个操作需要在 Alluxio 层使用 alluxio fs rm
命令完成。示例如下:
Copy # 将output目录删除,并从metastore缓存中移除
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio fs rm -R s3://<MY_BUCKET>/test/output
如果重新构建了 Scala 应用程序或更改了输入文件,需要让缓存失效,以便新版本能够被 Alluxio 拉取并获得更新。示例如下:
Copy # 使缓存失效,以便任何新的输入文件或者 jar 文件都直接从 UFS 拉取
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio job load --path <S3_PATH_OF_UPDATED_FILE> --submit