相关文章推荐
兴奋的草稿纸  ·  如何从spark scala ...·  1 月前    · 
坚韧的竹笋  ·  NSDate,NSDateFormatter ...·  1 年前    · 
朝气蓬勃的木瓜  ·  c++ - Using the ...·  1 年前    · 

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站: https://www.captainai.net/dongkelun

熟悉Spark的分区对于Spark性能调优很重要,本文总结Spark通过各种函数创建RDD、DataFrame时默认的分区数,其中主要和sc.defaultParallelism、sc.defaultMinPartitions以及HDFS文件的Block数量有关,还有很坑的某些情况的默认分区数为1。

  • 如果分区数少,那么并行执行的task就少,特别情况下,分区数为1,即使你分配的Executor很多,而实际执行的Executor只有1个,如果数据很大的话,那么任务执行的就很慢,好像是卡死了~,所以熟悉各种情况下默认的分区数对于Spark调优就很有必要了,特别是执行完算子返回的结果分区数为1的情况,更需要特别注意。(我就被坑过,我已经分配了足够多的Executor、默认的并行度、以及执行之前的数据集分区数,但分区数依然为1)
  • 1、关于 sc.defaultMinPartitions

    sc.defaultMinPartitions=min(sc.defaultParallelism,2)
    也就是sc.defaultMinPartitions只有两个值1和2,当sc.defaultParallelism>1时值为2,当sc.defaultParallelism=1时,值为1
    上面的公式是在源码里

    本文来自dongkelun,讲各种 情况 下的sc.defaultParallelism,defaultMinPartitions,各种 情况 创建 以及转化。熟悉 Spark 分区 对于 Spark 性能调优很重要,本文总结 Spark 通过各种函数 创建 RDD DataFrame 默认 分区 数,其中主要和sc.defaultParallelism、sc.defaultMinPartitions以及HDFS文件的Block数量有关,还有很坑的某些 情况 默认 分区 数为1。如果 分区 数少,那么并行执行的task就少,特别 情况 下, 分区 数为1,即使你分配的Executor很多,而实际执行的Executor只有1个,如果数据 scala> val df = spark . spark Context.make RDD (1.to(100), 4).toDF("id") df: org.apache. spark .sql. DataFrame = [id: int] scala> // 获取 分区 个数 scala> val partition_num=df. rdd .partitions.length partition_num: Int = spark 项目中发现从hive读取的数据, dataframe 无法进行reparation的设置,非得转成 rdd 才可以??? 后来经过查阅资料和测试,原来是submit中设置的参数问题: --conf spark .sql.adaptive.enabled=true 谨慎使用 由于 spark .sql.adaptive.enabled设置成true就是开启动态 分区 了,自定义 分区 数将不管用; 我们设置成 false 后自定义 分区 数就管用了!!! 相关参数: spark .sql.adapt.. object Spark 04 { def main(args: Array[String]): Unit = { //1. 创建 Spark Conf并设置App名称,使用本地模式开启两核 val conf = new Spark Conf().setMaster("local[2]").setAppNa ① Ctrl + B 进入textFiletextFile底层其实就是通过去读文件② Ctrl + B 进入hadoopFile>>可以发现里面New了一个Hadoop RDD 实例③Ctrl + B 进入Hadoop RDD 类重点关注里面的方法>> 拆解分析1:最终返回的是一个partition数组>> 拆解分析2:inputSplits是通过getInputFormat(jobConf).getSplits(jobConf, minPartitions)获取,所以需要进入方法。 hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件。partition 是指的 spark 在计算过程中,生成的数据在计算空间内最小单元,同一份数据( RDD )的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的,这也是为什么叫“弹性分布式”数据集的原因之一。我们需要注意的是,只有Key-Value类型的 RDD 才有 分区 的,非Key-Value类型的 RDD 分区 的值是None的。 现如今 Spark 已经得到了几乎所有大数据企业的认可,而这些企业也迅速将自己的产品与 Spark 进行了紧密地集成。所以,作为现在最热门的几大分布式大数据计算引擎之一, Spark 几乎是大数据工程师的必修课,而 RDD 作为 Spark 框架的灵魂所在,也是我们所必须熟悉并掌握的。 今天有这么个需求 与本篇相关的部分就是:我需要从hive的一张表 根据其中某个字段去提取timestamp 作为时间 分区 字段 用insert into 得开启动态 分区 再一个问题就是 系统需要新建 分区 几千个。根据报错内容 查了一下 因为hive on spark 环境jar包问题所致 但是懒得去弄了。成功解决引擎问题 小文件合并问题 动态 分区 数过多超限问题。我们是用的hive on spark ,一开始就报错。然后查看yarn的日志里 报错内容。 spark 读取hive table时 分区 数由什么决定 这个话题比较复杂,他不是一个单一的指标的来决定。主要是取决于表存储的文件大小,hdfs的块文件大小,hive是否是 分区 表,如果是 分区 表一个hive表 分区 一个hadoop RDD , 最后会union成一个 RDD ,如果不是 分区 表就是一个Hadoop RDD 。如果hive底层存储单个文件大小超过块大小,这一个文件的读取就会被hdfs的块大小除以max(“mapreduce.job.maps”,2)来划分,如果块大于文件大小就使用文件大小除以max(“mapred