Apache Spark
Last updated
Last updated
该指南描述了如何配置 来访问 Alluxio。
Spark 1.1 或更高版本的 Spark 应用程序可以通过其与 HDFS 兼容的接口直接访问 Alluxio 集群。 使用 Alluxio 作为数据访问层,Spark 应用程序可以透明地访问许多不同类型的持久化存储服务(例如,AWS S3 bucket、Azure Object Store buckets、远程部署的 HDFS 等)的数据,也可以透明地访问同一类型持久化存储服务不同实例中的数据。 为了加快 I/O 性能,用户可以主动获取数据到 Alluxio 中或将数据透明地缓存到 Alluxio 中。 这种做法尤其是在 Spark 部署位置与数据相距较远时特别有效。 此外,通过将计算和物理存储解耦,Alluxio 能够有助于简化系统架构。 当底层持久化存储中真实数据的路径对 Spark 隐藏时,对底层存储的更改可以独立于应用程序逻辑;同时,Alluxio 作为邻近计算的缓存,仍然可以给计算框架提供类似 Spark 数据本地性的特性。
安装 Java 8 Update 60 或更高版本(8u60+)的 64 位 Java。
已经安装并运行 Alluxio。 本指南假设底层持久存储为本地部署的 HDFS。例如,${ALLUXIO_HOME}/conf/alluxio-site.properties
中包含alluxio.master.mount.table.root.ufs=hdfs://localhost:9000/alluxio/
这一行。 请注意,除了 HDFS,Alluxio 还支持许多其他底层存储系统。 从任意数量的这些系统中访问数据与本指南的重点是垂直的, 介绍了相关内容。
确保 Alluxio 客户端 jar 包是可用的。 在从 Alluxio 下载的压缩包的{{site.ALLUXIO_CLIENT_JAR_PATH}}
中,可以找到 Alluxio 客户端 jar 包。 高级用户也可以从源代码编译该客户端 jar 包,可以参考。
将 Alluxio客户端 jar 包分发在运行 Spark driver 或 executor 的节点上。具体地说,将客户端 jar 包放在每个节点上的同一本地路径(例如{{site.ALLUXIO_CLIENT_JAR_PATH}}
)。
将 Alluxio 客户端 jar 包添加到 Spark driver 和 executor 的 classpath 中,以便 Spark 应用程序能够使用客户端 jar 包在 Alluxio 中读取和写入文件。具体来说,在运行 Spark 的每个节点上,将以下几行添加到spark/conf/spark-defaults.conf
中。
本节介绍如何使用 Alluxio 作为 Spark 应用程序的输入和输出。
将本地数据复制到 Alluxio 文件系统中。 假设你在 Alluxio 项目目录中,将LICENSE
文件放入 Alluxio,运行:
假设 Alluxio Master 运行在localhost
上,在spark-shell
中运行如下命令:
给出准确路径后,Alluxio 支持透明地从底层存储系统中获取数据。 在本节中,使用 HDFS 作为分布式存储系统的示例。
将Input_HDFS
文件放入到 HDFS 中:
请注意,Alluxio 并不知道该文件。你可以通过访问 Web UI 来验证这一点。 假设 Alluxio Master 运行在localhost
上,在spark-shell
中运行如下命令:
让我们以设置 Spark 与 HA 模式的 Alluxio 服务进行通信为例。 如果你运行多个 Alluxio master,其中 Zookeeper 服务运行在zkHost1:2181
、zkHost2:2181
和zkHost3:2181
, 将以下几行添加到${SPARK_HOME}/conf/spark-defaults.conf
中:
或者,你也可以在 Hadoop 配置文件${SPARK_HOME}/conf/core-site.xml
中添加如下属性:
Spark 用户可以将 JVM 系统设置传递给 Spark 任务,通过将"-Dproperty=value"
添加到spark.executor.extraJavaOptions
来设置 Spark executor,将"-Dproperty=value"
添加到spark.driver.extraJavaOptions
中来设置 spark driver。例如,要在写入 Alluxio 时提交CACHE_THROUGH
写模式的 Spark 任务,请执行以下操作:
如果 Spark 已经根据 HA 模式的Alluxio中的步骤进行了设置, 你就可以使用"alluxio://
"方案编写 URI,而无需在权限中指定 Alluxio master。 这是因为在 HA 模式下,Alluxio 主 master 的地址将由配置的 ZooKeeper 服务提供 , 而不是由从 URI 推断的用户指定主机名提供。
或者,如果在 Spark 配置中没有设置 Alluxio HA 的 Zookeeper 地址,则可以在 URI 中以"zk@zkHost1:2181;zkHost2:2181;zkHost3:2181
"的格式指定 Zookeeper 地址:
存储 RDD 到 Alluxio 内存中就是将 RDD 作为文件保存到 Alluxio 中。 在 Alluxio 中将 RDD 保存为文件的两种常见方法是
saveAsTextFile
:将 RDD 作为文本文件写入,其中每个元素都是文件中的一行,
saveAsObjectFile
:通过对每个元素使用 Java 序列化,将 RDD 写到一个文件中。
通过分别使用sc.textFile
或sc.objectFile
,可以从内存中再次读取保存在 Alluxio 中的 RDD。
存储 Spark DataFrame 到 Alluxio 内存中就是将 DataFrame 作为文件保存到 Alluxio 中。 DataFrame 通常用df.write.parquet()
作为 parquet 文件写入。 将 parquet 写入 Alluxio 后,可以使用sqlContext.read.parquet()
从内存中读取。
注意:Alluxio worker 使用主机名来表示网络地址,以便与 HDFS 保持一致。 有一个变通方法,可以在启动 Spark 时实现数据本地性。用户可以使用 Spark 中提供的以下脚本显式指定主机名。在每个从节点中以 slave-hostname 启动 Spark worker:
例如:
你也可以在$SPARK_HOME/conf/spark-env.sh
中设置SPARK_LOCAL_HOSTNAME
来达到此目的。例如:
无论采用哪种方式,Spark Worker 地址都将变为主机名,并且本地性级别将变为NODE_LOCAL
,如下面的 Spark WebUI 所示。
为了最大化实现 Spark 作业的本地性,你应该尽可能多地使用 executor,我们希望每个节点至少有一个 executor。 和部署 Alluxio 的所有方法一样,所有计算节点上也应该有一个 Alluxio worker。
当 Spark 作业在 YARN 上运行时,Spark 会在不考虑数据本地性的情况下启动其 executor。 之后 Spark 在决定怎样为其 executor 分配任务时会正确地考虑数据的本地性。 例如,如果host1
包含blockA
,并且使用blockA
的作业已经在 YARN 集群上以--num-executors=1
的方式启动了,Spark 可能会将唯一的 executor 放置在host2
上,本地性会较差。 但是,如果以--num-executors=2
的方式启动,并且 executor 在host1
和host2
上启动,Spark 会足够智能地将作业优先放置在host1
上。
Class alluxio.hadoop.FileSystem not found
与 SparkSQL 和 Hive MetaStore 有关的问题java.io.IOException: No FileSystem for scheme: alluxio
与在 YARN 上运行 Spark 有关的问题如果你在 YARN 上使用基于 Alluxio 的 Spark 并遇到异常java.io.IOException: No FileSystem for scheme: alluxio
, 请将以下内容添加到${SPARK_HOME}/conf/core-site.xml
:
打开浏览器,查看 。 应该存在一个输出目录/Output
,其中包含了输入文件Input
的双倍内容。
打开浏览器,查看 。 应该存在一个输出目录Output_HDFS
,其中包含了输入文件Input_HDFS
的双倍内容。 同时,现在输入文件Input_HDFS
会被 100% 地加载到 Alluxio 的文件系统空间。
在 Alluxio 1.8 (不包含 1.8)后,用户可以在 Alluxio URI 中编码 Zookeeper 服务地址(见#ha-configuration-parameters)。 这样,就不需要为 Spark 配置额外设置。
如果需要自定义 Spark 任务中的 Alluxio 客户端侧属性,请参见。
请注意,在客户端模式中,你需要设置--driver-java-options "-Dalluxio.user.file.writetype.default=CACHE_THROUGH"
,而不是--conf spark.driver.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH
(见)。
请注意,你必须使用分号而不是逗号来分隔不同的 ZooKeeper 地址,以便在 Spark 中引用 HA 模式的 Alluxio 的 URI;否则,Spark 会认为该 URI 无效。请参阅。
见博客文章。
见博客文章"".
如果是为了调试,你可以配置 Spark 应用程序的日志。 Spark 文档解释了 。
如果你用的是 YARN,则有单独一节来解释 。
如果 Spark 任务的本地性级别是ANY
(本应该是NODE_LOCAL
),这可能是因为 Alluxio 和 Spark 使用不同的网络地址表示,可能其中一个使用主机名,而另一个使用 IP 地址。更多详情请参考 JIRA ticket (这里可以找到 Spark 社区的解决方案)。
为了用 Alluxio 客户端运行spark-shell
,Alluxio 客户端 jar 包必须如的那样,被添加到 Spark driver 和 Spark executor 的 classpath 中。 然而有的时候,SparkSQL 在保存表到 Hive MetaStore(位于 Alluxio 中)中时可能会失败,出现类似于下面的错误信息:
推荐的解决方案是配置。 在 Spark 1.4.0 和之后的版本中,Spark 为了访问 Hive MetaStore 使用了独立的类加载器来加载 java 类。 然而,这个独立的类加载器忽视了特定的包,并且让主类加载器去加载"共享"类(Hadoop 的 HDFS 客户端就是一种"共享"类)。 Alluxio 客户端也应该由主类加载器加载,你可以将alluxio
包加到配置参数spark.sql.hive.metastore.sharedPrefixes
中,以通知 Spark 用主类加载器加载 Alluxio。例如,该参数可以在spark/conf/spark-defaults.conf
中这样设置: