# Spark on K8s

This guide describes how to configure Apache Spark to access Alluxio.

Applications using Spark 1.1 or later can access Alluxio through its HDFS-compatible interface. With Alluxio as the data access layer, Spark applications can transparently access data in many different types of persistent storage services. Data can be actively fetched or transparently cached into Alluxio to speed up I/O performance, especially when the Spark deployment is remote from the data. In addition, Alluxio can help simplify the architecture by decoupling compute and physical storage. When the data path in the persistent under storage is hidden from Spark, changes to the under storage can be made independently of application logic; meanwhile, as a near-compute cache, Alluxio can still provide data locality to compute frameworks.

This guide describes how to integrate Apache Spark with Alluxio in a Kubernetes environment.

## Prerequisites

This guide assumes that the [Alluxio cluster is deployed on Kubernetes](https://documentation.alluxio.io/ee-da-en/da-3.5/start/install-alluxio-on-kubernetes).

`docker` is also required to build the custom Spark image.

## Prepare Image

To integrate with Spark, Alluxio jars and configuration files must be added to the Spark image. Spark containers must be launched with this modified image in order to connect to the Alluxio cluster.

Among the files listed in the [Alluxio installation instructions](https://documentation.alluxio.io/ee-da-en/da-3.5/start/install-alluxio-on-kubernetes#Preparation) to download, locate the tarball named `alluxio-enterprise-DA-3.5-10.2.0-release.tar.gz`. Extract the following Alluxio jars from the tarball:

* `client/alluxio-DA-3.5-10.2.0-client.jar`
* `client/ufs/alluxio-underfs-s3a-shaded-DA-3.5-10.2.0.jar` if using an S3 bucket as a UFS

Prepare an empty directory as the working directory to build an image from. Within this directory, create the directory `files/alluxio/` and copy the aforementioned jar files into it.
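The preparation steps above can be sketched as a script. This is a hedged sketch: it assumes the release tarball has been downloaded into the current directory and that it unpacks into a top-level directory named `alluxio-enterprise-DA-3.5-10.2.0`, which may differ for your release.

```shell
# Sketch of the preparation steps; the tarball location and its
# top-level directory name are assumptions, adjust them as needed
TARBALL=alluxio-enterprise-DA-3.5-10.2.0-release.tar.gz

# Create the directory layout expected by the Dockerfile
mkdir -p files/alluxio

# Extract the tarball and copy the two jars, if the tarball is present
if [ -f "${TARBALL}" ]; then
  tar -xzf "${TARBALL}"
  cp alluxio-enterprise-DA-3.5-10.2.0/client/alluxio-DA-3.5-10.2.0-client.jar files/alluxio/
  cp alluxio-enterprise-DA-3.5-10.2.0/client/ufs/alluxio-underfs-s3a-shaded-DA-3.5-10.2.0.jar files/alluxio/
fi
```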

Create a `Dockerfile` with the operations to modify the base Spark image. The following example defines arguments for:

* `SPARK_VERSION=3.5.2` as the Spark version
* `UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-DA-3.5-10.2.0.jar` as the path to the UFS jar copied into `files/alluxio/`
* `CLIENT_JAR=files/alluxio/alluxio-DA-3.5-10.2.0-client.jar` as the path to the Alluxio client jar copied into `files/alluxio/`

```dockerfile
# Use the official Spark 3.5.x (or above) image as the base image
ARG  SPARK_VERSION=3.5.2
ARG  IMAGE=apache/spark:${SPARK_VERSION}
FROM $IMAGE

ARG  SPARK_UID=185
ARG  UFS_JAR=files/alluxio/alluxio-underfs-s3a-shaded-DA-3.5-10.2.0.jar
ARG  CLIENT_JAR=files/alluxio/alluxio-DA-3.5-10.2.0-client.jar

USER root

# Create the Alluxio directories
RUN  mkdir -p /opt/alluxio/lib

# Copy the Alluxio UFS and client JAR files into the image
COPY ${UFS_JAR} /opt/alluxio/lib/
COPY ${CLIENT_JAR} /opt/alluxio/
ENV  SPARK_HOME=/opt/spark

WORKDIR /opt/spark/work-dir
RUN  chmod g+w /opt/spark/work-dir
RUN  chmod a+x /opt/decom.sh

ENTRYPOINT [ "/opt/entrypoint.sh" ]
USER  ${SPARK_UID}
```

Build the image by running the following command, replacing `<PRIVATE_REGISTRY>` with the URL of your private container registry and `<SPARK_VERSION>` with the corresponding Spark version. In the following examples, we will continue to use `3.5.2` as the Spark version indicated by `<SPARK_VERSION>`.

```console
$ docker build --platform linux/amd64 -t <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION> .
```

Push the image by running:

```console
$ docker push <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION>
```

## Deploy Spark

There are a few things we need to do before submitting a Spark job:

* Install Spark operator
* Set Alluxio config map
* Create a service account for Alluxio (if you are using IAM)
* Add additional parameters in the Spark job

### Install Spark Operator

If you are using [aws-samples/emr-on-eks-benchmark](https://github.com/aws-samples/emr-on-eks-benchmark) to create the EKS cluster, the Spark operator is installed by its scripts, so you do not need to install it again.

The following instructions are derived from the [spark-operator getting started guide](https://www.kubeflow.org/docs/components/spark-operator/getting-started/).

Add the spark-operator repository to Helm:

```console
$ helm repo add spark-operator https://kubeflow.github.io/spark-operator
$ helm repo update
```

To add custom configurations, create a `spark-operator.yaml` file. For example, the following sets the job namespace to `spark` (not required, but we will use this namespace throughout the examples):

```yaml
spark:
  jobNamespaces:
  - spark
```

Install the spark operator with those configurations by running the command:

```console
$ helm install spark-operator spark-operator/spark-operator -f spark-operator.yaml \
  --namespace spark-operator \
  --create-namespace \
  --set webhook.enable=true \
  --set rbac.create=true
```

The `webhook.enable` setting is needed to mount configmaps for Alluxio.

Check the status of the spark operator. If the status is `Running`, it is ready for jobs to be submitted.

```console
$ kubectl get pod -n spark-operator
NAME                                        READY   STATUS    RESTARTS   AGE
spark-operator-controller-5db84774f-2d72c   1/1     Running   0          28s
spark-operator-webhook-7c9fb4788-d98fw      1/1     Running   0          28s
```

When finished with Spark, uninstall the Spark operator and its related components with the command:

```console
$ helm uninstall spark-operator -n spark-operator
```

### Create a ConfigMap for Alluxio

This configmap allows Spark jobs, acting as Alluxio clients, to read the Alluxio configuration.

The configmap can be created from the `alluxio-site.properties` entry of the existing Alluxio cluster config map built by the Alluxio operator. To show `alluxio-site.properties` from the Alluxio cluster config map, run:

```console
$ kubectl get configmap <ALLUXIO_NAMESPACE>-<ALLUXIO_CLUSTER_NAME>-conf -o yaml
```

If following the [Install Alluxio on Kubernetes](https://documentation.alluxio.io/ee-da-en/da-3.5/start/install-alluxio-on-kubernetes) instructions, the value of `<ALLUXIO_NAMESPACE>-<ALLUXIO_CLUSTER_NAME>-conf` would be `default-alluxio-conf`.
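As the naming suggests, the config map name is simply the namespace and cluster name joined with a `-conf` suffix, which can be computed in the shell:

```shell
# The config map name is "<namespace>-<cluster name>-conf";
# with the default values from the installation guide:
ALLUXIO_NAMESPACE=default
ALLUXIO_CLUSTER_NAME=alluxio
CONF_NAME="${ALLUXIO_NAMESPACE}-${ALLUXIO_CLUSTER_NAME}-conf"
echo "${CONF_NAME}"
# prints: default-alluxio-conf
```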

Run the following command to generate an `alluxio-config.yaml` file:

```console
$ cat <<EOF > alluxio-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: alluxio-config
  namespace: spark
data:
  alluxio-site.properties: |-
$( kubectl -n ${alluxioNamespace} get configmap ${alluxioNamespace}-${alluxioClusterName}-conf -o json \
  | jq -r '.data["alluxio-site.properties"]' \
  | sed -e's/^/    /' )
EOF
```

Note:

* `${alluxioNamespace}` and `${alluxioClusterName}` should match the values for the existing Alluxio cluster. If you followed the [Install Alluxio on Kubernetes](https://documentation.alluxio.io/ee-da-en/da-3.5/start/install-alluxio-on-kubernetes) instructions, it would be `default` and `alluxio` respectively.
* The [`jq` command](https://jqlang.github.io/jq/) is used to parse the JSON output.
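To illustrate why the `sed` step is needed: `alluxio-site.properties` is embedded as a YAML block scalar, so every property line must be indented by four spaces. A self-contained sketch with sample property lines (not taken from a live cluster):

```shell
# Sample property lines (illustrative only)
SAMPLE='alluxio.cluster.name=default-alluxio
alluxio.mount.table.source=ETCD'

# Indent every line by four spaces so the properties nest correctly
# under the "alluxio-site.properties: |-" block scalar in the YAML
printf '%s\n' "$SAMPLE" | sed -e 's/^/    /'
```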

Create the configmap by running the command:

```console
$ kubectl -n spark apply -f alluxio-config.yaml
```

### Create a Service Account for Alluxio

A service account for Alluxio is needed if you are using IAM for authentication and authorization.

Create a `spark-s3-access-sa.yaml` file, with the following contents:

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: alluxio-s3-access
  namespace: spark
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::<YOUR_AWS_ACCOUNT_ID>:role/S3EKSServiceAccountRole
```

where `<YOUR_AWS_ACCOUNT_ID>` should be replaced with your AWS account ID.

Create the service account with the command:

```console
$ kubectl -n spark apply -f spark-s3-access-sa.yaml
```

### Provide Alluxio Properties as Spark Configuration Values for Job Submission

For the Spark cluster to properly communicate with the Alluxio cluster, certain properties must be aligned between the Alluxio client and Alluxio server.

In particular, the values for `hadoopConf` should be set to match the values of your Alluxio deployment. Take note of these properties under `alluxio-site.properties` of the previously created `alluxio-config.yaml` file:

* `alluxio.etcd.endpoints`
* `alluxio.cluster.name`
* `alluxio.k8s.env.deployment`
* `alluxio.mount.table.source`
* `alluxio.worker.membership.manager.type`
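If you have saved the contents of `alluxio-site.properties` to a local file, the relevant lines can be pulled out with `grep`. The sketch below fabricates a sample file for illustration; substitute the values from your own cluster's config map:

```shell
# Write a sample alluxio-site.properties for illustration only;
# use the contents of your own cluster's config map instead
cat > alluxio-site.properties <<'EOF'
alluxio.etcd.endpoints=http://alluxio-etcd.default:2379
alluxio.cluster.name=default-alluxio
alluxio.k8s.env.deployment=true
alluxio.mount.table.source=ETCD
alluxio.worker.membership.manager.type=ETCD
alluxio.example.unrelated.property=value
EOF

# Pull out just the properties that must match between client and server
grep -E '^alluxio\.(etcd\.endpoints|cluster\.name|k8s\.env\.deployment|mount\.table\.source|worker\.membership\.manager\.type)=' \
  alluxio-site.properties
```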

Add the following to your Spark application YAML file for job submission; a full example of `alluxio-sparkApplication.yaml` appears in the Examples section.

```yaml
sparkConf:
  spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
  spark.hadoop.fs.s3a.access.key: <YOUR_S3_ACCESS_KEY>
  spark.hadoop.fs.s3a.secret.key: <YOUR_S3_SECRET_KEY>
  spark.hadoop.fs.s3a.impl: alluxio.hadoop.FileSystem
  spark.hadoop.fs.s3.impl: alluxio.hadoop.FileSystem
  spark.driver.extraClassPath: /opt/alluxio/alluxio-DA-3.5-10.2.0-client.jar:/opt/alluxio/conf
  spark.executor.extraClassPath: /opt/alluxio/alluxio-DA-3.5-10.2.0-client.jar:/opt/alluxio/conf

hadoopConf:
  alluxio.etcd.endpoints: http://alluxio-etcd.default:2379
  alluxio.cluster.name: default-alluxio
  alluxio.k8s.env.deployment: "true"
  alluxio.mount.table.source: ETCD
  alluxio.worker.membership.manager.type: ETCD
```

The above example assumes Alluxio was deployed following the [Install Alluxio on Kubernetes](https://documentation.alluxio.io/ee-da-en/da-3.5/start/install-alluxio-on-kubernetes) instructions.

## Examples

### Using Spark to Read and Write a File

This section provides an example of how to use Spark to read and write a file. In this simple example, we will count the words in an input file. To do so, we will:

* Create an input file containing any text. We will count how many times each word appears in that file.
* Create a Scala program that performs the word count. It takes the input file above and writes the result to a specified location.
* Package the Scala program into a JAR so that Spark can launch and execute it.
* Submit the Spark job and validate the result.

#### Create an input file

Create an input file `input.txt`. The input file can have any text content; this Scala example will count how many times each word occurs in it. Here is some sample content that you can copy into the input file:

```
Hello world
Spark is awesome
Alluxio is awesome
Count the words in this file
Spark is great for big data
Alluxio is great for caching and speed up Spark
```
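Before involving Spark at all, you can sanity-check the expected word counts for this sample input with standard shell tools; this is only a local approximation of what the Scala job will compute:

```shell
# Recreate the sample input file locally
cat > input.txt <<'EOF'
Hello world
Spark is awesome
Alluxio is awesome
Count the words in this file
Spark is great for big data
Alluxio is great for caching and speed up Spark
EOF

# Split on whitespace, then count occurrences of each word;
# for this sample, "is" should appear 4 times and "Spark" 3 times
tr -s '[:space:]' '\n' < input.txt | sort | uniq -c | sort -rn
```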

#### Create a Scala program

We need to write a Scala file, build it into a JAR, and then upload the JAR to S3 for the Spark job to use.

Create a Scala file `spark-scala-demo.scala` with the following example content:

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

object S3WordCount {
  def main(args: Array[String]): Unit = {
    // create SparkSession; no master URL is set here because the
    // Spark operator supplies it when running in cluster mode
    val spark = SparkSession.builder
      .appName("AlluxioSparkExample")
      .getOrCreate()

    // create sparkContext
    val sc = spark.sparkContext

    // Define S3 paths
    val inputPath = "s3://<MY_BUCKET>/test/input.txt" // Replace with your input path
    val outputPath = "s3://<MY_BUCKET>/test/output" // Replace with your output path

    // Read file from S3 into a DataFrame
    val textDF = spark.read.text(inputPath)

    // Perform word count transformation
    val wordCounts = textDF
      .select(explode(split(col("value"), "\\s+")).as("word")) // Split and explode into individual words
      .groupBy("word")                                         // Group by word
      .agg(count("*").as("count"))                             // Count occurrences of each word

    // Save the result to S3 using overwrite mode
    wordCounts.write.mode("overwrite").format("csv").save(outputPath)

    // Stop SparkSession
    spark.stop()
  }
}
```

Update `inputPath` to the S3 path where you put your input file, and `outputPath` to the S3 path where you want the output files to be written. Both must be accessible using the provided credentials.

#### Package the Scala application

First create a file `build.sbt` with the following contents. The Spark dependency version should match the Spark version used elsewhere in this guide (`3.5.2`), and the dependencies are marked `provided` because the cluster supplies Spark at runtime; the `name` and `version` settings determine the name of the built JAR:

```
name := "alluxioread"
version := "0.1"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.2" % "provided"
```

Use the `sbt` tool to build the JAR from the directory containing the Scala file. If `sbt` is not already installed, install it with a package manager (for example, `$ brew install sbt` on macOS).

```console
$ sbt package
```

Find the built JAR in the `./target/scala-2.12/` directory and note its name (e.g. `<SPARK_JOB_JAR_FILE>.jar`). Upload it to S3, renaming it if you wish. We will call it `alluxioread_2.12-0.1.jar` in this example:

```console
$ aws s3 cp ./target/scala-2.12/<SPARK_JOB_JAR_FILE>.jar s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar 
```

replacing `<BUCKET_NAME>/<S3_PATH>` with an accessible S3 location.

#### Create Spark Application

Create an `alluxio-sparkApplication.yaml` file with the following example content:

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: alluxio-word-count
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: <PRIVATE_REGISTRY>/alluxio/spark:<SPARK_VERSION>
  imagePullPolicy: Always
  mainClass: S3WordCount
  mainApplicationFile: s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar
  sparkVersion: 3.5.2
  driver:
    javaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp -Daws.accessKeyId=<YOUR_S3_ACCESS_KEY> -Daws.secretKey=<YOUR_S3_SECRET_KEY>"
    labels:
      version: 3.5.2
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: alluxio-s3-access
    configMaps:
      - name: alluxio-config
        path: /opt/alluxio/conf/
  executor:
    javaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp -Daws.accessKeyId=<YOUR_S3_ACCESS_KEY> -Daws.secretKey=<YOUR_S3_SECRET_KEY>"
    labels:
      version: 3.5.2
    cores: 1
    coreLimit: 1200m
    memory: 512m
    configMaps:
      - name: alluxio-config
        path: /opt/alluxio/conf/
  deps:
    repositories:
      - https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
    packages:
      - org.apache.hadoop:hadoop-aws:3.2.2
  sparkConf:
    spark.hadoop.fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    spark.hadoop.fs.s3a.access.key: <YOUR_S3_ACCESS_KEY>
    spark.hadoop.fs.s3a.secret.key: <YOUR_S3_SECRET_KEY>
    spark.hadoop.fs.s3a.impl: alluxio.hadoop.FileSystem
    spark.hadoop.fs.s3.impl: alluxio.hadoop.FileSystem
    spark.driver.extraClassPath: /opt/alluxio/alluxio-DA-3.5-10.2.0-client.jar:/opt/alluxio/conf
    spark.executor.extraClassPath: /opt/alluxio/alluxio-DA-3.5-10.2.0-client.jar:/opt/alluxio/conf
  hadoopConf:
    alluxio.etcd.endpoints: http://alluxio-etcd.default:2379
    alluxio.mount.table.source: ETCD
    alluxio.worker.membership.manager.type: ETCD
    alluxio.k8s.env.deployment: "true"
    alluxio.cluster.name: default-alluxio
```

Note the following customizations:

* Under `spec.image`, specify the location of the custom Spark image
* Set the S3 path to the uploaded jar in `spec.mainApplicationFile` in place of `s3://<BUCKET_NAME>/<S3_PATH>/alluxioread_2.12-0.1.jar`
* Set the access credentials to S3 in the following locations:
  * `javaOptions` for both the driver and executor
  * As `spark.hadoop.fs.s3a.*` properties in `sparkConf`
* Alluxio-specific configurations for `sparkConf` and `hadoopConf`, as previously described in [Provide Alluxio Properties as Spark Configuration Values for Job Submission](#provide-alluxio-properties-as-spark-configuration-values-for-job-submission)

#### Submit Spark job and see results

Deploy the Spark application with the command:

```console
$ kubectl create -n spark -f alluxio-sparkApplication.yaml
```

Once it finishes, you will see the result in the output path. In our example:

```console
$ aws s3 ls --recursive s3://<MY_BUCKET>/test/output
```

Copy the output locally and inspect the `.csv` files. You should see `word,count` pairs.

```console
$ aws s3 cp --recursive s3://<MY_BUCKET>/test/output ./output
```

Note:

* If you want to rerun the job, you may need to remove the output directory first. This must be done through Alluxio using `alluxio fs rm`, so the files are removed both from the under storage and from the metadata cache. For example:

```console
# remove the output location physically and from the metastore cache
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio fs rm -R s3://<MY_BUCKET>/test/output
```

* If you rebuild your Scala application or change the input file, you need to invalidate the cache so that the new version is pulled from the UFS into Alluxio. For example:

```console
# invalidate the cache so any new input file and/or jar file is pulled from the UFS
$ kubectl exec -it alluxio-coordinator-0 -c alluxio-coordinator -- alluxio job load --path <S3_PATH_OF_UPDATED_FILE> --submit
```
