SparkSQL执行时参数优化
近期接手了不少 大数据 表任务调度补数据的工作,补数时发现资源消耗异常的大且运行速度却不怎么给力.
发现根本原因在于sparkSQL配置有诸多问题,解决后总结出来就当抛砖引玉了.
-
具体现象
- 内存CPU比例失调 一个Spark任务消耗 120(executor)*4G = 480G内存仅仅使用120个 core.几个SprakSQL任务就将整个系统资源吃光.
- 设置超过40个executor,但未指定分区数,导致多数executor空闲.
- 原因分析
- SparkSQL 配置时Core与内存比例不恰当
- 没有指定executor核心数
-
未进行其他配置参数优化
- 在配置 SparkSQL 任务时指定executor核心数 建议为4 (同一 executor [进程]内内存共享,当数据倾斜时,使用相同核心数与内存量的两个任务, executor总量少 的任务不容易OOM,因为单核心最大可用内存大.但是并非越大越好,因为单个exector最大core受 服务器 剩余core数量限制, 过大的core 数量可能导致资源分配不足)
- 设置spark.default.parallelism=600 每个stage的默认task数量 (计算公式为 num-executors * executor-cores 系统默认值分区为40,这是导致executor并行度上不去的罪魁祸首,之所以这样计算是为了尽量避免计算 最慢的task 决定整个stage的时间,将其设置为总核心的2-3倍,让运行快的task可以继续领取任务计算直至全部任务计算完毕)
- 开启spark.sql.auto.repartition=true 自动重新分区 (每个 stage [阶段]运行时分区并不尽相同,使用此配置可优化计算后分区数,避免分区数过大导致单个分区数据量过少,每个task运算分区数据时时间过短,从而导致task频繁调度消耗过多时间)
- 设置spark.sql.shuffle.partitions=400 提高shuffle并行度 (shuffle read task的并行度)
- 设置spark.shuffle.service.enabled=true 提升shuffle效率 --!并未测试 (Executor 进程除了运行task 也要进行写shuffle 数据,当Executor进程任务过重时,导致GC不能为其他Executor提供shuffle数据时将会影响效率.此服务开启时代替Executor来抓取shuffle数据)
前后资源配置对比
类型 |
内存数量 |
cpu核心数量 |
executor数量 |
executor内存 |
单核心内存 |
---|---|---|---|---|---|
系统资源总量 |
7168G |
3500 |
- |
- |
2G |
目前一个任务 |
480G |
120 |
120 |
4G |
4G |
优化后 |
480G |
240 |
60 |
8G |
2G |
以下为SparkSQL调优相关设置
以下列表中动态资源分配相关不建议使用
//1.下列Hive参数对Spark同样起作用。 set hive.exec.dynamic.partition=true; // 是否允许动态生成分区 set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成 set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数
//2.运行行为 set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做广播的阈值 set spark.dynamicAllocation.enabled; // 开启动态资源分配 set spark.dynamicAllocation.maxExecutors; //开启动态资源分配后,最多可分配的Executor数 set spark.dynamicAllocation.minExecutors; //开启动态资源分配后,最少可分配的Executor数 set spark.sql.shuffle.partitions; // 需要shuffle是mapper端写出的partition个数 set spark.sql.adaptive.enabled; // 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行 set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; //开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer