# 在 K8s 上运行 Spark

本指南介绍如何配置 Apache Spark 来访问 Alluxio。

应用程序如果使用 Spark 1.1 或更高版本，可通过 Alluxio 的 HDFS 兼容接口来访问 Alluxio。当使用 Alluxio 作为数据访问层后，\
Spark 应用可以透明地访问各种持久化存储服务中的数据。特别是当 Spark 部署远离数据时， 应用可主动抓取数据或将其透明地缓存到 Alluxio 中，\
从而加速 I/O 性能。此外，Alluxio 可以通过解耦计算与物理存储来简化架构。当持久化存储中的数据路径对 Spark 隐藏时，对底层存储的更改可以独立于应用程序逻辑；\
同时，Alluxio 作为近计算的缓存层，仍然可以为计算框架提供数据本地性支持。

本指南介绍了如何在 Kubernetes 环境下将 Apache Spark 与 Alluxio 集成。

## 先决条件

本指南假定 [Alluxio集群已部署在Kubernetes上](https://documentation.alluxio.io/ee-da-cn/start/install-alluxio-on-kubernetes)。

此外，还需使用`docker`来构建自定义的 Spark 镜像。

## 准备镜像

要与 Spark 集成，须在 Spark 镜像中添加 Alluxio 的 jar 文件和配置文件。 Spark 镜像修改后需启动 Spark 容器才能连接到 Alluxio 集群。

在[Alluxio安装说明](https://documentation.alluxio.io/ee-da-cn/start/install-alluxio-on-kubernetes#准备)中列出的下载文件中，\
找到名为`alluxio-enterprise-DA-3.2-8.0.0-release.tar.gz`的压缩包。从压缩包解压以下 Alluxio jar文件：

* `client/alluxio-DA-3.2-8.0.0-client.jar`
* `client/ufs/alluxio-underfs-s3a-shaded-DA-3.2-8.0.0.jar` （如果使用 S3 存储桶作为 UFS）

准备一个空目录作为构建镜像的工作目录。在此目录中，创建目录`files/alluxio/`，并将前面提到的 jar 文件复制到其中。

创建一个`Dockerfile` 来修改基础 Spark 镜像。定义参数示例如下：

* `SPARK_VERSION=3.5.2` 作为 Spark 的版本
* `UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-DA-3.2-8.0.0.jar`作为复制到 `files/alluxio/` 的 UFS jar 的路径
* `CLIENT_JAR=files/alluxio/alluxio-DA-3.2-8.0.0-client.jar` 作为复制到 `files/alluxio/` 的 Alluxio 客户端 jar 的路径

```shell
# 使用官方的 Spark 3.5.x（或更高版本）镜像作为基础镜像 
ARG  SPARK_VERSION=3.5.2
ARG  IMAGE=apache/spark:${SPARK_VERSION}
FROM $IMAGE

ARG  SPARK_UID=185
ARG  UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-DA-3.5-10.2.0.jar
ARG  CLIENT_JAR=files/alluxio/alluxio-DA-3.5-10.2.0-client.jar

USER root

# 创建 /opt/alluxio 目录
RUN  mkdir -p /opt/alluxio && \
     mkdir -p /opt/alluxio/lib

# 将 Alluxio 客户端 JAR 文件复制到 /opt/alluxio 目录
COPY ${UFS_JAR} /opt/alluxio/lib/
COPY ${CLIENT_JAR} /opt/alluxio/
ENV  SPARK_HOME=/opt/spark

WORKDIR /opt/spark/work-dir
RUN  chmod g+w /opt/spark/work-dir
RUN  chmod a+x /opt/decom.sh

ENTRYPOINT [ "/opt/entrypoint.sh" ]
USER  ${SPARK_UID}
```

运行以下命令来构建镜像，并将 `<PRIVATE_REGISTRY>` 替换为您的私有容器仓库的URL，将 `<SPARK_VERSION>` 替换为相应的 Spark 版本。\
在以下示例中，我们将继续使用 3.5.2 作为 Spark 版本，以 `<SPARK_VERSION>` 表示。

```console
$ docker build --platform linux/amd64 -t <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION> .
```

通过运行以下命令来上传镜像：

```console
$ docker push <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION>
```

## 部署 Spark

在提交 Spark 作业之前，我们需要完成以下几项操作：

* 安装 Spark operator
* 设置 Alluxio configMap
* 为 Alluxio 创建服务账户 (如果使用 IAM)
* 在Spark 作业中添加其他的参数

### 安装 Spark Operator

如果您使用 [aws-samples/emr-on-eks-benchmark](https://github.com/aws-samples/emr-on-eks-benchmark) 来创建 EKS 集群，`spark-operator` 将会被安装在脚本中，因此无需再次安装。

以下说明来自[ `spark-operator` 入门指南](https://www.kubeflow.org/docs/components/spark-operator/getting-started/)。

在 Helm 中添加 spark-operator 仓库。

```console
$ helm repo add spark-operator https://kubeflow.github.io/spark-operator
$ helm repo update
```

要添加自定义配置，可以创建一个 `spark-operator.yaml` 文件。例如，以下示例将命名空间设置为 `spark`（非必需，仅以此为例）：

```yaml
spark:
  jobNamespaces:
  - spark
```

运行以下命令，来使用这些配置安装 spark-operator：

```shell
$ helm install spark-operator spark-operator/spark-operator -f spark-operator.yaml \
  --namespace spark-operator \
  --create-namespace \
  --set webhook.enable=true \
  --set rbac.create=true
```

挂载 Alluxio 的 configMaps 需要进行`webhook.enable` 设置。

检查 spark-operator 的状态。如果状态为`Running`，则可以提交作业。

```console
$ kubectl get pod -n spark-operator
NAME                                        READY   STATUS    RESTARTS   AGE
spark-operator-controller-5db84774f-2d72c   1/1     Running   0          28s
spark-operator-webhook-7c9fb4788-d98fw      1/1     Running   0          28s
```

完成 Spark 作业后，使用以下命令卸载 Spark operator 及其相关组件：

```console
$ helm uninstall spark-operator -n spark-operator
```

### 设置 Alluxio ConfigMap

这里的 configMap 是用于作为 Alluxio 客户端的Spark 作业，使其能够识别 Alluxio 配置。

该 configMap 可以从由 Alluxio operator 构建的现有 Alluxio 集群的 `alluxio-site.properties` 创建。要显示来自 Alluxio 集群 configMap\
的 `alluxio-site.properties`，运行以下命令：

```console
$ kubectl get configmap <ALLUXIO_NAMESPACE>-<ALLUXIO_CLUSTER_NAME>-conf -o yaml
```

如果按照[在 Kubernetes 上安装 Alluxio](https://documentation.alluxio.io/ee-da-cn/start/install-alluxio-on-kubernetes) 的说明进行操作，`<ALLUXIO_NAMESPACE>-<ALLUXIO_CLUSTER_NAME>-conf` 的值为`default-alluxio-conf`。

使用以下命令将生成一个 `alluxio-config.yaml` 文件：

```console
$ cat <<EOF > alluxio-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: alluxio-config
  namespace: spark
data:
  alluxio-site.properties: |-
$( kubectl -n ${alluxioNamespace} get configmap ${alluxioNamespace}-${alluxioClusterName}-conf -o json \
  | jq -r '.data["alluxio-site.properties"]' \
  | sed -e's/^/    /' )
EOF
```

注意：

* `${alluxioNamespace}` 和 `${alluxioClusterName}` 应与现有 Alluxio 集群的值匹配。如果按照[在 Kubernetes 上安装 Alluxio](https://documentation.alluxio.io/ee-da-cn/start/install-alluxio-on-kubernetes)\
  的说明进行操作，该值应分别为 `default` 和 `alluxio`。
* [`jq` 命令](https://jqlang.github.io/jq/)用于解析 JSON。

运行以下命令来创建 configMap：

```console
kubectl -n spark apply -f alluxio-config.yaml
```

### 为 Alluxio 创建服务账户

如果使用 IAM 进行身份验证/授权，则需使用 Alluxio 服务账户。

创建一个 `spark-s3-access-sa.yaml` 文件，内容如下：

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: alluxio-s3-access
  namespace: spark
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<YOUR_AWS_ACCOUNT_ID>:role/S3EKSServiceAccountRole
```

其中 `<YOUR_AWS_ACCOUNT_ID>` 应替换为您的 AWS 账户 ID。

使用以下命令创建服务账户：

```console
$ kubectl -n spark apply -f spark-s3-access-sa.yaml
```

### 提供 Alluxio 属性作为 Spark 配置值

要使 Spark 集群与 Alluxio 集群正常通信，须在 Alluxio 客户端和 Alluxio 服务器之间对齐某些属性。

尤其是 `hadoopConf` 的值应设置为与 Alluxio 部署的值匹配。请注意之前创建的 `alluxio-config.yaml` 文件中的 `alluxio-site.properties` 中的以下属性：

* `alluxio.etcd.endpoints`
* `alluxio.cluster.name`
* `alluxio.k8s.env.deployment`
* `alluxio.mount.table.source`
* `alluxio.worker.membership.manager.type`

在 Spark 应用程序 YAML 文件中添加以下内容以提交作业；有关`alluxio-sparkApplication.yaml`的完整示例请参见下一节。

```yaml
sparkConf:
  spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
  spark.hadoop.fs.s3a.access.key: <YOUR_S3_ACCESS_KEY>
  spark.hadoop.fs.s3a.secret.key: <YOUR_S3_SECRET_KEY>
  spark.hadoop.fs.s3a.impl: alluxio.hadoop.FileSystem
  spark.hadoop.fs.s3.impl: alluxio.hadoop.FileSystem
  spark.driver.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
  spark.executor.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf

hadoopConf:
  alluxio.etcd.endpoints: http://alluxio-etcd.default:2379
  alluxio.cluster.name: default-alluxio
  alluxio.k8s.env.deployment: "true"
  alluxio.mount.table.source: ETCD
  alluxio.worker.membership.manager.type: ETCD
```

上述示例假设 Alluxio 是按照[在 Kubernetes 上安装 Alluxio ](https://documentation.alluxio.io/ee-da-cn/start/install-alluxio-on-kubernetes)的说明进行部署的。

## 示例

### 使用 Spark 来读写文件

本节通过一个示例来介绍如何使用 Spark 读写文件。在这一简单示例中，我们将统计输入文件中的单词数。

需要执行以下步骤：

* 创建一个包含任意文本的输入文件。我们将统计该文本文件中每个单词出现的次数。
* 创建一个包含单词计数代码的 Scala 程序。该程序将读取上述输入文件并将结果输出到指定位置。
* 打包 Scala 程序，以便 Spark 可以启动和执行。
* 提交 Spark 作业并验证结果。

#### 创建输入文件

创建一个输入文件 `input.txt`。输入文件可以包含任何文本内容。该 Scala 示例将统计一个单词在该文件中出现的次数。以下为示例内容，可完整复制粘贴到输入文件中：

```
Hello world
Spark is awesome
Alluxio is awesome
Count the words in this file
Spark is great for big data
Alluxio is great for caching and speed up Spark
```

#### 创建 Scala 文件

我们需要编写一个 Scala 文件并生成 JAR，然后将 JAR 上传到 S3 供 spark 作业调用。

创建一个 Scala 文件 `spark-scala-demo.scala`，包含内容示例如下：

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

object S3WordCount {
  def main(args: Array[String]): Unit = {
    // create SparkSession
    val spark = SparkSession.builder
      .appName("AlluxioSparkExample")
      .master("local[*]")
      .getOrCreate()

    // create sparkContext
    val sc = spark.sparkContext

    // Define S3 paths
    val inputPath = "s3://<MY_BUCKET>/test/input.txt" // Replace with your input path
    val outputPath = "s3://<MY_BUCKET>/test/output" // Replace with your output path

    // Read file from S3 into a DataFrame
    val textDF = spark.read.text(inputPath)

    // Perform word count transformation
    val wordCounts = textDF
      .select(explode(split(col("value"), "\\s+")).as("word")) // Split and explode into individual words
      .groupBy("word")                                         // Group by word
      .agg(count("*").as("count"))                             // Count occurrences of each word

    // Save the result to S3 using overwrite mode
    wordCounts.write.mode("overwrite").format("csv").save(outputPath)

    // Stop SparkSession
    spark.stop()
  }
}
```

将 `inputPath` 更新为存放输入文件的 S3 路径，将 `outputPath` 更新为希望存放输出文件的 S3 路径。这两个路径需能通过提供的凭证访问。

#### 打包 Scala 应用程序

#### 首先创建一个包含 `build.sbt` 文件的文件夹，内容如下：

```
scalaVersion := "2.12.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.1
```

在包含 Scala 文件的文件夹下使用 `sbt` 工具构建 JAR。如果尚未安装 `sbt`，请运行 `$ brew install sbt`

```console
$ sbt package
```

在 `./target/scala-2.12/` 目录下找到文件，注意其名称（即 `<SPARK_JOB_JAR_FILE>.jar`）。将该文件上传到 S3, 如有需要，可以对其重命名。\
在此示例中，我们将其命名为 `alluxioread\_2.12-0.1.jar`：

```console
$ aws s3 cp ./target/scala-2.12/<SPARK_JOB_JAR_FILE>.jar s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar 
```

将 `<BUCKET_NAME>/<S3_PATH>` 替换为可访问的 S3 位置。

#### 创建 Spark 应用

创建一个 `alluxio-sparkApplication.yaml` 文件，包含的内容如下所示：

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: alluxio-word-count
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION>
  imagePullPolicy: Always
  mainClass: S3WordCount
  mainApplicationFile: s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar
  sparkVersion: 3.5.2
  driver:
    javaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp -Daws.accessKeyId=<YOUR_S3_ACCESS_KEY> -Daws.secretKey=<YOUR_S3_SECRET_KEY>"
    labels:
      version: 3.5.2
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: alluxio-s3-access
    configMaps:
      - name: alluxio-config
        path: /opt/alluxio/conf/
  executor:
    labels:
      version: 3.5.2
    cores: 1
    coreLimit: 1200m
    memory: 512m
    configMaps:
      - name: alluxio-config
        path: /opt/alluxio/conf/
  deps:
    repositories:
      - https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
    packages:
      - org.apache.hadoop:hadoop-aws:3.2.2
  sparkConf:
    spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    spark.hadoop.fs.s3a.access.key: <YOUR_S3_ACCESS_KEY>
    spark.hadoop.fs.s3a.secret.key: <YOUR_S3_SECRET_KEY>
    spark.hadoop.fs.s3a.impl: alluxio.hadoop.FileSystem
    spark.hadoop.fs.s3.impl: alluxio.hadoop.FileSystem
    spark.driver.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
    spark.executor.extraClassPath: /opt/alluxio/alluxio-DA-3.2-8.0.0-client.jar:/opt/alluxio/conf
  hadoopConf:
    alluxio.etcd.endpoints: http://alluxio-etcd.default:2379
    alluxio.mount.table.source: ETCD
    alluxio.worker.membership.manager.type: ETCD
    alluxio.k8s.env.deployment: "true"
    alluxio.cluster.name: default-alluxio
```

注意以下自定义设置：

* 在 `spec.image` 下，指定自定义 Spark 镜像的位置。
* 在 `spec.mainApplicationFile` 中设置上传 jar 的 S3 路径，以替代 `s3a://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar`
* 在以下位置设置 S3 访问凭证：
  * 驱动程序和执行器的`javaOptions`
  * `sparkConf` 中的 `spark.hadoop.fs.s3a.*` 属性
* 用于 `sparkConf` 和 `hadoopConf`的 Alluxio 特定配置，具体参看上一节[提供 Alluxio 属性作为 Spark 配置值](#提供-Alluxio-属性作为-Spark-配置值)。

#### 提交 Spark 作业并查看结果

运行以下命令来部署 spark 应用：

```console
$ kubectl create -n spark -f alluxio-sparkApplication.yaml
```

完成后，您将在输出路径中看到结果，在我们的示例中为：

```
$ aws s3 ls --recursive s3://<MY_BUCKET>/test/output
```

将输出复制到本地并查看`.csv` 文件。应该会在 `.csv` 文件中看到单词和计数对。

```
$ aws s3 cp --recursive s3://<MY_BUCKET>/test/output ./output
```

注意：

* 如果要重新运行作业，可能需要删除输出目录。这个操作需要在 Alluxio 层使用 `alluxio fs rm` 命令完成。示例如下：

```
# 将output目录删除，并从metastore缓存中移除
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio fs rm -R s3://<MY_BUCKET>/test/output
```

* 如果重新构建了 Scala 应用程序或更改了输入文件，需要让缓存失效，以便新版本能够被 Alluxio 拉取并获得更新。示例如下：

```console
# 使缓存失效，以便任何新的输入文件或者 jar 文件都直接从 UFS 拉取
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio job load --path <S3_PATH_OF_UPDATED_FILE> --submit

```
