RAPIDS Accelerator for Apache Spark

什么是RAPIDS Accelerator for Apache Spark

RAPIDS Accelerator for Apache Spark是RAPIDS项目的一个重要组件,旨在为Apache Spark提供GPU加速功能。它通过将数据处理任务和算法移至GPU上执行,以实现Spark程序的高性能和高效率。

RAPIDS Accelerator for Apache Spark的主要功能包括以下几个方面:

  1. GPU加速的数据处理:RAPIDS Accelerator for Apache Spark利用GPU的并行计算能力,加速了数据处理操作,如数据的加载、转换、过滤、排序等。通过在GPU上执行这些操作,可以极大地提高Spark程序的处理速度和吞吐量。

  2. GPU加速的机器学习算法:RAPIDS Accelerator for Apache Spark还提供了一系列GPU加速的机器学习算法,包括回归、分类、聚类、降维等。这些算法在GPU上执行,可以显著缩短训练和推断的时间,加速模型构建和预测的过程。

  3. 兼容性:RAPIDS Accelerator for Apache Spark与Apache Spark的生态系统无缝集成,可以直接在Spark程序中使用。它提供了与Spark DataFrame和SQL API兼容的接口,使得在Spark中使用GPU加速变得更加简单和方便。

Spark RAPIDS 插件具有以下功能和限制:

支持在 GPU 上通过列式处理运行 Spark SQL。

不需要用户更改 API。

支持行与列之间的转换处理。

在 GPU 上运行支持的Spark SQL 操作,如果某项操作没有在 GPU 上实现或不兼容 GPU,会回退以使用 Spark CPU 版本。

不能运行 RDD 操作。

若启用 shuffle 插件,则支持溢出到磁盘。在这种情况下,如果某个操作耗尽内存,shuffle数据便会溢出以释放空间。

使用方法

Spark RAPIDS的使用步骤如下:

  1. 下载并安装相应版本的JDK,GPU驱动和CUDA Toolkit
  2. 下载并安装Spark
  3. 下载对应的Spark RAPIDS的jar包,需要根据使用的驱动版本,CUDA版本,Spark版本和显卡型号确定需要使用的Spark RAPIDS版本。最新版本历史版本
  4. 下载GPU发现脚本,在集群中需要在每个节点上下载
  5. 在创建spark session时进行配置:

    Spark shell 和 ./bin/spark-submit 通过 --conf 等命令行选项,或通过从 conf/spark- defaults.conf 读取配置选项来支持动态加载配置属性。 可以使用 --conf 键值对请求 GPU 并将其分配给相关任务。 分配 GPU 时所用的几个配置键值属性如下:

    • 请求 executor 使用 GPU --conf spark.executor.resource.gpu.amount=1
    • 指定每个任务的 GPU 数量:--conf spark.task.resource.gpu.amount=1
    • 指定GPU发现脚本(需在 YARN 和 K8s 上执行此操作):--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh

更多配置项请参阅RAPIDS Accelerator for Apache Spark Configuration

配置示例:

Local Mode

$SPARK_HOME/bin/spark-shell \
       --master local \
       --num-executors 1 \
       --conf spark.executor.cores=1 \
       --conf spark.rapids.sql.concurrentGpuTasks=1 \
       --driver-memory 10g \
       --conf spark.rapids.memory.pinnedPool.size=2G \
       --conf spark.sql.files.maxPartitionBytes=512m \
       --conf spark.plugins=com.nvidia.spark.SQLPlugin \
       --jars ${SPARK_RAPIDS_PLUGIN_JAR}

K8s Cluster Mode

$SPARK_HOME/bin/spark-shell \
     --master $K8SMASTER \
     --name mysparkshell \
     --deploy-mode client  \
     --conf spark.executor.instances=1 \
     --conf spark.executor.resource.gpu.amount=1 \
     --conf spark.executor.memory=4G \
     --conf spark.executor.cores=1 \
     --conf spark.task.cpus=1 \
     --conf spark.task.resource.gpu.amount=1 \
     --conf spark.rapids.memory.pinnedPool.size=2G \
     --conf spark.executor.memoryOverhead=3G \
     --conf spark.sql.files.maxPartitionBytes=512m \
     --conf spark.sql.shuffle.partitions=10 \
     --conf spark.plugins=com.nvidia.spark.SQLPlugin \
     --conf spark.kubernetes.namespace=$SPARK_NAMESPACE  \
     --conf spark.executor.resource.gpu.discoveryScript=/opt/sparkRapidsPlugin/getGpusResources.sh \ # 需配置GPU发现脚本
     --conf spark.executor.resource.gpu.vendor=nvidia.com \
     --conf spark.kubernetes.container.image=$IMAGE_NAME \
     --conf spark.executor.extraClassPath=/opt/sparkRapidsPlugin/rapids-4-spark_<version>.jar \
     --driver-class-path=./rapids-4-spark_<version>.jar \
     --driver-memory 2G

Spark RAPIDS支持的算子

Spark RAPIDS支持的算子以及各个算子支持的数据类型在官方文档中有详细说明,需要注意的是,Spark RAPIDS对UDF的支持非常有限,具体如下:

支持以下UDF

  • 实现Function接口的Scala UDF,并通过SparkSession.udf.register进行注册
  • 实现org.apache.spark.sql.api.java.UDF接口之一的Java UDF,并通过SparkSession.udf.register或在PySpark中的spark.udf.registerJavaFunction进行注册
  • 简单或通用的Hive UDF

不支持其他形式的Spark UDF,例如:

  • Scala或Java用户定义的聚合函数(User-Defined Aggregate Functions)
  • Hive聚合函数(UDAF)
  • Hive表函数(UDTF)
  • Lambda函数

一些简单测试

测试环境:

  • 系统:Ubuntu 20.04
  • 显卡型号:RTX A4000 * 1
  • 驱动版本:535.129.03
  • CUDA版本:CUDA 11.3
  • Spark版本:3.5.0
  • Spark RAPIDS: Release v23.10.0

加速成功示例

测试代码

val startTIme = System.currentTimeMillis()
val df = sc.makeRDD(1 to 10000000, 6).toDF
val df2 = sc.makeRDD(1 to 10000000, 6).toDF
df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count
val endTime = System.currentTimeMillis()
println(s"used time is ${endTime - startTIme} ms")

使用GPU加速

spark-shell配置命令如下:

$SPARK_HOME/bin/spark-shell \
       --master local \
       --num-executors 1 \
       --conf spark.executor.cores=1 \
       --conf spark.rapids.sql.concurrentGpuTasks=1 \
       --driver-memory 10g \
       --conf spark.rapids.memory.pinnedPool.size=2G \
       --conf spark.sql.files.maxPartitionBytes=512m \
       --conf spark.plugins=com.nvidia.spark.SQLPlugin \
       --jars ${SPARK_RAPIDS_PLUGIN_JAR}

运行结果:

used time is 9830 ms
startTIme: Long = 1702024612450
df: org.apache.spark.sql.DataFrame = [value: int]
df2: org.apache.spark.sql.DataFrame = [value: int]
endTime: Long = 1702024622280

不使用GPU加速

spark-shell配置命令如下:

$SPARK_HOME/bin/spark-shell \
       --master local \
       --num-executors 1 \
       --conf spark.executor.cores=1 \
       --driver-memory 10g

运行结果:

used time is 24067 ms
startTIme: Long = 1702024623300
df: org.apache.spark.sql.DataFrame = [value: int]
df2: org.apache.spark.sql.DataFrame = [value: int]
endTime: Long = 1702024647367

结果对比

可以看到,GPU加速的spark运算速度显著高于原生spark(9830 ms vs 24067 ms)

未加速成功的示例

在以上相同环境中运行以下示例代码时,spark未获得GPU加速:

val NUM_SAMPLES = 100000000

val startTIme = System.currentTimeMillis()

val count = spark.sparkContext.parallelize(1 to NUM_SAMPLES).filter { _ =>
    val x = math.random
    val y = math.random
    x * x + y * y < 1
}.count()

val endTime = System.currentTimeMillis()

println(s"used time is ${endTime - startTIme} ms")

实际上从官方文档上看,Spark RAPIDS显然是支持filter算子的,未获得加速的原因可能是它不支持某些UDF,例如Lambda函数(参见前文)。

未完待续

更多信息请参阅

官方文档

基于NVIDIA GPU和RAPIDS加速Spark 3.0

Accelerating_Apache_Spark_3.X-zhCN

为了解决数据处理的延迟,后续将进行RDMA等技术的研究。