A 2015-01 5
A 2015-01 15
B 2015-01 5
A 2015-01 8
B 2015-01 25
A 2015-01 5
A 2015-02 4
A 2015-02 6
B 2015-02 10
B 2015-02 5
A 2015-03 16
A 2015-03 22
B 2015-03 23
B 2015-03 10
*B* *2015-03* *11*

需求如下:

每个用户截止到每月为止的最大单月访问次数和累计到该月的总访问次数

结果如下:

用户	月份		当月访问次数	最大访问次数	总访问次数		
A       2015-01		33		33		33
A       2015-02		10		33		43
A       2015-03		38		38		81
B       2015-01		30		30		30
B       2015-02		15		30		45
B       2015-03		44		44		89

1 需求实现

1.1 数据准备

A,2015-01,5
A,2015-01,15
B,2015-01,5
A,2015-01,8
B,2015-01,25
A,2015-01,5
A,2015-02,4
A,2015-02,6
B,2015-02,10
B,2015-02,5
A,2015-03,16
A,2015-03,22
B,2015-03,23
B,2015-03,10
B,2015-03,11

1.2 创建hive表

drop table if exists dan_test.pview
CREATE TABLE dan_test.class ( 
        user_id string, 
        month string,
        pv string
ROW format delimited FIELDS TERMINATED BY ",";

1.3 导入数据

load data local inpath "/home/centos/dan_test/pview.txt" into table pview;

1.4 需求实现

1.4.1 需求分析

从最终的需求可以看出,我们计算的结果是随着行的变化而变化,我们把这类问题称为移动计算。在hivesql中其实解决此类问题我们是通过移动窗口来解决的,类似于spark中的滑动窗口。那么控制此类行的变化范围hive中给出了具体的方法--窗口子句。

1.4.2 窗口函数的理解

窗口:over(),分析函数如:row_number(),max(),lag()等。分析函数+窗口函数:窗口的本质就是指明了分析函数分析数据时要处理的数据范围(作用域)。窗口分为静态窗口和移动窗口(也叫滑动窗口),静态窗口指分析数据的范围是固定不变的。滑动窗口指按照行的变化,窗口数据也随着变换,不同的行对应着不同的窗口数据(类似于与spark中的滑动窗口,随着时间的变化,窗口数据也发生着变化)。窗口也是SQL编程的思维本质,就是对范围内的数据进行处理

我们可以形象的把over()子句理解成开窗子句,即打开一个窗口,窗口内包含多条记录,over()会给每一行开一个窗口。如下图,总共有5条记录,每一行代表一条记录,over()在每一条记录的基础上打开一个窗口,给r1记录打开w1窗口,窗口内只包含自己,给r2打开w2窗口,窗口内包含r1、r2,给r3打开w3窗口,窗口内包含r1、r2、r3,以此类推....

由上我们不难发现,在使用over()子句进行查询的时候, 不仅可以查询到每条记录的信息,还可以查询到这条记录对应窗口内的所有记录的聚合信息,所以我们通常结合聚合函数和over()子句一起使用。

1.4.3 窗口子句

窗口函数包括三个窗口子句。分组:partition by;排序:order by;窗口大小:rows.使用语法如下:

over(partition by xxx order by yyy rows between zzz)

窗口子句范围大小的控制:

rows或(range)子句往往来控制窗口边界范围的,其语法如下:

ROWS between CURRENT ROW | UNBOUNDED PRECEDING | [num] PRECEDING AND  UNBOUNDED FOLLOWING | [num] FOLLOWING| CURRENT ROW
  • n PRECEDING:往前n行数据;
  • n FOLLOWING:往后n行数据;
  • UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点;
  • 窗口子句可以用来更精细的描述窗口,注意有几个函数是不支持窗口子句的:Rank, NTile,DenseRank,CumeDisk,PercentRank,Lead,Lag.

    子句意义
    PRECEDING往前
    FOLLOWING往后
    CURRENT ROW当前行
    UNBOUNDED起点(一般结合PRECEDING,FOLLOWING使用)
    UNBOUNDED PRECEDING表示该窗口最前面的行(起点)
    UNBOUNDED FOLLOWING表示该窗口最后面的行(终点)

    用法实例:

  • ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:从该窗口的起点到当前行
  • ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING:从前2行到后1行
  • ROWS BETWEEN 2 PRECEDING AND 1 CURRENT ROW:从前2行到当前行
  • 1.4.4 rows与range的区别

    rows:rows是真实的行数,也就是我们实际中所说的1,2,3...连续的行数。

    range:range是逻辑上的行数,所谓的逻辑行指的就是需要通过计算才能知道是哪一行。range后面跟计算表达式,对order by后面的某个字段值进行计算,计算后的结果表示其真正的范围。(逻辑偏移量构成)。

    两者区别如下:

    分析下面两个语句: SUM(ID) over(ORDER BY ID ROWS BETWEEN 1 preceding AND 2 following) rows_sum SUM(ID) over(ORDER BY ID RANGE BETWEEN 1 preceding AND 2 following) range_sum

    第一个为物理上的rows:表示从当前行为参考点,数据范围为前一行与后两行范围内求得的结果。数据范围为:

    当前行为第一行时:数据范围如下图所示

    sum(id)=1+1+3=5

    当前行为第二行时:数据范围如下图所示

    sun(id)=1+1+3+6=11

    当前行为第三行时:数据范围如下图所示

    sum(id) = 1+3 +6 +6=16

    ......

    整个过程如下图所示

    整个窗口的变化过程就像按照每一行进行移动,移动的数据范围由窗口子句指定

    第二个为逻辑上的range:数据的范围需要按照id进行计算。计算公式为:

    RANGE BETWEEN 1 preceding AND 2 following。翻译为:当前行的值(此处为id的值,具体是以order by 后字段进行计算的)id-1=

    当为第一行时:id=1,计算公式为id-1=

    sum(id)=1+1+3=5

    当为第二行时:id = 1,计算同上,0=

    当为第三行时:id=3,计算公式为id-1=

    sum(id)=3

    当为第四行时:id=6, 计算公式为id-1=

    sum(id) = 6+6+6+7+8=33

    依次类推,计算出其他行。也就是按照物理行去移动,只不过窗口的数据范围不是物理行,而是需要计算,计算所得的值的范围所在的行。

    range的应用场景:比如有一张员工薪资表。我想知道比当前员工薪资高1000元的员工总数。此时range就很好用。
    

    1.4.5 窗口函数几点认识总结

    a 当窗口函数over()出现分组(partition by)子句时:

    unbounded preceding即第一行是指表中一个分组里的第一行, unbounded following即最后一行是指表中一个分组里的最后一行;

    b 当开窗函数over()无分组(partition by)子句时

    unbounded preceding即第一行是指表中的第一行, unbounded following即最后一行是指表中的最后一行。

    c 而无论是否省略分组子句,以下结论都是成立的:

    1、窗口子句不能单独出现,必须有order by子句时才能出现。
    
  • ​ 有分组有order by 则为分组中第一行到当前行
  • ​ 有分组无order by 则为整个分组
  • ​ 无分组有order by 则为整个表中第一行到当前行
  • ​ 无分组无order by则为整个表。即over()
  • 1.4.6 窗口函数执行顺序及原理

    1)先看sql的执行顺序
      1 from
    
  • 计算除窗口函数以外所有的其他运算,如:group by,join ,having等。上面的代码的第一阶段即为:
  • select channel, month, sum(amount) as s from sales group by channel, month;
    
  • 将上一步的输出作为 WindowingTableFunction 函数的输入,计算对应的窗口函数值。上面代码的第一阶段即为:
  • select channel, month, s,dr,r from 
    WindowingTableFunction(
    -- 上一阶段的输出
    <select channel, month, sum(amount) as s from sales group by channel, month>,
    -- 窗口函数的分区list
    partition by channel,
    -- 窗口函数的order list
    order by s,
    -- 窗口函数调用
    [r:<rank()>, dr:<denserank()>]
    

    1.4.7 条件判断语句嵌套window子句时的执行顺序(易错点)

    (1)需求:表如下

    user_idgood_namegoods_typerk
    1hadoop101
    1hive122
    1sqoop263
    1hbase104
    1spark135
    1flink266
    1kafka147
    1oozie108

    以上数据中,goods_type列,假设26代表是广告,现在有个需求,想获取每个用户每次搜索下非广告类型的商品位置自然排序,如果下效果:

    user_idgood_namegoods_typerknaturl_rk
    1hadoop1011
    1hive1222
    1sqoop263null
    1hbase1043
    1spark1354
    1flink266null
    1kafka1475
    1oozie1086

    (2)建表

    create table window_goods_test (
    user_id int,    --用户id
    goods_name string,  --商品名称
    goods_type int, --标识每个商品的类型,比如广告,非广告
    rk int  --这次搜索下商品的位置,比如第一个广告商品就是1,后面的依次2,3,4...
    )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
    

    (3)数据

    vim window_goods_test

    1	hadoop	10	1
    1	hive	12	2
    1	sqoop	26	3
    1	hbase	10	4
    1	spark	13	5
    1	flink	26	6
    1	kafka	14	7
    1	oozie	10	8
    

    (4)加载数据

    load data local inpath "/home/centos/dan_test/window_goods_test.txt" into table window_goods_test;
    

    (5)查询数据

    21/06/25 11:35:33 INFO DAGScheduler: Job 2 finished: processCmd at CliDriver.java:376, took 0.209632 s
    1	hadoop	10	1
    1	hive	12	2
    1	sqoop	26	3
    1	hbase	10	4
    1	spark	13	5
    1	flink	26	6
    1	kafka	14	7
    1	oozie	10	8
    Time taken: 0.818 seconds, Fetched 8 row(s)
    21/06/25 11:35:33 INFO CliDriver: Time taken: 0.818 seconds, Fetched 8 row(s)
    

    (6) 分析

    从结果表来看只需要增加一列为排序列,只不过是将goods_type列去除掉重新排序,因此我们很容易想到用窗口函数解决排序问题.row_number()便可顺利解决。于是很容易写出如下SQL:

    select 
        user_id,
        goods_name,
        goods_type,
        case when goods_type!=26 then row_number() over(partition by user_id  order by rk) else null end as naturl_rank  
    from window_goods_test
    

    执行结果如下:

    1	hadoop	10	1	1
    1	hive	12	2	2
    1	sqoop	26	3	NULL
    1	hbase	10	4	4
    1	spark	13	5	5
    1	flink	26	6	NULL
    1	kafka	14	7	7
    1	oozie	10	8	8
    Time taken: 2.858 seconds, Fetched 8 row(s)
    

    img 从结果可以看出并非自然排序,不是我们最终想要的目标结果,从实现上看逻辑也清除没什么问题,问题出哪了呢?其原因在于对窗口函数的执行原理及顺序不了解。下面我们进一步通过执行计划来看此SQL的执行过程。SQL如下:

    explain select 
        user_id,
        goods_name,
        goods_type,
        case when goods_type!=26 then row_number() over(partition by user_id  order by rk) else null end as naturl_rank  
    from window_goods_test
    

    具体执行计划如下:

    == Physical Plan ==
    *Project [user_id#67, goods_name#68, goods_type#69, rk#70, CASE WHEN NOT (goods_type#69 = 26) THEN _we0#72 ELSE null END AS naturl_rank#64]
    +- Window [row_number() windowspecdefinition(user_id#67, rk#70 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS _we0#72], [user_id#67], [rk#70 ASC NULLS FIRST]
       +- *Sort [user_id#67 ASC NULLS FIRST, rk#70 ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(user_id#67, 200)
             +- HiveTableScan [user_id#67, goods_name#68, goods_type#69, rk#70], MetastoreRelation default, window_goods_test
    Time taken: 0.238 seconds, Fetched 1 row(s)
    21/06/25 13:06:24 INFO CliDriver: Time taken: 0.238 seconds, Fetched 1 row(s)
    

    (4)执行row_number()函数进行分析

    (5) 使用case when进行判断

    由执行计划可以看出case when是在窗口函数之后执行的,与我们同常理解先进行条件判断,满足条件后再执行窗口函数的认知是有差异的。

    也就是说case when中使用窗口函数时候,先执行窗口函数再执行条件判断。

    *上述代码中数据运转流程图如下:*

    实际上上述SQL被拆分成三部分执行:

    第一步:扫描表,获取select的结果集

    select 
        user_id,
        goods_name,
        goods_type,
    from window_goods_test
    21/06/25 13:49:49 INFO DAGScheduler: Job 9 finished: processCmd at CliDriver.java:376, took 0.039341 s
    1	hadoop	10	1
    1	hive	12	2
    1	sqoop	26	3
    1	hbase	10	4
    1	spark	13	5
    1	flink	26	6
    1	kafka	14	7
    1	oozie	10	8
    Time taken: 0.153 seconds, Fetched 8 row(s)
    21/06/25 13:49:49 INFO CliDriver: Time taken: 0.153 seconds, Fetched 8 row(s)
    

    第二步:执行窗口函数

    select 
        user_id,
        goods_name,
        goods_type,
       row_number() over(partition by user_id order by rk) as naturl_rank  
    from window_goods_test
    21/06/25 13:47:33 INFO DAGScheduler: Job 8 finished: processCmd at CliDriver.java:376, took 0.754508 s
    1	hadoop	10	1	1
    1	hive	12	2	2
    1	sqoop	26	3	3
    1	hbase	10	4	4
    1	spark	13	5	5
    1	flink	26	6	6
    1	kafka	14	7	7
    1	oozie	10	8	8
    Time taken: 1.016 seconds, Fetched 8 row(s)
    21/06/25 13:47:33 INFO CliDriver: Time taken: 1.016 seconds, Fetched 8 row(s)
    

    第三步:在第二步的结果集中执行case when

    select 
        user_id,
        goods_name,
        goods_type,
        case when goods_type!=26 then row_number() over(partition by user_id  order by rk) else null end as naturl_rank  
    from window_goods_test
    

    其变换结果集如下:

    由以上分析我们我们可以看出要想得到正确的SQL需要在窗口函数执行前就需要将数据先过滤掉而不是窗口函数执行后。因此可以想到在where语句里面先过滤,但是根据结果商品类型为26的排序需要置为NULL,因此我门采用union,具体SQL如下:

    select user_id
      ,goods_name
      ,goods_type
      ,row_number() over(partition by user_id  order by rk) as naturl_rank 
    from window_goods_test
    where goods_type!=26
    union all
    select user_id
      ,goods_name
      ,goods_type
      ,null as naturl_rank 
    from window_goods_test
    where goods_type=26
    1	hadoop	10	1	1
    1	hive	12	2	2
    1	hbase	10	4	3
    1	spark	13	5	4
    1	kafka	14	7	5
    1	oozie	10	8	6
    1	sqoop	26	3	NULL
    1	flink	26	6	NULL
    Time taken: 1.482 seconds, Fetched 8 row(s)
    21/06/25 14:08:14 INFO CliDriver: Time taken: 1.482 seconds, Fetched 8 row(s)
    

    但此处的缺点是需要扫描表 window_goods_test两次,显然对于此题不是最好的解法.

    下面我们给出其他解法。为了能够先过滤掉商品类型为26的商品,我们可以先在partition by分组中先进行if 语句过滤,如果goods_type!=26则取对应的id进行分组排序,如果goods_type=26则置为随机数再按照随机数分组排序,最后外层再通过goods_type!=26将其过滤掉。此处partition by分组中if 语句中的else后不置为NULL而是随机数,是因为如果置为NULL,goods_type=26的数较多的情况下会被分到一组造成数据倾斜,因此采用了rand()函数。具体SQL如下:

    select user_id
      ,goods_name
      ,goods_type
      ,if(goods_type!=26,row_number() over(partition by if(goods_type!=26,user_id,rand()) order by rk),null) naturl_rank 
    from window_goods_test
    order by rk
    ------------------------------
    

    此处为了得到最终的结果对rk进行了order by排序,order by执行是在窗口函数之后。

    获取的中间结果如下:

    select user_id
      ,goods_name
      ,goods_type
      ,row_number() over(partition by if(goods_type!=26,user_id,rand()) order by rk) naturl_rank 
    from window_goods_test
    order by rk
    ---------------------------------
    21/06/25 14:37:27 INFO CliDriver: Time taken: 0.187 seconds, Fetched 1 row(s)
    

    最终执行结果如下:

    1	hadoop	10	1	1
    1	hive	12	2	2
    1	sqoop	26	3	NULL
    1	hbase	10	4	3
    1	spark	13	5	4
    1	flink	26	6	NULL
    1	kafka	14	7	5
    1	oozie	10	8	6
    Time taken: 1.255 seconds, Fetched 8 row(s)
    21/06/25 14:14:21 INFO CliDriver: Time taken: 1.255 seconds, Fetched 8 row(s)
    21/06/25 14:39:55 INFO CliDriver: Time taken: 0.186 seconds, Fetched 1 row(s)
    

    此题给我们的启示:

  • (1)case when(或if)语句中嵌套窗口函数时,条件判断语句的执行顺序在窗口函数之后
  • (2)窗口函数partition by子句中可以嵌套条件判断语句
  • *1.4.8 窗口函数中的分组与group by的区别***

    (1) 窗口函数中的分组与group by的区别1

  • group by 分组返回值只有一个,一组中只返回一个结果。窗口函数中partition by分组,每组每行中都会有一个分析结果。
  • select 中的字段必须出现在group by中,而窗口函数中partition by分组则无此限制,其分析的结果可以与表中的其他字段并列,其相当于在原表每个分组中添加了一列。
  • 如果开窗函数在 group by后的结果集中使用时,那么窗口中无其他限定时,一般把一组看成一条记录,相当于先进行分组后,分组后这一组内整体的记录数被作为一条记录***。***窗口函数也是基于整个group by后的查询结果,而不是基于每组组内的查询结果。
  • ***group by 汇总后行数减少,partition by汇总后原表中的行数没变。***这个也是为什么使用窗口函数分组而不使用group by的原因。具体如下所示:
  • *(2)窗口函数与group by的区别2***

  • *通过HiveSQL的执行顺序我们知道,窗口函数的执行是在group by,having之后进行,是与select同级别的,所以我们可以得出窗口函数的partition by与group by的一个重要区别就是,如果SQL中既使用了group by又使用了partition by,那么此时partition by的分组是基于group by分组之后的再次分组,分析的数据范围也是基于group by后的数据。一定要注意分析函数如count(*)只是针对over()中的数据进行分析。例如:先进行了group by XXX 后使用了count(*) over(partition by XXX),此时count只是对group by 后的数据再进行partition by后进行统计。***
  • *窗口中的partition by不进行去重,而group by进行去重***
  • 以上的理解比较绕口,我们通过一个实验进行说明

    数据如下:

    name	orderdate	cost
    jack	2017-01-01	10
    jack	2017-02-03	23
    jack	2017-01-05	46
    jack	2017-04-06	42
    jack	2017-01-08	55
    mart	2017-04-08	62
    mart	2017-04-09	68
    mart	2017-04-11	75
    mart	2017-04-13	94
    

    (1)实验1:用group by及partiton by进行分组。

        =======================gruop by=======================
        select
            name,
            count(*)
            overdemo
        where
            date_format(orderdate,'yyyy-MM')='2017-04'
        group by
            name;
        --------结果---------
        name	_c1
        jack	1
        mart	4
        =======================partition by========================
        select
            name,
            count(*) over(partition by name)
            overdemo
        where
            date_format(orderdate,'yyyy-MM')='2017-04';
        --------结果---------
        name	count_window_0
        jack	1
        mart	4
        mart	4
        mart	4
        mart	4
    
  • 实验结果:group by去重,partition by不去重
  • (2)实验2:在group by基础上再进行partiton by

    =====group by name后over(partition by name)=====
    select
        name,
        count(*) over(partition by name)
        overdemo
    where
        date_format(orderdate,'yyyy-MM')='2017-04'
    group by 
        name;
    ---------结果--------
    name	count_window_0
    jack	1
    mart	1
    
  • 总结:通过以上对比可以看出先group by name后over(partition by name)是在分组的基础上再分组,count的结果是针对分区而言的,partition by的结果是对group by的上层抽象,group by后的结果集为*(jack,mark)进行partition by分组后为jack一组,mark一组,那么count(*)作用于每个分区计算结果为1和1**。***
  • count(*) over(),我们知道over()即为整表,没有分组条件,所有的key都在一个分区中,即jack,mark在同一个分区中,但此时有group by,所以结果集是基于group by的,那么此时的group by后的结果集就是作为一个子表,其被抽象为(jack,mark),count(*)作用于一个分区后计算结果为2
  • 小结: 当group by和partition by同时出现的时候,partition by是基于group 的结果进行分组的,group by 后的结果作为一个子集一个抽象的结果(key的集合)传给窗口进行再次分组,此时聚合分析函数仅作用于每个分组;另外两者单独出现作为分组时group by去重(对key去重),partition by不去重(不会对key进行去重);partition by在group by后执行,可以进行更高级的需求分析。

    其本质是要理解开窗函数执行顺序是和select同一级的,所以group by的结果作为子表供开窗函数使用,开窗函数也是基于group by后的结果进行分析,可以将其定位为select字段一致的用法只不过是加了分析后形成一个新的字段,一个新的属性,常常作为辅助列使用。

    1.4.9 需求实现

  • 目标:每个用户截止到每月为止的最大单月访问次数和累计到该月的总访问次数
  • 本题就是典型的移动计算问题,从给的原始数据可以看出需要先对用户的每月的访问次数进行汇总。SQL代码如下:

     --先整理数据
    select a.user_id as user_id, a.month as month
           ,sum(a.pv) as pv 
     from pview a 
     group by a.user_id, a.month
    

    按用户分组,对用户组内的数据进行窗口滑动,用户组内按照行向下移动,滑动的范围为:从第一条数据到当前行的数据范围。在数据范围内求最大值及累计sum值,这样求出的就是截止当前每月最大访问次数,及截止当前每月的总访问次数。

    因而可得出规律:凡是题目中有截止当前字样的肯定需要移动计算,窗口函数去完成。

    具体SQL如下:

     select b.user_id
           ,b.month
           ,b.pv
           ,max(b.pv) over(partition by b.user_id order by b.month) as max_pv --此处省略了rows between unbounded preceding and current row
           ,sum(b.pv) over(partition by b.user_id order by b.month) as sum_pv
    	select a.user_id as user_id
    	      ,a.month as month
                  ,sum(a.pv) as pv 
            from pview a 
            group by a.user_id, a.month
    

    2 小 结

    本文通过案例来引出对窗口函数的认识,总结了窗口函数的用法及使用规律,该案例主要是对窗口函数在移动计算中的应用,类似于滑动窗口,所谓的滑动窗口也就是指每一行对应对应的数据窗口都不同,通过窗口子句类实现移动计算时数据的范围,也就是窗口每次按行滑动时长度大小,但窗口中每一次对应的数据总是在变化。通过本文你可以获得如下知识:

  • (1)窗口函数的使用规则及用法
  • (2)窗口子句的使用规则
  • (3)窗口函数的执行顺序及原理
  • (4)窗口函数在移动计算中的应用
  •