当不跟随父对象partition数目的shuffle过程发生后,结果的partition会发生改变,这两个参数就是控制这类shuffle过程后,返回对象的partition的
经过实测,得到结论:
spark.sql.shuffle.partitions 作用于dataframe(val df2=df1.shuffle算子(如df1.orderBy()),的df2的partition就是这个参数的值)
spark.default.parallelism 作用于rdd(val rdd2=rdd1.shuffle算子(如rdd1.reduceByKey()),的rdd2的partition就是这个参数的值)
如何查看操作是否有shuffle?善用rdd的toDebugString函数,详见
Spark中的shuffle算子
df也可以先df.rdd.toDebugString查看是否有shuffle发生
另外,也可以说:
-
spark.default.parallelism只有在处理RDD时有效。
-
spark.sql.shuffle.partitions则是只对SparkSQL(产生的是dataframe)有效。
修改方法:
代码中设定:
sqlContext.setConf("spark.sql.shuffle.partitions", "500")
sqlContext.setConf("spark.default.parallelism", "500")
提交任务时设定
:
./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500
官方说明和默认值:
spark.default.parallelism | For distributed shuffle operations like reduceByKey and join , the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
- Local mode: number of cores on the local machine
- Mesos fine grained mode: 8
- Others: total number of cores on all executor nodes or 2, whichever is larger
| Default number of partitions in RDDs returned by transformations like join , reduceByKey , and parallelize when not set by user. |
spark.sql.shuffle.partitions | 200(default) | Configures the number of partitions to use when shuffling data for joins or aggregations. 该参数也决定着sc.sql(...)取数据时的并行度 |
跟随父对象partition数目的shuffle?比如df的join,df1.join(df2) 返回partition数目根据df1定
参考资料:
Configuration - Spark 2.1.0 Documentation
Performance Tuning - Spark 3.4.0 Documentation
https://www.jianshu.com/p/7442deb21ae0
performance - What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? - Stack Overflow
当不跟随父对象partition数目的shuffle过程发生后,结果的partition会发生改变,这两个参数就是控制这类shuffle过程后,返回对象的partition的经过实测,得到结论:spark.sql.shuffle.partitions 作用于dataframe(val df2=df1.shuffle算子(如df1.orderBy()),的df2的partition就是这个参...
分配更多的资源
分配更多的资源是性能优化调优的王道,就是增加和分配更多的资源,这对于性能和速度上的提升是显而易见的,基本上,在一定范围之内,增加资源与性能的提升,是成正比的;写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,就是要来调节最优的资源配置;
在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限;那么才是...
六、Spark Shuffle的配置选项(配置调优)
一、spark 的shuffle调优
主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等
二、spark.shuffle.file.buffer
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充
spark.reducer.maxSizeInFlight 48m reduce task的buffer缓冲,代表了每个reduce task每次能够拉取的map side数据最大大小,如果内存充足,可以考虑加大,从而减少网络传输次数,提升性能
spark.shuffle.blockTransferService netty shuffle过程中,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6中会去除掉
spark.shuffle.compress true 是否对map side输出的文件进行压缩,默认是启用压缩的,压缩器是由spark.io.compression.codec属性指定的,默认是snappy压缩器,该压缩器强调的是压缩速度,而不是压缩率
谈谈spark.sql.shuffle.partitions和 spark.default.parallelism 的区别及spark并行度的理解spark.sql.shuffle.partitions和 spark.default.parallelism 的区别spark并行度的理解如何设置spark.sql.shuffle.partitions和spark.default.parallelism的值
spark.sql.shuffle.partitions和 spark.default.parallel
在关于spark任务并行度的设置中,有两个参数我们会经常遇到,spark.sql.shuffle.partitions 和 spark.default.parallelism, 那么这两个参数到底有什么区别的?
首先,让我们来看下它们的定义
Property NameDefaultMeaning
spark.sql.shuffle.partitions...
1.spark.default.parallelism只对RDD有效,对sparksql(DataFrame、DataSet)无效
2.spark.sql.shuffle.partitions对sparksql中的joins和aggregations有效,但其他的无效(对这种情况下,上述的两种配置都无效,我们应该怎么办呢?看第三点)
3.我们可以使用repartition算子对dataframe进行重分区。
通过spark-submit设置spark.sql.shuffle.partitions可以在提交Spark应用程序时指定。可以使用以下命令行参数来设置:
spark-submit --conf spark.sql.shuffle.partitions=<num_partitions> ...
其中,`<num_partitions>`是你想要设置的分区数。这个参数决定了Spark SQL中shuffle操作的并行度,即数据在进行聚合、排序等操作时的分区数。
请注意,`spark.sql.shuffle.partitions`参数只对Spark SQL的shuffle操作生效,不会影响其他类型的操作。默认情况下,Spark会根据集群的CPU核心数来自动设置分区数。