当不跟随父对象partition数目的shuffle过程发生后,结果的partition会发生改变,这两个参数就是控制这类shuffle过程后,返回对象的partition的

经过实测,得到结论:

spark.sql.shuffle.partitions 作用于dataframe(val df2=df1.shuffle算子(如df1.orderBy()),的df2的partition就是这个参数的值)

spark.default.parallelism 作用于rdd(val rdd2=rdd1.shuffle算子(如rdd1.reduceByKey()),的rdd2的partition就是这个参数的值)

如何查看操作是否有shuffle?善用rdd的toDebugString函数,详见 Spark中的shuffle算子

df也可以先df.rdd.toDebugString查看是否有shuffle发生

另外,也可以说:

  • spark.default.parallelism只有在处理RDD时有效。
  • spark.sql.shuffle.partitions则是只对SparkSQL(产生的是dataframe)有效。

修改方法:

代码中设定:

sqlContext.setConf("spark.sql.shuffle.partitions", "500")
sqlContext.setConf("spark.default.parallelism", "500")

提交任务时设定:

./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500

官方说明和默认值:

spark.default.parallelismFor distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
  • Local mode: number of cores on the local machine
  • Mesos fine grained mode: 8
  • Others: total number of cores on all executor nodes or 2, whichever is larger
Default number of partitions in RDDs returned by transformations like joinreduceByKey, and parallelize when not set by user.
spark.sql.shuffle.partitions200(default)

Configures the number of partitions to use when shuffling data for joins or aggregations. 

该参数也决定着sc.sql(...)取数据时的并行度

跟随父对象partition数目的shuffle?比如df的join,df1.join(df2) 返回partition数目根据df1定




参考资料:

Configuration - Spark 2.1.0 Documentation

Performance Tuning - Spark 3.4.0 Documentation

https://www.jianshu.com/p/7442deb21ae0

performance - What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism? - Stack Overflow

当不跟随父对象partition数目的shuffle过程发生后,结果的partition会发生改变,这两个参数就是控制这类shuffle过程后,返回对象的partition的经过实测,得到结论:spark.sql.shuffle.partitions 作用于dataframe(val df2=df1.shuffle算子(如df1.orderBy()),的df2的partition就是这个参... 分配更多的资源 分配更多的资源是性能优化调优的王道,就是增加和分配更多的资源,这对于性能和速度上的提升是显而易见的,基本上,在一定范围之内,增加资源与性能的提升,是成正比的;写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,就是要来调节最优的资源配置; 在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限;那么才是...
六、Spark Shuffle的配置选项(配置调优) 一、sparkshuffle调优 主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等 二、spark.shuffle.file.buffer 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲,待缓冲写满之后,才会溢写到磁盘。 调优建议:如果作业可用的内存资源较为充
spark.reducer.maxSizeInFlight 48m reduce task的buffer缓冲,代表了每个reduce task每次能够拉取的map side数据最大大小,如果内存充足,可以考虑加大,从而减少网络传输次数,提升性能 spark.shuffle.blockTransferService netty shuffle过程,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6会去除掉 spark.shuffle.compress true 是否对map side输出的文件进行压缩,默认是启用压缩的,压缩器是由spark.io.compression.codec属性指定的,默认是snappy压缩器,该压缩器强调的是压缩速度,而不是压缩率
谈谈spark.sql.shuffle.partitions和 spark.default.parallelism 的区别及spark并行度的理解spark.sql.shuffle.partitions和 spark.default.parallelism 的区别spark并行度的理解如何设置spark.sql.shuffle.partitions和spark.default.parallelism的值 spark.sql.shuffle.partitions和 spark.default.parallel
在关于spark任务并行度的设置,有两个参数我们会经常遇到,spark.sql.shuffle.partitions 和 spark.default.parallelism, 那么这两个参数到底有什么区别的? 首先,让我们来看下它们的定义 Property NameDefaultMeaning spark.sql.shuffle.partitions...
1.spark.default.parallelism只对RDD有效,对sparksqlDataFrame、DataSet)无效 2.spark.sql.shuffle.partitions对sparksql的joins和aggregations有效,但其他的无效(对这种情况下,上述的两种配置都无效,我们应该怎么办呢?看第三点) 3.我们可以使用repartition算子对dataframe进行重分区。
通过spark-submit设置spark.sql.shuffle.partitions可以在提交Spark应用程序时指定。可以使用以下命令行参数来设置: spark-submit --conf spark.sql.shuffle.partitions=<num_partitions> ... 其,`<num_partitions>`是你想要设置的分区数。这个参数决定了Spark SQLshuffle操作的并行度,即数据在进行聚合、排序等操作时的分区数。 请注意,`spark.sql.shuffle.partitions`参数只对Spark SQLshuffle操作生效,不会影响其他类型的操作。默认情况下,Spark会根据集群的CPU核心数来自动设置分区数。