背景介绍:
最近在对一个Spark任务进行调优时,在260G的输入数据上跑,总会在执行8-9小时后抛出Too large frame的异常。对此异常进行深入了解,也尝试了很多解决办法,现将其总结以备往后参考。
Too large frame异常的原因:
-
Spark抛出Too large frame异常,是因为Spark对每个partition所能包含的数据大小有写死的限制(约为2G),当某个partition包含超过此限制的数据时,就会抛出Too large frame Exception。
-
造成此异常的根本原因是源数据的某一列(或某几列)分布不均匀,即发生了倾斜,当某个shuffle操作是根据此列数据进行shuffle时,就会造成整个数据集发生倾斜,即某几个partition包含了大量数据,并且其数据大小超过了Spark的限制,而其他partition都包含很少的数据。
解决方案:
1. 在Join时对小表(不超过10M)进行广播(Broadcast):
2. 在每个Executor中创建多个worker,利用多核机器增加并行度:
3. 增大DataFrame的join等类shuffle操作时的partition数量:
-
在上面设置spark.executor.cores提高机器并行度后,再设置参数spark.sql.shuffle.partitions=500,此处的500应该替换为你的executor的数量 * 每台executor的核数 * 2或者3,这样增大partition数目后可以想象每个partition包含的数据大小就会减小;
-
但是如果数据本身是倾斜的,而spark.sql.shuffle.partitions的值设置得过大,则会造成某些partition数据量超过限制,而很多其他partition几乎没数据,所以也不能完全解决问题;
4. 尽可能地不要使用RDD,除非某些mllib中的算法只接受RDD作为输入数据并且在ml包中找不到对应算法:
-
比如频繁项集算法PrefixSpan只有mllib包中有,则只能将DataFrame转为RDD后喂给它;
-
除此之外,大多数的RDD操作都有其对应的DataFrame算子。很多人在数据预处理阶段,将DataFrame转为RDD的原因是想使用RDD的groupByKey, reduceByKey 或 aggregateByKey 算子,因为其相对于DataFrame的groupby方法更加灵活。但在DataFrame中其实也可以实现类似的操作(使用collect_list函数,具体用法烦请Google)。
5. 上述方法是遇到Too large frame报错时需要首先尝试,同时也是最简单的方法。但是,如果数据分布本身就是倾斜的,上述方法只是治标不治本。此时就需要尝试人工repartition:
val generatePartitionKey = (d: Double) => {
(d * 300).round // make the dataframe uniformly partitioned into 300 blocks.
val generatePartitionKeyUDF = udf(generatePartitionKey)
journeysFiltered
.select($"lvl2_journey", generatePartitionKeyUDF(rand()).alias("partition_key"))
.repartition(300, $"partition_key")
.drop($"partition_key")
.rdd.map(row => row.getAs[Seq[Long]](0).map(r => Array(r)).toArray)
6. 除了手工进行repartition之外,spark也提供一个自动partition的功能,设置参数spark.repartitioning=true即可启用,但是我还没有具体使用过,不知道其效果如何,有待后续探索。
1.org.apache.
spark
.
Spark
Exception: Kryo serialization failed: Buffer overflow
原因:kryo序列化缓存空间不足。
解决方法:增加参数,--conf
spark
.kryoserializer.buffer.max=2047m。
2.org.elasticsearch.hadoop.rest.EsHadoopNoN...
FetchFailed(BlockManagerId(92, hadoop1136.prod.2dfire.info, 7337, None),
shuffle
Id=4, mapId=42, reduceId=42, message=
org.apache.
spark
.
shuffle
.FetchFailedException: Too
large
frame
: 2292863140...
为了提高
shuffle
的性能并提高资源利用率,Facebook 开发了
Spark
-optimized
Shuffle
(SOS) 。 这种
shuffle
技术有效地将大量小的
shuffle
读请求转换成少并且大的顺序 I/O 请求。目前这个技术于2018年4月已经在 Facebook 大规模使用了,作业整体的 I/O 提升了两倍,计算效率提高10%。值得高兴的是,这项技术 Facebook 打算共享给社区。
本地址是这项技术的视频介绍。关注Hadoop技术博文(iteblog_hadoop) 公众号并回复 sos 获取本文相关ppt及相关技术论文。
新公司遇到的第一个
spark
的坑,寻找原因的过程其实还挺有意思,最终在源码和
spark
ui上的统计数据的帮助下找到根源,具体如下。
先说下问题
由于严重的数据倾斜,大量数据集中在单个task中,导致
shuffle
过程中发生异常
完整的exeception是这样的
但奇怪的是,经过尝试减小executor数量后任务反而成功,增大反而失败,经过多次测试,...
本文结构Background前言Can Fetch描述
优化
方案相关链接Fetch Efficiently描述
优化
方案相关链接Reliable Fetch
Shuffle
W...
11.ERROR
shuffle
.RetryingBlockFetcher: Failed to fetch block
shuffle
_7_18444_7412, and will not retry
原因:Executor被kill,无法拉取该block。可能是开启AE特性时数据倾斜造成的,其他executor都已完成工作被回收,只有倾斜的exec..
经常会遇到 类似org.apache.
spark
.
shuffle
.FetchFailedException: java.io.FileNotFoundException。主要是
shuffle
导致内存不足引起的。
shuffle
分为
shuffle
read 和write 。
1.
shuffle
read 是读其它partition 发来的数据,从缓存中读。
2. shu...
Dpark内存溢出
Spark
内存溢出堆内内存溢出堆外内存溢出堆内内存溢出java.lang.OutOfMemoryError: GC overhead limit execeededjava.lang.OutOfMemoryError: Java heap space具体说明Heap size JVM堆的设置是指java程序运行过程中JVM可以调配使用的内存空间的设置.JVM在启动的时候会自动设...