RAPIDS Accelerator for Apache Spark
什么是RAPIDS Accelerator for Apache Spark
RAPIDS Accelerator for Apache Spark是RAPIDS项目的一个重要组件,旨在为Apache Spark提供GPU加速功能。它通过将数据处理任务和算法移至GPU上执行,以实现Spark程序的高性能和高效率。
RAPIDS Accelerator for Apache Spark的主要功能包括以下几个方面:
GPU加速的数据处理:RAPIDS Accelerator for Apache Spark利用GPU的并行计算能力,加速了数据处理操作,如数据的加载、转换、过滤、排序等。通过在GPU上执行这些操作,可以极大地提高Spark程序的处理速度和吞吐量。
GPU加速的机器学习算法:RAPIDS Accelerator for Apache Spark还提供了一系列GPU加速的机器学习算法,包括回归、分类、聚类、降维等。这些算法在GPU上执行,可以显著缩短训练和推断的时间,加速模型构建和预测的过程。
兼容性:RAPIDS Accelerator for Apache Spark与Apache Spark的生态系统无缝集成,可以直接在Spark程序中使用。它提供了与Spark DataFrame和SQL API兼容的接口,使得在Spark中使用GPU加速变得更加简单和方便。
Spark RAPIDS 插件具有以下功能和限制:
支持在 GPU 上通过列式处理运行 Spark SQL。
不需要用户更改 API。
支持行与列之间的转换处理。
在 GPU 上运行支持的Spark SQL 操作,如果某项操作没有在 GPU 上实现或不兼容 GPU,会回退以使用 Spark CPU 版本。
不能运行 RDD 操作。
若启用 shuffle 插件,则支持溢出到磁盘。在这种情况下,如果某个操作耗尽内存,shuffle数据便会溢出以释放空间。
使用方法
Spark RAPIDS的使用步骤如下:
- 下载并安装相应版本的JDK,GPU驱动和CUDA Toolkit
- 下载并安装Spark
- 下载对应的Spark RAPIDS的jar包,需要根据使用的驱动版本,CUDA版本,Spark版本和显卡型号确定需要使用的Spark RAPIDS版本。最新版本和历史版本
- 下载GPU发现脚本,在集群中需要在每个节点上下载
- 在创建spark session时进行配置:
Spark shell 和 ./bin/spark-submit 通过 --conf 等命令行选项,或通过从 conf/spark- defaults.conf 读取配置选项来支持动态加载配置属性。 可以使用 --conf 键值对请求 GPU 并将其分配给相关任务。 分配 GPU 时所用的几个配置键值属性如下:
- 请求 executor 使用 GPU --conf spark.executor.resource.gpu.amount=1
- 指定每个任务的 GPU 数量:--conf spark.task.resource.gpu.amount=1
- 指定GPU发现脚本(需在 YARN 和 K8s 上执行此操作):--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh
更多配置项请参阅RAPIDS Accelerator for Apache Spark Configuration
配置示例:
Local Mode
$SPARK_HOME/bin/spark-shell \
--master local \
--num-executors 1 \
--conf spark.executor.cores=1 \
--conf spark.rapids.sql.concurrentGpuTasks=1 \
--driver-memory 10g \
--conf spark.rapids.memory.pinnedPool.size=2G \
--conf spark.sql.files.maxPartitionBytes=512m \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--jars ${SPARK_RAPIDS_PLUGIN_JAR}
K8s Cluster Mode
$SPARK_HOME/bin/spark-shell \
--master $K8SMASTER \
--name mysparkshell \
--deploy-mode client \
--conf spark.executor.instances=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.executor.memory=4G \
--conf spark.executor.cores=1 \
--conf spark.task.cpus=1 \
--conf spark.task.resource.gpu.amount=1 \
--conf spark.rapids.memory.pinnedPool.size=2G \
--conf spark.executor.memoryOverhead=3G \
--conf spark.sql.files.maxPartitionBytes=512m \
--conf spark.sql.shuffle.partitions=10 \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.kubernetes.namespace=$SPARK_NAMESPACE \
--conf spark.executor.resource.gpu.discoveryScript=/opt/sparkRapidsPlugin/getGpusResources.sh \ # 需配置GPU发现脚本
--conf spark.executor.resource.gpu.vendor=nvidia.com \
--conf spark.kubernetes.container.image=$IMAGE_NAME \
--conf spark.executor.extraClassPath=/opt/sparkRapidsPlugin/rapids-4-spark_<version>.jar \
--driver-class-path=./rapids-4-spark_<version>.jar \
--driver-memory 2G
Spark RAPIDS支持的算子
Spark RAPIDS支持的算子以及各个算子支持的数据类型在官方文档中有详细说明,需要注意的是,Spark RAPIDS对UDF的支持非常有限,具体如下:
支持以下UDF
- 实现Function接口的Scala UDF,并通过SparkSession.udf.register进行注册
- 实现org.apache.spark.sql.api.java.UDF接口之一的Java UDF,并通过SparkSession.udf.register或在PySpark中的spark.udf.registerJavaFunction进行注册
- 简单或通用的Hive UDF
不支持其他形式的Spark UDF,例如:
- Scala或Java用户定义的聚合函数(User-Defined Aggregate Functions)
- Hive聚合函数(UDAF)
- Hive表函数(UDTF)
- Lambda函数
一些简单测试
测试环境:
- 系统:Ubuntu 20.04
- 显卡型号:RTX A4000 * 1
- 驱动版本:535.129.03
- CUDA版本:CUDA 11.3
- Spark版本:3.5.0
- Spark RAPIDS: Release v23.10.0
加速成功示例
测试代码
val startTIme = System.currentTimeMillis()
val df = sc.makeRDD(1 to 10000000, 6).toDF
val df2 = sc.makeRDD(1 to 10000000, 6).toDF
df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count
val endTime = System.currentTimeMillis()
println(s"used time is ${endTime - startTIme} ms")
使用GPU加速
spark-shell配置命令如下:
$SPARK_HOME/bin/spark-shell \
--master local \
--num-executors 1 \
--conf spark.executor.cores=1 \
--conf spark.rapids.sql.concurrentGpuTasks=1 \
--driver-memory 10g \
--conf spark.rapids.memory.pinnedPool.size=2G \
--conf spark.sql.files.maxPartitionBytes=512m \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--jars ${SPARK_RAPIDS_PLUGIN_JAR}
运行结果:
used time is 9830 ms
startTIme: Long = 1702024612450
df: org.apache.spark.sql.DataFrame = [value: int]
df2: org.apache.spark.sql.DataFrame = [value: int]
endTime: Long = 1702024622280
不使用GPU加速
spark-shell配置命令如下:
$SPARK_HOME/bin/spark-shell \
--master local \
--num-executors 1 \
--conf spark.executor.cores=1 \
--driver-memory 10g
运行结果:
used time is 24067 ms
startTIme: Long = 1702024623300
df: org.apache.spark.sql.DataFrame = [value: int]
df2: org.apache.spark.sql.DataFrame = [value: int]
endTime: Long = 1702024647367
结果对比
可以看到,GPU加速的spark运算速度显著高于原生spark(9830 ms vs 24067 ms)
未加速成功的示例
在以上相同环境中运行以下示例代码时,spark未获得GPU加速:
val NUM_SAMPLES = 100000000
val startTIme = System.currentTimeMillis()
val count = spark.sparkContext.parallelize(1 to NUM_SAMPLES).filter { _ =>
val x = math.random
val y = math.random
x * x + y * y < 1
}.count()
val endTime = System.currentTimeMillis()
println(s"used time is ${endTime - startTIme} ms")
实际上从官方文档上看,Spark RAPIDS显然是支持filter算子的,未获得加速的原因可能是它不支持某些UDF,例如Lambda函数(参见前文)。
未完待续
更多信息请参阅
基于NVIDIA GPU和RAPIDS加速Spark 3.0
Accelerating_Apache_Spark_3.X-zhCN
为了解决数据处理的延迟,后续将进行RDMA等技术的研究。
💬 评论讨论