背景介绍:

最近在对一个Spark任务进行调优时,在260G的输入数据上跑,总会在执行8-9小时后抛出Too large frame的异常。对此异常进行深入了解,也尝试了很多解决办法,现将其总结以备往后参考。

Too large frame异常的原因:

  • Spark抛出Too large frame异常,是因为Spark对每个partition所能包含的数据大小有写死的限制(约为2G),当某个partition包含超过此限制的数据时,就会抛出Too large frame Exception。

  • 造成此异常的根本原因是源数据的某一列(或某几列)分布不均匀,即发生了倾斜,当某个shuffle操作是根据此列数据进行shuffle时,就会造成整个数据集发生倾斜,即某几个partition包含了大量数据,并且其数据大小超过了Spark的限制,而其他partition都包含很少的数据。

解决方案:

1. 在Join时对小表(不超过10M)进行广播(Broadcast):

  • 这样可以对大表完全避免shuffle操作,特别是当大表特别大时,效果立竿见影;

  • 如果报TimeOut的错,说明小表太大,没有在规定的时间内通过网络传输完,解决方法为设置spark.sql.broadcastTimeout=3600(单位为s);

2. 在每个Executor中创建多个worker,利用多核机器增加并行度:

  • 设置参数spark.executor.cores,在yarn模式下该参数默认值为1,请将其设置为机器的核数。注意一般的Spark集群会对每个Spark任务可申请的资源最大值进行限制,在笔者所使用的集群中,Max Allocation的限制值为Memory 100000M, Cores 26,意味着spark.executor.cores的值不能超过26,否则任务提交会失败。

3. 增大DataFrame的join等类shuffle操作时的partition数量:

  • 在上面设置spark.executor.cores提高机器并行度后,再设置参数spark.sql.shuffle.partitions=500,此处的500应该替换为你的executor的数量 * 每台executor的核数 * 2或者3,这样增大partition数目后可以想象每个partition包含的数据大小就会减小;

  • 但是如果数据本身是倾斜的,而spark.sql.shuffle.partitions的值设置得过大,则会造成某些partition数据量超过限制,而很多其他partition几乎没数据,所以也不能完全解决问题;

4. 尽可能地不要使用RDD,除非某些mllib中的算法只接受RDD作为输入数据并且在ml包中找不到对应算法:

  • 比如频繁项集算法PrefixSpan只有mllib包中有,则只能将DataFrame转为RDD后喂给它;

  • 除此之外,大多数的RDD操作都有其对应的DataFrame算子。很多人在数据预处理阶段,将DataFrame转为RDD的原因是想使用RDD的groupByKey, reduceByKey 或 aggregateByKey 算子,因为其相对于DataFrame的groupby方法更加灵活。但在DataFrame中其实也可以实现类似的操作(使用collect_list函数,具体用法烦请Google)。

5. 上述方法是遇到Too large frame报错时需要首先尝试,同时也是最简单的方法。但是,如果数据分布本身就是倾斜的,上述方法只是治标不治本。此时就需要尝试人工repartition:

  • 以下示例是我们的数据预处理阶段即将结束的地方,此时需要将DataFrame转换为RDD以便喂给PrefixSpan算法,但是df.rdd是一个shuffle操作,每次执行到这里都会抛出Too large frame异常进而使整个任务失败。所以我们需要在将DataFrame转换为RDD之前对DataFrame进行repartition,使得其在每个partition中分布的数据量大致相同(uniformly repartition)。

  • 手工partition的代码如下:

val generatePartitionKey = (d: Double) => {

(d * 300).round // make the dataframe uniformly partitioned into 300 blocks.

val generatePartitionKeyUDF = udf(generatePartitionKey)

journeysFiltered

.select($"lvl2_journey", generatePartitionKeyUDF(rand()).alias("partition_key"))

.repartition(300, $"partition_key")

.drop($"partition_key")

.rdd.map(row => row.getAs[Seq[Long]](0).map(r => Array(r)).toArray)

6. 除了手工进行repartition之外,spark也提供一个自动partition的功能,设置参数spark.repartitioning=true即可启用,但是我还没有具体使用过,不知道其效果如何,有待后续探索。

1.org.apache. spark . Spark Exception: Kryo serialization failed: Buffer overflow 原因:kryo序列化缓存空间不足。 解决方法:增加参数,--conf spark .kryoserializer.buffer.max=2047m。 2.org.elasticsearch.hadoop.rest.EsHadoopNoN... FetchFailed(BlockManagerId(92, hadoop1136.prod.2dfire.info, 7337, None), shuffle Id=4, mapId=42, reduceId=42, message= org.apache. spark . shuffle .FetchFailedException: Too large frame : 2292863140... 为了提高 shuffle 的性能并提高资源利用率,Facebook 开发了 Spark -optimized Shuffle (SOS) 。 这种 shuffle 技术有效地将大量小的 shuffle 读请求转换成少并且大的顺序 I/O 请求。目前这个技术于2018年4月已经在 Facebook 大规模使用了,作业整体的 I/O 提升了两倍,计算效率提高10%。值得高兴的是,这项技术 Facebook 打算共享给社区。 本地址是这项技术的视频介绍。关注Hadoop技术博文(iteblog_hadoop) 公众号并回复 sos 获取本文相关ppt及相关技术论文。 新公司遇到的第一个 spark 的坑,寻找原因的过程其实还挺有意思,最终在源码和 spark ui上的统计数据的帮助下找到根源,具体如下。 先说下问题 由于严重的数据倾斜,大量数据集中在单个task中,导致 shuffle 过程中发生异常 完整的exeception是这样的 但奇怪的是,经过尝试减小executor数量后任务反而成功,增大反而失败,经过多次测试,... 本文结构Background前言Can Fetch描述 优化 方案相关链接Fetch Efficiently描述 优化 方案相关链接Reliable Fetch Shuffle W... 11.ERROR shuffle .RetryingBlockFetcher: Failed to fetch block shuffle _7_18444_7412, and will not retry 原因:Executor被kill,无法拉取该block。可能是开启AE特性时数据倾斜造成的,其他executor都已完成工作被回收,只有倾斜的exec..   经常会遇到 类似org.apache. spark . shuffle .FetchFailedException: java.io.FileNotFoundException。主要是 shuffle 导致内存不足引起的。 shuffle 分为 shuffle read 和write 。   1. shuffle read 是读其它partition 发来的数据,从缓存中读。   2. shu... Dpark内存溢出 Spark 内存溢出堆内内存溢出堆外内存溢出堆内内存溢出java.lang.OutOfMemoryError: GC overhead limit execeededjava.lang.OutOfMemoryError: Java heap space具体说明Heap size JVM堆的设置是指java程序运行过程中JVM可以调配使用的内存空间的设置.JVM在启动的时候会自动设...