Spark RDD案例(二)连续数据统计

1. 背景

  1. Spark作为大数据分析引擎,本身可以做离线和准实时数据处理
  2. Spark抽象出的操作对象如RDD、dataSet、dataFrame、DStream等都是高层级的抽象,屏蔽了分布式数据处理代码细节,操作分布式数据和处理就像使用scala集合接口一样便利。这样可以很大降低编程使用和理解门槛。
  3. 在实际生产中,大数据处理面临的业务需求和正常java 业务需求一样,都是基于数据做处理。不同的是正常java业务数据相对较少,如mysql中适合存储的数据是小而美的如500万行数据及以下,而大数据存储500万行才达到海量数据存储的门槛。
  4. 实际生产中,大数据和小批量Java数据处理需求往往类似,如连续数据统计,如连续签到,连续参与,连续登录等等业务场景,本文就是关于类似需求的简化版本。

2. 案例

  1. 需求
    连续登录3天及以上用户
  2. 数据
guid01,2018-02-28
guid01,2018-03-01
guid01,2018-03-05
guid01,2018-03-02
guid01,2018-03-04
guid01,2018-03-06
guid01,2018-03-07
guid02,2018-03-01
guid02,2018-03-03
guid02,2018-03-02
guid02,2018-03-06
  • Idea 2020
  • maven 2.6.3
  • pom文件
<!-- 定义了一些常量 -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>3.0.1</spark.version>
        <hbase.version>2.2.5</hbase.version>
        <hadoop.version>3.2.1</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <!-- 导入scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <!-- 编译时会引入依赖,打包是不引入依赖 -->
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <!-- 编译时会引入依赖,打包是不引入依赖 -->
            <!--            <scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • scala 2.12.12

2.1 代码一

package com.doit.practice
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ContinuedLogin {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("ContinuedLogin").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // guid01,2018-02-28
    val rdd1: RDD[String] = sc.textFile("E:\\DOITLearning\\12.Spark\\continueLoginSourceData.txt")
    // 连续3天及以上登录的id、开始时间,结束时间,天数
    // 最后调用distinct去重,因为一天只需要有一条数据即可
    val mapedRDD: RDD[(String, String)] = rdd1.map(line => {
      val strings: Array[String] = line.split("\\,")
      val uid: String = strings(0)
      val date: String = strings(1)
      (uid, date)
    }).distinct()
    println("mapedRDD: " + mapedRDD.collect().toBuffer)
    val groupedRDD: RDD[(String, Iterable[String])] = mapedRDD.groupByKey()
    println("groupedRDD: " + groupedRDD.collect().toBuffer)
    * groupedRDD: ArrayBuffer((guid01,CompactBuffer(2018-02-28, 2018-03-01, 2018-03-05, 2018-03-02, 2018-03-04, 2018-03-06, 2018-03-07)),
    *  (guid02,CompactBuffer(2018-03-01, 2018-03-03, 2018-03-02, 2018-03-06)))
    val rowNumberedRDD: RDD[(String, (String, Int))] = groupedRDD.flatMapValues(iter => {
      val sorted: List[String] = iter.toList.sorted
      // 将每个uid对应的日期排序,升序排列,然后给数据配上rowNumber
      var index = 0
      val list: List[(String, Int)] = sorted.map(ele => {
        index += 1
        (ele, index)
    println("rowNumberedRDD: " + rowNumberedRDD.collect().toBuffer)
    * 得到数据就是展平后的结果,按照uid划分,uid作为key,日期和rownumber组成的元组作为value,注意日期也是升序排列
    * rowNumberedRDD: ArrayBuffer((guid01,(2018-02-28,1)), (guid01,(2018-03-01,2)), (guid01,(2018-03-02,3)), (guid01,(2018-03-04,4)), (guid01,(2018-03-05,5)),
    *  (guid01,(2018-03-06,6)), (guid01,(2018-03-07,7)), (guid02,(2018-03-01,1)), (guid02,(2018-03-02,2)), (guid02,(2018-03-03,3)), (guid02,(2018-03-06,4)))
    // 要对日期做差值计算,也就是日期减去rownumber得到差值,如果日期连续,rownumber连续,那差值就会一样,再根据差值分组,就可以得到初步的结果数据
    val diffMapedRDD: RDD[((String, String), (String, String, Int))] = rowNumberedRDD.mapPartitions(iter => {
      // 注意这里需要用到日期加减,日期格式化来操作数据
      // 使用mapPartitions,可以一个分区只创建一个对象,方便复用,内存占用更小一些
      val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
      val calendar: Calendar = Calendar.getInstance()
      // (guid01,(2018-02-28,1))
      // 注意mapPartitions需要返回迭代器或者一个集合,这里使用map刚合适,因为会生成一个迭代器结果
      iter.map(ele => {
        val uid: String = ele._1
        val currentTimeStr: String = ele._2._1
        val rownumber: Int = ele._2._2
        val date: Date = simpleDateFormat.parse(currentTimeStr)
        calendar.setTime(date)
        calendar.add(Calendar.DAY_OF_MONTH, -rownumber)
        val diffDate: Date = calendar.getTime
        val diffDateStr: String = simpleDateFormat.format(diffDate)
        ((uid, diffDateStr), (currentTimeStr, "", 1))
    println("diffMapedRDD: " + diffMapedRDD.collect().toBuffer)
    * 这里已经做下一步的处理,就是把相同uid,相同日期差值数据分区,  ((uid, diffDateStr),currentTimeStr)
    * diffMapedRDD: ArrayBuffer(((guid01,2018-02-27),2018-02-28), ((guid01,2018-02-27),2018-03-01), ((guid01,2018-02-27),2018-03-02),
    * ((guid01,2018-02-28),2018-03-04), ((guid01,2018-02-28),2018-03-05), ((guid01,2018-02-28),2018-03-06), ((guid01,2018-02-28),2018-03-07),
    * ((guid02,2018-02-28),2018-03-01), ((guid02,2018-02-28),2018-03-02), ((guid02,2018-02-28),2018-03-03), ((guid02,2018-03-02),2018-03-06))
    * 这里可以使用reduceByKey对数据做分区聚合,可以得到相同key下,最小日期,最大日期,累计次数,所以上述的map阶段数据还需要调整一下,改为((uid, diffDateStr), (currentTimeStr, "", 1))
    * diffMapedRDD: ArrayBuffer(((guid01,2018-02-27),(2018-02-28,,1)), ((guid01,2018-02-27),(2018-03-01,,1)), ((guid01,2018-02-27),(2018-03-02,,1)),
    * ((guid01,2018-02-28),(2018-03-04,,1)), ((guid01,2018-02-28),(2018-03-05,,1)), ((guid01,2018-02-28),(2018-03-06,,1)), ((guid01,2018-02-28),(2018-03-07,,1)),
    *  ((guid02,2018-02-28),(2018-03-01,,1)), ((guid02,2018-02-28),(2018-03-02,,1)), ((guid02,2018-02-28),(2018-03-03,,1)), ((guid02,2018-03-02),(2018-03-06,,1)))
    // 这里需要对value做操作,也就是 ((uid, diffDateStr), (currentTimeStr, "", 1)) 这里的value--三元素的tuple
    // 因为是reduce,两两比较,所以可以使用局部聚合操作
    val reducedRDD: RDD[((String, String), (String, String, Int))] = diffMapedRDD.reduceByKey((tuple3_1, tuple3_2) => {
      // 这里可以得到相同key--(uid, diffDateStr)下,最小日期,最大日期,
      (Ordering[String].min(tuple3_1._1, tuple3_2._1), Ordering[String].max(tuple3_1._1, tuple3_2._1), tuple3_1._3 + tuple3_2._3)
    println("reducedRDD: " + reducedRDD.collect().toBuffer)
    * 这是结果,需要筛选出符合要求的连续次数大于等于三
    * reducedRDD: ArrayBuffer(((guid01,2018-02-27),(2018-02-28,2018-03-02,3)), ((guid02,2018-02-28),(2018-03-01,2018-03-03,3)),
    * ((guid01,2018-02-28),(2018-03-04,2018-03-07,4)), ((guid02,2018-03-02),(2018-03-06,,1)))
    val filteredRDD: RDD[((String, String), (String, String, Int))] = reducedRDD.filter(ele => ele._2._3 >= 3)
    println("filteredRDD: " + filteredRDD.collect().toBuffer)
    *  已经筛选出结果
    * filteredRDD: ArrayBuffer(
    * ((guid01,2018-02-27),(2018-02-28,2018-03-02,3)), 
    * ((guid02,2018-02-28),(2018-03-01,2018-03-03,3)), 
    * ((guid01,2018-02-28),(2018-03-04,2018-03-07,4)))
    sc.stop(
filteredRDD: ArrayBuffer(((guid01,2018-02-27),(2018-02-28,2018-03-02,3)), ((guid02,2018-02-28),(2018-03-01,2018-03-03,3)), ((guid01,2018-02-28),(2018-03-04,2018-03-07,4)))

2.2 代码二

package com.doit.practice
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object ContinuedLogin2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("ContinuedLogin").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // guid01,2018-02-28
    val rdd1: RDD[String] = sc.textFile("E:\\DOITLearning\\12.Spark\\continueLoginSourceData.txt")
    val mapedRDD: RDD[(String, String)] = rdd1.map(line => {
      val strings: Array[String] = line.split("\\,")
      (strings(0), strings(1))
    }).distinct()
    println("mapedRDD: " + mapedRDD.collect().toBuffer)
    * mapedRDD: ArrayBuffer((guid02,2018-03-06), (guid02,2018-03-01), (guid01,2018-03-05),
    * (guid01,2018-03-01), (guid01,2018-03-07), (guid01,2018-03-02), (guid01,2018-03-04),
    *  (guid01,2018-02-28), (guid02,2018-03-03), (guid02,2018-03-02), (guid01,2018-03-06))
    val groupedRDD: RDD[(String, Iterable[String])] = mapedRDD.groupByKey()
    println("groupedRDD: " + groupedRDD.collect().toBuffer)
    * groupedRDD: ArrayBuffer((guid01,CompactBuffer(2018-03-05, 2018-03-01, 2018-03-07, 2018-03-02, 2018-03-04, 2018-02-28, 2018-03-06)),
    * (guid02,CompactBuffer(2018-03-06, 2018-03-01, 2018-03-03, 2018-03-02)))
    val diffMapedRDD: RDD[(String, (String, Int, String))] = groupedRDD.flatMapValues(iter => {
      val sorted: List[String] = iter.toList.sorted
      val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
      val calendar: Calendar = Calendar.getInstance()
      var rownumber = 0
      // 使用map对内部数据做转换
      sorted.map(ele => {
        rownumber += 1
        // 这里可以把数据按照uid,rownumber, diffDate组合起来
        val date: Date = simpleDateFormat.parse(ele)
        calendar.setTime(date)
        calendar.add(Calendar.DATE, -rownumber)
        val diffDate: Date = calendar.getTime
        val diffDateStr: String = simpleDateFormat.format(diffDate)
        (ele, rownumber, diffDateStr)
    println("diffMapedRDD: " + diffMapedRDD.collect().toBuffer)
    * diffMapedRDD: ArrayBuffer((guid01,(2018-02-28,1,2018-02-27)), (guid01,(2018-03-01,2,2018-02-27)),
    * (guid01,(2018-03-02,3,2018-02-27)), (guid01,(2018-03-04,4,2018-02-28)),
    * (guid01,(2018-03-05,5,2018-02-28)), (guid01,(2018-03-06,6,2018-02-28)),
    * (guid01,(2018-03-07,7,2018-02-28)), (guid02,(2018-03-01,1,2018-02-28)),
    * (guid02,(2018-03-02,2,2018-02-28)), (guid02,(2018-03-03,3,2018-02-28)),
    * (guid02,(2018-03-06,4,2018-03-02)))
    // (ele, rownumber, diffDateStr)
    // 需要转换一下,让数据以uid, diffdate的元组作为key,将当前日期, null, 数字1的元组作为value。
    // 这样可以将数据使用reduceByKey一次性都计算出来,包括开始时间,结束时间,累计天数
    val diffReMapedRDD: RDD[((String, String), (String, String, Int))] = diffMapedRDD.map(ele => {
      ((ele._1, ele._2._3), (ele._2._1, "", 1))
    println("diffReMapedRDD: " + diffReMapedRDD.collect().toBuffer)
    * diffReMapedRDD: ArrayBuffer(((guid01,2018-02-27),(2018-02-28,null,1)), ((guid01,2018-02-27),
    * (2018-03-01,null,1)), ((guid01,2018-02-27),(2018-03-02,null,1)), ((guid01,2018-02-28),
    * (2018-03-04,null,1)), ((guid01,2018-02-28),(2018-03-05,null,1)), ((guid01,2018-02-28),
    * (2018-03-06,null,1)), ((guid01,2018-02-28),(2018-03-07,null,1)), ((guid02,2018-02-28),
    * (2018-03-01,null,1)), ((guid02,2018-02-28),(2018-03-02,null,1)), ((guid02,2018-02-28),
    * (2018-03-03,null,1)), ((guid02,2018-03-02),(2018-03-06,null,1)))
    // 将数据通过reduceByKey一次性聚合处理出来,但reduceByKey要求输入和输出的value是一样的,所以这里需要对输入的数据做一次转换
    val preResRDD: RDD[((String, String), (String, String, Int))] = diffReMapedRDD.reduceByKey((tupleThree1, tupleThree2) => {
      // 元组元素太长了,换行显示
        Ordering[String].min(tupleThree1._1, tupleThree2._1),
        Ordering[String] max(tupleThree1._1, tupleThree2._1),
        tupleThree1._3 + tupleThree2._3
    println("preResRDD: " + preResRDD.collect().toBuffer)
    * preResRDD: ArrayBuffer(((guid01,2018-02-27),(2018-02-28,2018-03-02,3)),
    * ((guid02,2018-02-28),(2018-03-01,2018-03-03,3)), ((guid01,2018-02-28),(2018-03-04,2018-03-07,4)),
    * ((guid02,2018-03-02),(2018-03-06,,1)))
    // 过滤3次及以上的结果
    val resRDD: RDD[((String, String), (String, String, Int))] = preResRDD.filter(ele => ele._2._3 >= 3)
    println("resRDD: " + resRDD.collect().toBuffer)
    * resRDD: ArrayBuffer(
    * ((guid01,2018-02-27),(2018-02-28,2018-03-02,3)),
    * ((guid02,2018-02-28),(2018-03-01,2018-03-03,3)),
    * ((guid01,2018-02-28),(2018-03-04,2018-03-07,4)))
    sc.stop()
resRDD: ArrayBuffer(((guid01,2018-02-27),(2018-02-28,2018-03-02,3)), ((guid02,2018-02-28),(2018-03-01,2018-03-03,3)), ((guid01,2018-02-28),(2018-03-04,2018-03-07,4)))
 

上述2种实现方式,最关键的都是使用reduceByKey进行数据的一次性转换,但这样一来,要求对输入的数据进行处理
观察数据得出,先将数据做切分,变成uid, 日期字符串的对偶元组
然后使用groupByKey进行划分,根据uid进行划分
划分之后进行排序,这时候每个分区的数据就是对应uid各自的数据
再使用flatMapValues对数据做转换,将递增的rownumber,diffDate,次数标记1转换出来。注意flatmapValues输入是一个迭代器,输出也是一个迭代器
然后根据reduceByKey的规则,使用map转换数据。用uid,diffDate的元组作为key,使用当前日期,空字符串,数字1作为value
进行reduce聚合时,实际上是每个分区相同key的数据两两聚合,然后是分区之间的结果数据进行聚合。
所以这里直接将相同key(uid,diffDate)的value进行聚合时,相同数据下,最小的日期,最大的日期,累计天数都可以求出来。
累计天数就是分区和分区结果都使用累加的方式
最小天数使用min比较
最大天数使用max比较
最后的结果使用filter过滤

注意点,一定一定记得,根据需求对可能产生重复的数据进行去重,否则会影响结果。同时做数据转换时,最好按需搜集字段,不需要的字段都去除,这样可以极大降低大数据处理的数据量,提升性能,降低处理时间

Spark RDD案例(一)连续数据统计1. 背景Spark作为大数据分析引擎,本身可以做离线和准实时数据处理Spark抽象出的操作对象如RDD、dataSet、dataFrame、DStream等都是高层级的抽象,屏蔽了分布式数据处理代码细节,操作分布式数据和处理就像使用scala集合接口一样便利。这样可以很大降低编程使用和理解门槛。在实际生产中,大数据处理面临的业务需求和正常java 业务需求一样,都是基于数据做处理。不同的是正常java业务数据相对较少,如mysql中适合存储的数据是小而美的 http://bigdata.edu360.cn/laozhang http://bigdata.edu360.cn/laozhang http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.c. 1 连续处理概述 连续处理(Continuous Processing)是Spark 2.3中引入的一种新的实验性流执行模式,可实现低的(~1 ms)端到端延迟,并且至少具有一次容错保证。 将其与默认的微批处理(micro-batchprocessing)引擎相比较,该引擎可以实现一次性保证,但最多可实现~100ms的延迟。 在实时流式应用中,最典型的应用场景:网站UV统计。 业务需求一:实时统计网站UV,比如每日网站UV; 业务需求统计最近一段时间(比 Spark是基于内存计算的大数据并行计算框架 Spark最大的特点是基于内存的实时计算 2013年Spark加入Apache孵化器后发展迅猛,如今已成为Apache基金会最重要的三大分布式开源项目之一(Hadoop、Spark、Storm) 2014年打破Hadoop保持的基准排序记录——Spark用十分之一的计算资源,获得了比Hadoop快3倍的速度 ▍Spark特点 速度快:使用DAG执行引擎,以支持循环数据流与内存计算
连续型概率分布与正态分布 连续数据 前面讲到的概率分布涉及的都是离散数据,即数据由一个个单独的数值组成,其中的每一个数值都有对应概率。例如,在分析老虎机收益概率分布时,每一局赌局出现的收益数额是确定的,我们很清楚各种情况的赔率,也知道自己有机会赢到其中一种。 如果是离散数据,则为数值型数据,只能取确切值。离散数据往往能以某种方式进行计数,例如糖果机中的糖果数目,智力游戏中答对的问题的数目,或是在一个特定时段内的故障次数。 但并非所有数值型数据都是离散的。有时候,数据涵盖的是一个范围,这个范围内的任何一个数
上一节,我们总结了离散型数据的处理方法,这一节我们整理下连续数据的处理方法。一般来讲,离散型数据都是类别值,如:男生,女生,商务座,一等座,等座等等。连续数据基本上都是数值型数据,如年龄(10岁,11岁…),身高(110cm, 175cm…),海拔,薪资等等。 既然连续数据都是数值,是可以直接拿来供计算机使用的,那么连续数据该如何处理?这里我们提出一个处理思想:连续数据离散化。连续数据离散化以后的建模效果一定会比原来的好吗?这个答案是不一定的,任何时候我们都需要对两者结果进行比较,从而做出判断
关于朴素贝叶斯的原理及离散型的朴素贝叶斯,参见上一篇博文:https://blog.csdn.net/gongfuxiongmao_/article/details/116062023?spm=1001.2014.3001.5502 对于连续型的数据,在假定数据符合正态分布的前提下,可以对训练数据中每个特征进行高斯处理,得到一个特征的高斯曲线,利用高斯曲线来估计预测数据属于某一类的概率。 比如下面的例子中,数据有四个特征值:x1,x2,x3,x4 ; 同时有三个分类结果:生男孩,生女孩,没怀孕。
使用SparkRDD统计连续登陆的三天及以上的用户 这个问题可以扩展到很多相似的问题:连续几个月充值会员、连续天数有商品卖出、连续打滴滴、连续逾期。 测试数据:用户ID、登入日期 guid01,2018-02-28 guid01,2018-03-01 guid01,2018-03-02 guid01,2018-03-04 guid01,2018-03-05 guid01,2018-03-06 guid01,2018-03-07 guid02,2018-03-01 guid02,2018-03-02 Spark作为大数据分析引擎,本身可以做离线和准实时数据处理 Spark抽象出的操作对象如RDD、dataSet、dataFrame、DStream等都是高层级的抽象,屏蔽了分布式数据处理代码细节,操作分布式数据和处理就像使用scala集合接口一样便利。这样可以很大降低编程使用和理解门槛。 在实际生产中,大数据处理面临的业务需求和正常java 业务需求一样,都是基于数据做处理。不同的是正常java业务数据相对较少,如mysql中适合
QuantileDiscretizer输入连续的特征列,输出分箱的类别特征。分箱数是通过参数numBuckets来指定的。 箱的范围是通过使用近似算法(见approxQuantile )来得到的。 近似的精度可以通过relativeError参数来控制。当这个参数设置为0时,将会计算精确的分位数。箱的上边界和下边界分别是正无穷和负无穷时, 取值将会覆盖所有的实数值。   假设我们有下面的...
Spark的三大核心数据结构:RDD、累加器(只写不读)、广播变量(只读不写) 在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。 来个简单的例子,看为什么要使用累加器 object Acc { 请根据给定的实验数据,在 spark-shell 中通过编程来计算以下内容: (1)该系总共有多少学生; val rdd= sc.textFile("file:///usr/local/spark/mycode/Data01.txt")
Spark RDD中分组取TopN案例是指在一个RDD中,根据某个键值进行分组,然后对每个组内的数据进行排序,取出每个组内的前N个数据。这种操作在数据分析和处理中非常常见,可以用于统计每个地区的销售额排名前N的产品、每个用户的消费排名前N的商品等。 优化方面,可以考虑使用Spark SQL或DataFrame来实现分组取TopN操作,因为它们提供了更高级的API和优化技术,可以更快速地处理大规模数据。另外,可以使用分布式缓存技术将数据缓存到内存中,以加快数据访问速度。还可以使用分区和并行计算等技术来提高计算效率。
Hive错误之 Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask错误分析 42585
Hive错误之 Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask错误分析 作者您好,我是hadoop3.1.3和Hive3.1.3,大概率可能是2.4的原因,那该如何解决呢