Spark中内存分配相关的参数通过SparkConf进行配置,如下示例:
val conf = new SparkConf()
.setAppName("MyApp")
.setMaster("local[2]") // 两个CPU核心
.set("spark.executor.memory", "2g") // 设置每个executor内存为2GB
.set("spark.driver.memory", "1g") // 设置driver内存为1GB
或者通过SparkSession中的config项进行配置,如下示例:
val spark1 = SparkSession.builder().appName("test").master("local")
.config("spark.executor.memory","2g")
.config("spark.driver.memory","1g")
.config("spark.memory.fraction", 0.6)
.getOrCreate()
也可以在提交任务时通过spark-submit进行配置,如下示例:
spark-submit --class com.example.MyApp --master local[2] --executor-memory 2g --driver-memory 1g myapp.jar
以下是与内存分配相关的一些常用配置参数:
executor内存配置:
spark.executor.memory: 指定每个Executor进程可以使用的最大内存,例如 4g 表示4GB内存。 driver内存配置:
spark.driver.memory: 指定Driver程序可以使用的内存大小,例如 4g 表示4GB内存。
Executor内存分配策略:
- spark.memory.fraction: 设置用于Execution和Storage的内存比例,默认值为0.6。
- spark.memory.storageFraction: 设置Storage内存占总内存的比例,默认为0.5。
Executor内存管理:
- spark.memory.offHeap.enabled: 是否启用Executor的堆外内存,可以提高内存使用效率。默认为false。
- spark.memory.offHeap.size: 指定堆外内存大小,例如 1g 表示1GB堆外内存。
下面来具体叙述其机制
概述
本文介绍Spark1.6之后引入的统一内存管理机制(Unified Memory Manager)。
在1.6之前,Spark内存管理主要依赖静态管理(Static Memory Manager)模式,Execution内存和Storage内存的分配占比全部是静态的,其值为系统预先设置的默认参数。在Spark1.6后,为了考虑内存管理的动态灵活性,Spark的内存管理改为统一管理(Unified Memory Manager)模式,支持Storage和Execution内存动态占用。至于静态管理方式任然被保留,可通过spark.memory.useLegacyMode参数启用。
首先简单的介绍一下Spark运行的基本流程。
用户在Driver端提交任务,初始化运行环境(SparkContext等)
Driver根据配置向ResoureManager申请资源(executors及内存资源)
ResoureManager资源管理器选择合适的worker节点创建executor进程
Executor向Driver注册,并等待其分配task任务
Driver端完成SparkContext初始化,创建DAG,分配task到Executor上执行。
Executor启动线程执行task任务,返回结果。
Spark的数据计算主要是在Executor内完成的,而Executor对于RDD的持久化存储以及Shuffle运行过程,均在Spark内存管理机制下统一进行,其内运行的task任务也共享Executor内存,因此本文主要围绕Executor的内存管理进行展开描述。
堆内内存和堆外内存
作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存
堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。Spark 对堆内内存的管理是一种逻辑上的”规划式”的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存
堆外内存
为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API,Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
统一内存管理(动态管理)
统一内存管理示意图如图所示:


其中最重要的优化在于动态占用机制,其规则如下:
- 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
- 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
- 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
- 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂
💬 评论讨论