在 K8s 上运行 Spark

本指南介绍如何配置 Apache Spark 来访问 Alluxio。

应用程序如果使用 Spark 1.1 或更高版本,可通过 Alluxio 的 HDFS 兼容接口来访问 Alluxio。当使用 Alluxio 作为数据访问层后, Spark 应用可以透明地访问各种持久化存储服务中的数据。特别是当 Spark 部署远离数据时, 应用可主动抓取数据或将其透明地缓存到 Alluxio 中, 从而加速 I/O 性能。此外,Alluxio 可以通过解耦计算与物理存储来简化架构。当持久化存储中的数据路径对 Spark 隐藏时,对底层存储的更改可以独立于应用程序逻辑; 同时,Alluxio 作为近计算的缓存层,仍然可以为计算框架提供数据本地性支持。

本指南介绍了如何在 Kubernetes 环境下将 Apache Spark 与 Alluxio 集成。

先决条件

本指南假定 Alluxio集群已部署在Kubernetes上

此外,还需使用docker来构建自定义的 Spark 镜像。

准备镜像

要与 Spark 集成,须在 Spark 镜像中添加 Alluxio 的 jar 文件和配置文件。 Spark 镜像修改后需启动 Spark 容器才能连接到 Alluxio 集群。

Alluxio安装说明中列出的下载文件中, 找到名为alluxio-enterprise-DA-3.2-8.0.0-release.tar.gz的压缩包。从压缩包解压以下 Alluxio jar文件:

  • client/alluxio-DA-3.2-8.0.0-client.jar

  • client/ufs/alluxio-underfs-s3a-shaded-DA-3.2-8.0.0.jar (如果使用 S3 存储桶作为 UFS)

准备一个空目录作为构建镜像的工作目录。在此目录中,创建目录files/alluxio/,并将前面提到的 jar 文件复制到其中。

创建一个Dockerfile 来修改基础 Spark 镜像。定义参数示例如下:

  • SPARK_VERSION=3.5.2 作为 Spark 的版本

  • UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-DA-3.2-8.0.0.jar作为复制到 files/alluxio/ 的 UFS jar 的路径

  • CLIENT_JAR=files/alluxio/alluxio-DA-3.2-8.0.0-client.jar 作为复制到 files/alluxio/ 的 Alluxio 客户端 jar 的路径

# 使用官方的 Spark 3.5.x(或更高版本)镜像作为基础镜像 
ARG  SPARK_VERSION=3.5.2
ARG  IMAGE=apache/spark:${SPARK_VERSION}
FROM $IMAGE

ARG  SPARK_UID=185
ARG  UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-{{site.ALLUXIO_VERSION_STRING}}.jar
ARG  CLIENT_JAR=files/alluxio/alluxio-{{site.ALLUXIO_VERSION_STRING}}-client.jar

USER root

# 创建 /opt/alluxio 目录
RUN  mkdir -p /opt/alluxio && \
     mkdir -p /opt/alluxio/lib

# 将 Alluxio 客户端 JAR 文件复制到 /opt/alluxio 目录
COPY ${UFS_JAR} /opt/alluxio/lib/
COPY ${CLIENT_JAR} /opt/alluxio/
ENV  SPARK_HOME=/opt/spark

WORKDIR /opt/spark/work-dir
RUN  chmod g+w /opt/spark/work-dir
RUN  chmod a+x /opt/decom.sh

ENTRYPOINT [ "/opt/entrypoint.sh" ]
USER  ${SPARK_UID}

运行以下命令来构建镜像,并将 <PRIVATE_REGISTRY> 替换为您的私有容器仓库的URL,将 <SPARK_VERSION> 替换为相应的 Spark 版本。 在以下示例中,我们将继续使用 3.5.2 作为 Spark 版本,以 <SPARK_VERSION> 表示。

$ docker build --platform linux/amd64 -t <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION> .

通过运行以下命令来上传镜像:

$ docker push <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION>

部署 Spark

在提交 Spark 作业之前,我们需要完成以下几项操作:

  • 安装 Spark operator

  • 设置 Alluxio configMap

  • 为 Alluxio 创建服务账户 (如果使用 IAM)

  • 在Spark 作业中添加其他的参数

安装 Spark Operator

如果您使用 aws-samples/emr-on-eks-benchmark 来创建 EKS 集群, spark-operator 将会被安装在脚本中,因此无需再次安装。

以下说明来自 spark-operator 入门指南

在 Helm 中添加 spark-operator 仓库。

$ helm repo add spark-operator https://kubeflow.github.io/spark-operator
$ helm repo update

要添加自定义配置,可以创建一个 spark-operator.yaml 文件。例如,以下示例将命名空间设置为 spark(非必需,仅以此为例):

spark:
  jobNamespaces:
  - spark

运行以下命令,来使用这些配置安装 spark-operator:

$ helm install spark-operator spark-operator/spark-operator -f spark-operator.yaml \
  --namespace spark-operator \
  --create-namespace \
  --set webhook.enable=true \
  --set rbac.create=true

挂载 Alluxio 的 configMaps 需要进行webhook.enable 设置。

检查 spark-operator 的状态。如果状态为Running,则可以提交作业。

$ kubectl get pod -n spark-operator
NAME                                        READY   STATUS    RESTARTS   AGE
spark-operator-controller-5db84774f-2d72c   1/1     Running   0          28s
spark-operator-webhook-7c9fb4788-d98fw      1/1     Running   0          28s

完成 Spark 作业后,使用以下命令卸载 Spark operator 及其相关组件:

$ helm uninstall spark-operator -n spark-operator

设置 Alluxio ConfigMap

这里的 configMap 是用于作为 Alluxio 客户端的Spark 作业,使其能够识别 Alluxio 配置。

该 configMap 可以从由 Alluxio operator 构建的现有 Alluxio 集群的 alluxio-site.properties 创建。要显示来自 Alluxio 集群 configMap 的 alluxio-site.properties,运行以下命令:

$ kubectl get configmap <ALLUXIO_NAMESPACE>-<ALLUXIO_CLUSTER_NAME>-conf -o yaml

如果按照在 Kubernetes 上安装 Alluxio 的说明进行操作, <ALLUXIO_NAMESPACE>-<ALLUXIO_CLUSTER_NAME>-conf 的值为default-alluxio-conf

使用以下命令将生成一个 alluxio-config.yaml 文件:

$ cat <<EOF > alluxio-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: alluxio-config
  namespace: spark
data:
  alluxio-site.properties: |-
$( kubectl -n ${alluxioNamespace} get configmap ${alluxioNamespace}-${alluxioClusterName}-conf -o json \
  | jq -r '.data["alluxio-site.properties"]' \
  | sed -e's/^/    /' )
EOF

注意:

  • ${alluxioNamespace}${alluxioClusterName} 应与现有 Alluxio 集群的值匹配。如果按照在 Kubernetes 上安装 Alluxio 的说明进行操作,该值应分别为 defaultalluxio

  • jq 命令用于解析 JSON。

运行以下命令来创建 configMap:

kubectl -n spark apply -f alluxio-config.yaml

为 Alluxio 创建服务账户

如果使用 IAM 进行身份验证/授权,则需使用 Alluxio 服务账户。

创建一个 spark-s3-access-sa.yaml 文件,内容如下:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: alluxio-s3-access
  namespace: spark
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<YOUR_AWS_ACCOUNT_ID>:role/S3EKSServiceAccountRole

其中 <YOUR_AWS_ACCOUNT_ID> 应替换为您的 AWS 账户 ID。

使用以下命令创建服务账户:

$ kubectl -n spark apply -f spark-s3-access-sa.yaml

提供 Alluxio 属性作为 Spark 配置值

要使 Spark 集群与 Alluxio 集群正常通信,须在 Alluxio 客户端和 Alluxio 服务器之间对齐某些属性。

尤其是 hadoopConf 的值应设置为与 Alluxio 部署的值匹配。请注意之前创建的 alluxio-config.yaml 文件中的 alluxio-site.properties 中的以下属性:

  • alluxio.etcd.endpoints

  • alluxio.cluster.name

  • alluxio.k8s.env.deployment

  • alluxio.mount.table.source

  • alluxio.worker.membership.manager.type

在 Spark 应用程序 YAML 文件中添加以下内容以提交作业;有关alluxio-sparkApplication.yaml的完整示例请参见下一节。

sparkConf:
  spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
  spark.hadoop.fs.s3a.access.key: <YOUR_S3_ACCESS_KEY>
  spark.hadoop.fs.s3a.secret.key: <YOUR_S3_SECRET_KEY>
  spark.hadoop.fs.s3a.impl: alluxio.hadoop.FileSystem
  spark.hadoop.fs.s3.impl: alluxio.hadoop.FileSystem
  spark.driver.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
  spark.executor.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf

hadoopConf:
  alluxio.etcd.endpoints: http://alluxio-etcd.default:2379
  alluxio.cluster.name: default-alluxio
  alluxio.k8s.env.deployment: "true"
  alluxio.mount.table.source: ETCD
  alluxio.worker.membership.manager.type: ETCD

上述示例假设 Alluxio 是按照在 Kubernetes 上安装 Alluxio 的说明进行部署的。

示例

使用 Spark 来读写文件

本节通过一个示例来介绍如何使用 Spark 读写文件。在这一简单示例中,我们将统计输入文件中的单词数。

需要执行以下步骤:

  • 创建一个包含任意文本的输入文件。我们将统计该文本文件中每个单词出现的次数。

  • 创建一个包含单词计数代码的 Scala 程序。该程序将读取上述输入文件并将结果输出到指定位置。

  • 打包 Scala 程序,以便 Spark 可以启动和执行。

  • 提交 Spark 作业并验证结果。

创建输入文件

创建一个输入文件 input.txt。输入文件可以包含任何文本内容。该 Scala 示例将统计一个单词在该文件中出现的次数。以下为示例内容,可完整复制粘贴到输入文件中:

Sure! Here’s an example of a simple text file you can use as an input file for word counting.
This file contains a few lines of text with various words:

# Example Input File (input.txt) 

Hello world
Spark is awesome
Alluxio is awesome
Count the words in this file
Spark is great for big data
Alluxio is great for caching and speed up Spark

# How to Create This File
Create the File Locally.
Open a text editor (e.g., Notepad, TextEdit, VSCode).
Copy and paste the entire text above into the editor.
Save the file with the name input.txt.

# Upload to S3:
Log in to your AWS Management Console.
Navigate to the S3 service.
Select or create a bucket where you want to upload the file.
Upload input.txt to your chosen bucket.

# Verify the File:
Make sure the file is accessible via the S3 path you specified in your Spark job,
such as s3://your-bucket/input.txt.

If you need help with any specific steps or further customization, let me know!

创建 Scala 文件

我们需要编写一个 Scala 文件并生成 JAR,然后将 JAR 上传到 S3 供 spark 作业调用。

创建一个 Scala 文件 spark-scala-demo.scala,包含内容示例如下:

import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

object S3WordCount {
  def main(args: Array[String]): Unit = {
    // create SparkSession
    val spark = SparkSession.builder
      .appName("AlluxioSparkExample")
      .master("local[*]")
      .getOrCreate()

    // create sparkContext
    val sc = spark.sparkContext

    // Define S3 paths
    val inputPath = "s3://<MY_BUCKET>/test/input.txt" // Replace with your input path
    val outputPath = "s3://<MY_BUCKET>/test/output" // Replace with your output path

    // Read file from S3 into a DataFrame
    val textDF = spark.read.text(inputPath)

    // Perform word count transformation
    val wordCounts = textDF
      .select(explode(split(col("value"), "\\s+")).as("word")) // Split and explode into individual words
      .groupBy("word")                                         // Group by word
      .agg(count("*").as("count"))                             // Count occurrences of each word

    // Save the result to S3 using overwrite mode
    wordCounts.write.mode("overwrite").format("csv").save(outputPath)

    // Stop SparkSession
    spark.stop()
  }
}

inputPath 更新为存放输入文件的 S3 路径,将 outputPath 更新为希望存放输出文件的 S3 路径。这两个路径需能通过提供的凭证访问。

打包 Scala 应用程序

首先创建一个包含 build.sbt 文件的文件夹,内容如下:

scalaVersion := "2.12.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.1

在包含 Scala 文件的文件夹下使用 sbt 工具构建 JAR。如果尚未安装 sbt,请运行 $ brew install sbt

$ sbt package

./target/scala-2.12/ 目录下找到文件,注意其名称(即 <SPARK_JOB_JAR_FILE>.jar)。将该文件上传到 S3, 如有需要,可以对其重命名。 在此示例中,我们将其命名为 alluxioread\_2.12-0.1.jar

$ aws s3 cp ./target/scala-2.12/<SPARK_JOB_JAR_FILE>.jar s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar 

<BUCKET_NAME>/<S3_PATH> 替换为可访问的 S3 位置。

创建 Spark 应用

创建一个 alluxio-sparkApplication.yaml 文件,包含的内容如下所示:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: alluxio-word-count
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION>
  imagePullPolicy: Always
  mainClass: S3WordCount
  mainApplicationFile: s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar
  sparkVersion: 3.5.2
  driver:
    javaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp -Daws.accessKeyId=<YOUR_S3_ACCESS_KEY> -Daws.secretKey=<YOUR_S3_SECRET_KEY>"
    labels:
      version: 3.5.2
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: alluxio-s3-access
    configMaps:
      - name: alluxio-config
        path: /opt/alluxio/conf/
  executor:
    labels:
      version: 3.5.2
    cores: 1
    coreLimit: 1200m
    memory: 512m
    configMaps:
      - name: alluxio-config
        path: /opt/alluxio/conf/
  deps:
    repositories:
      - https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
    packages:
      - org.apache.hadoop:hadoop-aws:3.2.2
  sparkConf:
    spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    spark.hadoop.fs.s3a.access.key: <YOUR_S3_ACCESS_KEY>
    spark.hadoop.fs.s3a.secret.key: <YOUR_S3_SECRET_KEY>
    spark.hadoop.fs.s3a.impl: alluxio.hadoop.FileSystem
    spark.hadoop.fs.s3.impl: alluxio.hadoop.FileSystem
    spark.driver.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
    spark.executor.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
  hadoopConf:
    alluxio.etcd.endpoints: http://alluxio-etcd.default:2379
    alluxio.mount.table.source: ETCD
    alluxio.worker.membership.manager.type: ETCD
    alluxio.k8s.env.deployment: "true"
    alluxio.cluster.name: default-alluxio

注意以下自定义设置:

  • spec.image 下,指定自定义 Spark 镜像的位置。

  • spec.mainApplicationFile 中设置上传 jar 的 S3 路径,以替代 s3a://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar

  • 在以下位置设置 S3 访问凭证:

    • 驱动程序和执行器的javaOptions

    • sparkConf 中的 spark.hadoop.fs.s3a.* 属性

  • 用于 sparkConfhadoopConf的 Alluxio 特定配置,具体参看上一节提供 Alluxio 属性作为 Spark 配置值

提交 Spark 作业并查看结果

运行以下命令来部署 spark 应用:

$ kubectl create -n spark -f alluxio-sparkApplication.yaml

完成后,您将在输出路径中看到结果,在我们的示例中为:

$ aws s3 ls --recursive s3://<MY_BUCKET>/test/output

将输出复制到本地并查看.csv 文件。应该会在 .csv 文件中看到单词和计数对。

$ aws s3 cp --recursive s3://<MY_BUCKET>/test/output ./output

注意:

  • 如果要重新运行作业,可能需要删除输出目录。这个操作需要在 Alluxio 层使用 alluxio fs rm 命令完成。示例如下:

# 将output目录删除,并从metastore缓存中移除
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio fs rm -R s3://<MY_BUCKET>/test/output
  • 如果重新构建了 Scala 应用程序或更改了输入文件,需要让缓存失效,以便新版本能够被 Alluxio 拉取并获得更新。示例如下:

# 使缓存失效,以便任何新的输入文件或者 jar 文件都直接从 UFS 拉取
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio job load --path <S3_PATH_OF_UPDATED_FILE> --submit

Last updated