一、结构化 API 概述

1. 结构化API是处理各种数据类型的工具,可处理非结构化的日志文件、半结构化的CSV文件以及高度结构化的Parquet文件。结构化API指以下三种核心分布式集合类型的API:Dataset类型、DataFrame类型、SQL表和视图。

大多数结构化API 均适用于批处理和流处理,这意味着使用结构化API 编写代码时,几乎不用改动代码就可以从批处理程序转换为流处理程序 (反之亦然)。

DataFrame 和Dataset 是具有行和列的类似于(分布式)数据表的集合类型。所有列的行数相同(可以使用null 来指定缺省值),并且某一列的类型必须在所有行中保持一致。Spark 中的DataFrame 和Dataset 代表不可变的数据集合 ,可以通过它指定对特定位置数据的操作,该操作将以惰性评估方式执行。当对DataFrame执行action操作时,将触发Spark执行具体transformation操作并返回结果。表和视图与DataFrame基本相同,所以通常在DataFrame上执行SQL操作,而不是用DataFrame专用的Scala代码。

Schema定义了DataFrame的列名和类型,可以手动定义或者从数据源读取模式(通常定义为模式读取)。Schema数据模式需要指定数据类型,这意味着用户需要指定在什么地方放置什么类型的数据。

2. Spark实际上有它自己的编程语言, Spark 内部使用一个名为Catalyst 的引擎,在计划制定和执行作业的过程中使用Catalyst 来维护它自己的类型信息,这样就会带来很大的优化空间,这些优化可以显著提高性能 。Spark类型直接映射到不同语言API,并且针对Scala、Java、Python、SQL和R语言,都有一个对应的API查找表, 即使通过 Python 或R 语言来使用Spark 结构化API ,大多数情况下也是操作Spark 类型而非Python 类型 。例如,以下代码不会在Scala或Python中执行加法,而实际上完全是在Spark中执行加法:

// in Scala
val df = spark.range(500).toDF("number")
df.select(df.col("number") + 10)
# in Python
df = spark.range(500).toDF("number")
df.select(df["number"] + 10)

加法操作这样执行,是因为Spark会将用输入语言编写的表达式转换为代表相同功能的Spark内部Catalyst表示然后它将根据该内部表示进行操作。在讨论为什么会出现这种情况之前,先来讨论Dataset类型。

3. 实质上,结构化API包含两类API,即非类型化的DataFrame和类型化的Dataset。DataFrame是无类型的可能不太准确,因为它其实是有类型的,只是Spark完全负责维护它的类型,仅在运行时检查这些类型是否与schema中指定的类型一致。而Dataset在编译时就会检查类型是否符合规范。Dataset仅适用于基于Java虚拟机(JVM)的语言(比如Scala和Java),并通过case类或Java beans指定类型

因此在大多数情况下会使用DataFrame。Scala版本的Spark中, DataFrame就是一些Row类型的Dataset的集合。“Row”类型是Spark用于支持内存计算而优化的数据格式。这种格式利于高效计算,因为它避免使用会带来昂贵垃圾回收开销和对象实例化开销的JVM类型,而是基于自己的内部格式运行,所以并不会产生这种开销。Python版本和R语言版本的Spark并不支持Dataset,所有东西都是DataFrame。

4. DataFrame中的每条记录都必须是Row类型。可以通过SQL手动创建、或者RDD提取、或从数据源手动创建这些行。下面的示例使用范围函数range()来创建一个row对象的数组,

spark.range(2).toDF().collect()

接下来讨论一下如何实例化一个列为特定类型。通过以下方法可以使用正确的Scala类型:

import org.apache.spark.sql.types._
val b = ByteType

利用如下的工厂方法来使用正确的Java类型:

import org.apache.spark.sql.types.DataTypes; 
ByteType x = DataTypes.ByteType;

5. 针对一个结构化API查询任务,从用户代码到执行代码的过程步骤如下:

(1)编写DataFrame / Dataset / SQL代码。

(2)如果代码能有效执行,Spark将其转换为一个逻辑执行计划(Logical Plan)。

(3)Spark将此逻辑执行计划转化为一个物理执行计划(Physical Plan),并检查可行的优化策略,并在此过程中检查优化

(4)然后,Spark在集群上执行该物理执行计划(RDD操作)。

用户编写的代码通过控制台提交给Spark,或者以一个Spark作业的形式提交。然后代码将交由Catalyst优化器决定如何执行,并指定一个执行计划。最后代码将被运行,得到的结果将返回给用户,如下图所示:

从上面步骤可以看到,job执行的第一阶段旨在获取用户代码并将其转换为逻辑计划,该过程如下图所示:

这个逻辑计划仅代表一组抽象转换,还不涉及executor或driver,它只是将用户的表达式集合转换为最优的版本,它通过将用户代码转换为未解析的逻辑计划来实现这一点。这个计划没有解析,因为虽然用户代码可能是有效的,但它涉及的表或列可能存在也可能不存在。

Spark使用catalog(所有表和DataFrame信息的存储库)来在分析器analyzer中解析(resolve)列和表格。如果目录中不存在所需的表或列名称,分析器可能会拒绝该未解析的逻辑计划如果分析器可以解析它,结果将通过Catalyst优化器(Catalyst Optimizer),尝试通过谓词下推或选择操作来优化逻辑计划。用户也可以在定制源码时,扩展Catalyst优化器来支持自己特定的优化策略。

6. 在成功创建优化的逻辑计划后,Spark开始执行物理计划流程。物理计划(通常称为Spark计划)通过生成不同的物理执行策略,并通过代价模型进行比较分析,从而指定如何在集群上执行逻辑计划,具体流程如下图所示:

例如执行一个join操作就会涉及到代价比较,它通过分析数据表的物理属性(表的大小或分区的大小),对不同的物理执行策略进行代价比较,选择合适的物理执行计划。物理计划产生一系列的RDD和transformation操作这就是Spark被称为编译器的原因,因为它将DataFrame、Dataset和SQL中的查询操作编译为一系列RDD转换操作。在选择一个物理计划时,Spark将所有代码运行在底层编程接口RDD上,Spark在运行时执行进一步优化,生成可以在执行期间优化task或stage的本地Java字节码,最终将结果返回给用户。

二、基本的结构化操作

7. DataFrame由记录(record)组成,record是Row类型(与table中的一行相似)。一条record由多列(column)组成(类似于表格中的列)。模式(schema)定义了DataFrame列的名称以及列的数据类型。DataFrame的分区(partition)定义了DataFrame以及Dataset在集群上的物理分布,而分区模式(partitioning schema)定义了partition的分配方式,用户可以自定义分区的方式,也可以采取随机分区的方式。下面为一个DataFrame创建示例:

val df = spark.read.format("json")
    .load("/data/flight-data/json/2015-summary.json")

DataFrame有很多列,而模式(schema)定义了这些列的名字和数据类型。可以用下面的方法查询DataFrame的模式:

df.printSchema()

schema可以选择根据数据源来自动定义模式(称为读时模式schema-on-read),也可以选择由用户自己来显式地定义表的schema。实际应用场景决定了定义Schema的方式。当应用于即席分析(adhoc analysis)时,使用读时模式即可(尽管在处理如CSV和JSON等纯文本文件时速度较慢)。

但是读时模式也可能导致数据精度损失问题,例如在文件中读取时,将long型错误地解析为整数。当使用Spark进行实现生产级别ETL的时候,最好采取手动显式定义Schema的方式,尤其是在处理诸如CSV和JSON之类的无类型数据源时,这是因为模式推断(schema inference)方法会根据读入数据类型的不同而不同。

下面的例子利用行分隔的JSON半结构化性质来定义这个结构。数据集是来自美国交通局的航班统计数据:

spark.read.format("json").load("/data/flight-data/json/2015-summary.json").schema

会返回以下内容:

org.apache.spark.sql.types.StructType = ...
StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true))

一个schema是由许多字段构成的StructType,这些字段即为StructField,它具有名称、类型、布尔标志(该标志指定该列是否可以包含缺失值或空值),并且用户可指定与该列关联的元数据(metadata)。元数据存储着有关此列的信息(Spark在其机器学习库中使用此功能)。模式还包含其他的StructType(Spark的复杂类型)。如果(在运行时)数据的类型与定义的schema模式不匹配,Spark将抛出一个错误。以下示例显示了如何为一个DataFrame创建并指定模式:

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema =StructType(Array(
    StructField("DEST_COUNTRY_NAME", StringType, true),
    StructField("ORIGIN_COUNTRY_NAME", StringType, true),
    StructField("count", LongType, false,
    Metadata.fromJson("{\"hello\":\"world\"}"))
val df = spark.read.format("json").schema(myManualSchema)
    .load("/data/flight-data/json/2015-summary.json")

8. 因为Spark会维护它自己的类型信息,所以不能简单地通过每种语言的类型来设置类型。接下来看看schema定义的列,Spark可以对DataFrame中的列进行选择、transformation操作和删除,并将这些操作表示为表达式。有很多不同的方法来构造和引用列,两个最简单的方法是通过col函数或column函数。使用这两个函数,需要传入列名:

import org.apache.spark.sql.functions.{col, column}
col("someColumnName")
column("someColumnName")

DataFrame可能不包含某列,所以该列要将列名与catalog中维护的列名相比较之后才会确定该列是否会被解析。上面提到过,列和数据表的解析在分析器(analyzer)阶段发生。Scala中有一些特有的语言支持,可以使用更多简短的方式来引用列,下面的Scala表达式也执行引用列的功能,但不会提升性能:

$"myColumn"
'myColumn

符号“$”将字符串指定为表达式,而符号(')指定一个symbol,是Scala引用标识符的特殊结构。它们都执行相同的功能,即通过列名引用列的简写方式。如果需要引用某DataFrame的某一列,则可以在这个DataFrame上使用col方法。当执行连接(Join)操作时,如果两个join的DataFrame存在一个同名列,该方法会非常有用。显式引用列的另一个好处就是Spark不用自己解析该列(在分析阶段),如下所示:

df.col("count")

列其实就是表达式,但什么是表达式?expression是对一个DataFrame中某一个记录的一至多个值的一组transformation操作。可以把它想象成一个函数,它将一个或多个列名作为输入,解析它们,然后针对数据集中的每条记录应用表达式来得到一个单值。这个“单值”实际上可以是一个复杂的类型,如Map或Array。在最简单的情况下,通过expr函数创建的表达式,仅仅是一个DataFrame列的引用,也就是说expr("someCol")等同于col("someCol")

9. 列提供了表达式功能的一个子集。如果使用col(),并想对该列执行transformation操作,则必须对该列的引用执行这些transformation操作。当使用表达式时,expr函数实际上可以将字符串解析成转换操作和列引用,也可以在之后将其传递到下一步的转换操作,来看下面的例子:

expr("someCol -5")
col("someCol") -5
expr("someCol") –5

它们都是相同的transformation操作。Spark将它们编译为表示操作顺序的逻辑树。要记住下面两点:

(1)在Spark看来,列只是表达式。

(2)列与对这些列的transformation操作被编译后生成的逻辑计划,与解析后的表达式的逻辑计划是一样的。

用一个例子来说明这一点:

(((col("someCol") + 5) * 200) -6) < col("otherCol")

逻辑树是一种有向无环图,该图等同于以下代码:

import org.apache.spark.sql.functions.expr
expr("(((someCol + 5) * 200) -6) < otherCol")

这里需要强调,上面的表达式是有效SQL代码,就像SELECT表达式。这是因为SQL与DataFrame代码在执行之前会变编译成相同的底层逻辑树。这意味着SQL表达式和DataFrame代码的性能是一样的。如果想在程序中访问列,可以使用属性columns查询DataFrame的所有列:

spark.read.format("json").load("/data/flight-data/json/2015-summary.json").columns

10. 在Spark中,DataFrame的每一行都是一个记录(Record),而记录是Row类型的对象。Spark使用列表达式操纵Row类型对象,Row对象内部其实是字节数组,但是Spark没有提供访问这些数组的接口,因此只能使用列表达式去操纵。可以通过在DataFrame上调用first()来查看一行:

df.first()

可以基于已知的每列的数值去手动实例化一个Row对象来创建行。需要注意的是,只有DataFrame具有模式(schema),行对象本身没有模式,这意味着如果手动创建Row对象,则必须按照该行所属的DataFrame的列顺序来初始化Row对象:

import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)

访问行的数据也同样简单:只需指定想要的位置。使用Scala或Java时,必须使用辅助方法或显式地指定值类型,而使用Python或者R时,该值将被自动转化为正确的类型,如下所示:

// in Scala
myRow(0) // 任意类型
myRow(0).asInstanceOf[String] // 字符串
myRow.getString(0) // 字符串
myRow.getInt(2) // 整型
# in Python
myRow[0]
myRow[2]

11. 处理DataFrame对象时通常涉及几个基本目标,可以被归纳为如下几个核心操作:

(1)添加行或列;

(2)删除行或列;

(3)将一行转换操作为一列(或者将一列转换操作为一行);

(4)根据列中的值更改行的顺序;

可以把所有的这些操作转化成简单的transformation操作,最常见的是取一列逐行更改,然后返回结果。正如之前所见,可以从原始数据源中创建DataFrame,如下所示:

val df = spark.read.format("json")
    .load("/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")

也可以通过获取一组行并将它们转换操作为一个DataFrame来即时创建DataFrame,如下所示:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType,StringType, LongType}
val myManualSchema = new StructType(Array(
    new StructField("some", StringType, true),
    new StructField("col", StringType, true),
    new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()

上面代码的结果如下所示:

在Scala中,还能还可以利用Spark的隐式方法(使用implicit关键字),对Seq类型执行toDF函数来实现,由于对于null类型的支持并不稳定,所以这种方法并不推荐在实际生产中使用,如下所示:

val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

12. 接下来看看DataFrame类型支持的最有用的方法:处理列或表达式时的select方法,以及处理字符串表达式时的selectExpr方法。当然有的转换操作不是针对列的操作方法,因此org.apache.spark.sql.functions包中包含一组函数方法用来提供额外支持。select和selectExpr函数支持在DataFrame上执行类似数据表的SQL查询:

SELECT * FROM dataFrameTable
SELECT columnName FROM dataFrameTable
SELECT columnName * 10, otherColumn, someOtherCol as c FROMdataFrameTable

简单来说就是,可以使用select和selectExpr来操作DataFrame中的列。最简单的方式就是使用select方法,待处理的列名作为参数传递进来:

// in Scala
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
--in SQL
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2

上述语句输出结果为:

当然,也可以通过多种不同的方式引用列,而且这些方式可以等价互换:

import org.apache.spark.sql.functions.{expr, col, column}
df.select(
    df.col("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
    'DEST_COUNTRY_NAME,
    $"DEST_COUNTRY_NAME",
    expr("DEST_COUNTRY_NAME"))
  .show(2)

一个常见的错误是混淆Column对象和字符串。例如,以下代码将导致编译错误:

df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")

expr是目前使用到的最灵活的引用方式,它能够引用一列,也可以引用对列进行操纵的字符串表达式。为了说明这一点,先更改列名,然后使用AS关键字和列上的alias方法将名字重新改回去:

// in Scala
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
--in SQL
SELECT DEST_COUNTRY_NAME as destination FROM dfTable LIMIT 2

因为select后跟着一系列expr是非常常见的写法,所以Spark有一个有效地描述此操作序列的接口:selectExpr,它可能是最常用的接口:

df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

这是Spark最强大的地方,可以利用selectExpr构建复杂表达式来创建DataFrame。实际上,可以添加任何不包含聚合操作的有效SQL语句,并且只要列可以解析,它就是有效的。下面是一个简单的例子,在DataFrame中增加一个新列withinCountry,该列描述了destination和origin是否相同:

df.selectExpr(
    "*", // 包含所有原始表中的列
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
  .show(2)

该语句的输出结果为:

使用select语句,还可以利用系统预定义好的聚合函数来指定在整个DataFrame上的聚合操作,如下面所展示的代码示例:

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

该语句的结果为:

13. 有时候需要给Spark传递显式的值,它们只是一个值而非新列,这可能是一个常量值,或接下来需要比较的值。可以通过字面量(literal)传递,简单来说就是从给定编程语言的字面值转换操作为Spark可以理解的值。字面量就是表达式,可以用操作表达式的方式来使用它们

import org.apache.spark.sql.functions.lit
df.select(expr("*"), lit(1).as("One")).show(2)

在SQL中,字面量只是特定的值:

SELECT *, 1 as One FROM dfTable LIMIT 2

这两个语句的输出结果为:

使用WithColumn可以为DataFrame增加新列,例如添加一个仅包含数字1的列:

// in Scala
df.withColumn("numberOne", lit(1)).show(2)
--in SQL
SELECT *, 1 as numberOne FROM dfTable LIMIT 2

该语句的输出如下所示:

来接触一下实际的表达式。在下一个示例中,当出发国家与目的地国家相同时,为其设置一个布尔标志:

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
  .show(2)

withColumn函数有两个参数,分别是列名和为给定行赋值的表达式。也可以用WithColumn对某一列重命名,如下所示:

df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

修改后的结果为:

... DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, Destination

14. 不仅可以使用WithColumn对列重命名,还可以使用WithColumnRenamed方法实现对列重命名。WithColumnRenamed中,第一个参数是要被修改的列的名,第二个参数是新的列名:

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
... dest, ORIGIN_COUNTRY_NAME, count  //改动后结果

可能还会遇到列名中包含空格或者连字符等保留字符,要处理这些保留字符意味着要适当的对列名进行转义。在Spark中,通过使用反引号(`)字符来实现关键字等保留字符转义。withColumn是一个允许使用保留字来创建列的方法。接下来展示两个示例—在第一个示例中不需要转义字符,但在第二个中会用到:

import org.apache.spark.sql.functions.expr
val dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME"))

这里不需要转义字符,因为withColumn的第一个参数只是新列名的字符串。但在下面这个例子中,需要使用反引号,因为在expr表达式中引用了一个列

//in Scala
dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")
  .show(2)
--in SQL
SELECT `This Long Column-Name`, `This Long Column-Name` as `new col`
FROM dfTableLong LIMIT 2

如果显式地使用字符串来引用列,则可以引用带有保留字符的类(而不用转义它们),这个字符串会被解释成字面值而不是表达式,只需要转义使用保留字符或者关键字的表达式。下面两个例子会产生相同的DataFrame:

// in Scala
dfWithLongColName.select(col("This Long Column-Name")).columns
# in Python
dfWithLongColName.select(expr("`This Long Column-Name`")).columns

15.从DataFrame中删除一至多个列可以使用drop方法,如下所示:

dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

可以通过更改列的类型来转换数据类型。例如,下面将count列从integer整型转换操作成Long:

//in Scala
df.withColumn("count2", col("count").cast("long"))
--in SQL
SELECT *, cast(count as long) AS count2 FROM dfTable

为了过滤行,只要创建一个表达式判断该表达是true还是false,然后过滤使表达式为false的行。在DataFrame上实现过滤操作最常见的两种方式分别是where和filter,它们可以执行相同的操作,接受相同参数类型,如下面的例子所示:

df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
--in SQL
SELECT * FROM dfTable WHERE count < 2 LIMIT 2

输出结果为:

想把多个过滤条件放到一个表达式中的方式,并不总是有效。因为Spark会同时执行所有过滤操作,而不管过滤条件的先后顺序。因此想指定多个AND过滤操作时,只要按照先后顺序以链式的方式把些过滤条件串联起来,然后让Spark执行剩下的工作:

// in Scala
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
.show(2)
--in SQL
SELECT * FROM dfTable WHERE count < 2 AND ORIGIN_COUNTRY_NAME != "Croatia"
LIMIT 2

这样的输出结果为:

还有一个常见的应用场景是去除DataFrame中重复的行,这可以去除一列或者多列中重复的值。实现去重的方式之一是使用DataFrame的distinct方法,它能够对DataFrame中的行进行去重操作。例如,从数据集中找出所有不同的出发国家-目的地国家组合或者所有不同的出发国家,这是一个transformation操作,它将返回去重后的DataFrame:

// in Scala
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
--in SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable

但是从去重性能优化的角度上考虑,不建议使用count distinct,建议使用count … group by,因为distinct需要将列中的所有内容都加载到内存中,大致可以理解为一个hash结构,key自然就是列中的所有值。因为是hash结构,那运算速度自然就快。最后计算hash中有多少key就是最终的结果。但在海量数据环境下,将所有值都存起来的内存消耗,可能会导致OOM。

而group by的实现方式是先将列排序,对于快速排序算法来说时间复杂度为O(nlogn),而空间复杂度只有O(1)。这样一来即使数据量再大一些,group by基本也扛得住。但是因为需要做一次排序,所以时间上会相对慢一点点。所以count(distinct)吃内存,查询快;而group by空间复杂度小,在时间复杂度允许的情况下,可以发挥它的空间复杂度优势。

16. 有时想从DataFrame中随机抽取一些记录,可以使用sample方法来实现此操作,它按一定比例从DataFrame中随机抽取一部分行,可以通过withReplacement参数指定是否放回抽样,true为有放回的抽样(可以有重复样本),false为无放回的抽样(无重复样本):

val seed = 5
val withReplacement = false
val fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

当需要将原始DataFrame随机分割成多个分片时,可以使用随机分割。这通常是在机器学习算法中,用于分割数据集来创建训练集、验证集和测试集。在下一个示例中,要设置分割比例(随机分割函数的参数)来将DataFrame分割成两个不同的DataFrame。由于随机分割是一种随机方法,所以还需要指定一个随机seed。需要注意的是,如果一个DataFrame的分割比例的和不为1,则比例参数会被自动归一化:

val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count() // 结果为False

17. DataFrame是不可变的,这意味着用户不能向DataFrame追加行。如果想要向DataFrame追加行,必须将原始的DataFrame与新的DataFrame联合起来,即union操作,但必须确保它们具有相同的schema和列数,否则union操作将会失败。目前union是基于位置而不是基于数据模式schema执行,也就是说它并不会自动根据列名匹配对齐后再进行union,所以两个联合的DataFrame需要具有完全相同的模式和列数,如下所示:

import org.apache.spark.sql.Row
val schema = df.schema
val newRows = Seq(
    Row("New Country", "Other Country", 5L),
    Row("New Country 2", "Other Country 3", 1L)
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)
  .where("count = 1")
  .where($"ORIGIN_COUNTRY_NAME" =!= "United States")
  .show() // get all of them and we'll see our new rows at the end

在Scala中需要使用“=!=”运算符,这是因为它不仅能比较字符串,也能够比较表达式,语句的输出结果如下:

当对DataFrame中的值进行排序时,通常是想要获得DataFrame里的一些最大值或者最小值。sort和orderBy方法是相互等价的操作,执行的方式也一样。它们均接收列表达式和字符串,以及多个列。默认设置是按升序排序:

df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

若要更明确地指定升序或是降序,则需使用asc函数和desc函数:

// in Scala
import org.apache.spark.sql.functions.{desc, asc}
df.orderBy(expr("count desc")).show(2)
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)
--in SQL
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2

一个高级技巧是可以指定空值在排序列表中的位置,使用asc_nulls_firs指示空值安排在升序排列的前面,使用desc_nulls_firs指示空值安排在降序排列的前面,使用asc_nulls_last指示空值安排在升序排列的后面,使用desc_nulls_last指示空值安排在降序排列的后面。

出于性能优化的目的,最好是在进行别的转换之前,先对每个分区进行内部排序。可以使用sortWithinPartitions方法实现这一操作:

spark.read.format("json").load("/data/flight-data/json/*-summary.json")
     .sortWithinPartitions("count")

18. 另一个重要的优化是根据一些经常过滤的列对数据进行分区,控制跨集群数据的物理布局,包括分区方案和分区数。不管是否有必要,重新分区都会导致数据的全面shuffle。如果将来的分区数大于当前的分区数,或者想要基于某一组特定列来进行分区时,通常只能重新分区:

df.rdd.getNumPartitions // 1
df.repartition(5)

如果知道要经常按某一列执行过滤操作,则根据该列进行重新分区是很有必要的

df.repartition(col("DEST_COUNTRY_NAME"))

而另一方面,合并操作(coalesce)不会导致数据的全面shuffle,但会尝试合并分区。下面的示例代码将基于目的地国家名的列将数据重新划分成5个分区,然后再合并它们(没有导致数据全面shuffle):

df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

但是如果是要减少小文件的数量,将很多个父RDD或父DataFrame的分区数急剧减少,这时候就不要直接用coalesce()而应该用repartition(),因为Spark源码中repartition()的实现其实就是coalesce(numOfPartitions, true),即支持shuffle的coalesce(),只用coalesce(numOfPartitions)则不会发生shuffle操作,如果括号内设置的numOfPartitions参数太少,会导致实际运行计算的节点过少,从而导致如下报错:

java.io.IOException: Unable to acquire 16777216 bytes of memory

这个报错只要设置coalesce()里是否shuffle的参数为true时就能消除。如果要把父RDD的分区数增大,如果coalesce()的是否shuffle设置为false,则增大分区数并不会起作用,这时候就必须设置是否shuffle为true了,这样父RDD才会在shuffle之后返回一个更多分区的RDD,数据分区方式默认是采用 Hash Partitioner。因此一般都建议用repartition()替代coalesce()

19. Spark的driver维护着集群状态,有时候需要让driver收集一些数据到本地,这样可以在本地机器上处理它们。下面的代码示例使用collect函数从整个DataFrame中获取所有数据,使用take函数选择前N行,并使用show函数打印一些行:

val collectDF = df.limit(10)
collectDF.take(5) // 获取整数行
collectDF.show() // 更友好的打印
collectDF.show(5, false)
collectDF.collect()

为了遍历整个数据集,还有一种让driver获取行的方法,即toLocalIterator()函数。toLocalIterator函数是一个迭代器,将每个分区的数据返回给driver。这个函数允许以串行的方式一个个分区地迭代整个数据集:

collectDF.toLocalIterator()

但是一般情况下应该避免collect()和toLocalIterator()等数据全部拉取到driver的操作,当数据集很大时可能会导致driver端OOM。

三、处理不同的数据类型

20. 在源码中,DataFrame本质上就是一个Row类型的Dataset所以最终查看的就是Dataset的方法。Dataset的子模块如DataFrameStatFunctions和DataFrameNaFunctions有更多解决具体问题的方法,例如DataFrameStatFunctions包含许多统计相关的函数,而DataFrameNaFunctions包含处理空值相关的函数。

org.apache.spark.sql.functions包含针对一系列不同数据类型的各种函数方法。因为这些函数方法经常被用到,所以这个包经常被整个导入代码中。可以在这里找到SQL和DataFrame 的函数方法。

上述这么多的函数方法会感觉茫然无措,但是不必担心,因为大部分函数可以在SQL和解析系统中找到。这些工具都是将一种数据格式或结构的数据行转换为另一种数据格式或结构的数据行,可能会导致行数的增减。首先读取例子数据以便后续的分析使用:

val df = spark.read.format("csv")
    .option("header","true")
    .option("inferSchema","true")
    .load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

以下是结果和一小部分数据示例:

要做的一件事是将原始类型转换成Spark类型,要使用lit函数来实现这一点,该函数将其他语言的类型转换为与其相对应的Spark表示,下面将几种不同类型的Scala和Python值转换为对应的Spark类型数据:

// in Scala
import org.apache.spark.sql.functions.lit
df.select(lit(5),lit("five"),lit(5.0))
# in Python
from pyspark.sql.functions import lit
df.select(lit(5),lit("five"),lit(5.0))

21. 接下来基于上面导入的零售数据来说明处理布尔类型的方法,可以在其中指定等于、小于、或大于:

import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo").equalTo(536365))
  .select("InvoiceNo","Description")
  .show(5,false)

Scala有一些关于==和===用法的特殊语义。在Spark中,如果想通过相等条件来进行过滤,应该使用===(等于)或者=!=(不等于)符号,还可以使用not函数和equalTo方法来实现,如下所示:

import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo") =!= 536365)
  .select("InvoiceNo","Description")
  .show(5,false)

输出结果如下所示:

另外一种方法是使用字符串形式的谓词表达式(可能是最简洁的方法),Python或Scala支持这种方法。这里使用了另一种表达“不等于”的方法:

df.where("InvoiceNo = 536365")
  .show(5,false)
df.where("InvoiceNo <> 536365")
  .show(5,false)

之前提到可以使用and或者or将多个Boolean表达式连起来。但是Spark中,最好是以链式连接的方式组合起来,形成顺序执行的过滤器。这样做的原因是因为即使Boolean语句是一个接一个顺序表达的,Spark也会将所有这些过滤器合并为一条语句,并同时执行这些过滤器,创建and语句。虽然说可以在语句中显式地使用and,但如果将它们串起来就更容易理解和阅读。or语句需要在同一语句中指定:

//in Scala
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter))
  .show()
--in SQL
SELECT * FROM dfTable WHERE StockCode in ("DOT") AND(UnitPrice > 600 OR
instr(Description,"POSTAGE") >= 1)

输出结果如下所示:

过滤器不一定非要使用Boolean表达式,要过滤DataFrame,也可以删掉指定一个Boolean布尔类型的:

//in Scala
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive",DOTCodeFilter.and(priceFilter.or(descripFilter)))
  .where("isExpensive")
  .select("unitPrice","isExpensive").show(5)
--in SQL
SELECT UnitPrice,(StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description,"POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description,"POSTAGE") >= 1))

注意并没有将过滤器设置为一条语句,使用一个列名无需其他工作就可以实现。实际上,将过滤器表示为SQL语句比使用编程式的DataFrame接口更简单,同时Spark SQL实现这点并不会造成性能下降。例如,以下两条语句是等价的:

import org.apache.spark.sql.functions.{expr,not,col}
df.withColumn("isExpensive",not(col("UnitPrice").leq(250)))
  .filter("isExpensive")
  .select("Description","UnitPrice").show(5)
df.withColumn("isExpensive",expr("NOT UnitPrice <= 250"))
  .filter("isExpensive")
  .select("Description","UnitPrice").show(5)

如果数据存在空值,则需要以不同的方式处理布尔表达式。下面这条语句可以保证执行空值安全的等价测试:

df.where(col("Description").eqNullSafe("hello")).show()

22. 在处理大数据时,过滤之后要执行的第二个常见任务是计数。举个例子,假设发现错误地记录了零售数据集中的数量,而真实数量其实等于(当前数量*单位价格)^2 + 5。这需要写pow函数来对指定列进行幂运算:

import org.apache.spark.sql.functions.{expr,pow}
val fabricatedQuantity =pow(col("Quantity") * col("UnitPrice"),2) + 5
df.select(expr("CustomerId"),fabricatedQuantity.alias("realQuantity")).show(2)

输出结果如下所示:

当然也可以使用SQL表达式来实现所有这些操作,如下所示:

df.selectExpr(
    "CustomerId",
    "(POWER((Quantity * UnitPrice),2.0) + 5) as realQuantity").show(2)

另一个常见的数值型操作是四舍五入操作。如果只想四舍五入为一个整数,将数值转换为整型即可,但是Spark中还有更具体的函数来执行某个级别精度的转换。下面的例子中将四舍五入至小数点后一位:

import org.apache.spark.sql.functions.{round,bround}
df.select(round(col("UnitPrice"),1).alias("rounded"),col("UnitPrice")).show(5)

默认情况下,如果恰好位于两个数字之间,则round函数会向上取整,也可以通过bround函数进行向下取整:

//in Scala
import org.apache.spark.sql.functions.lit
df.select(round(lit("2.5")),bround(lit("2.5"))).show(2)
--in SQL
SELECT round(2.5),bround(2.5)

结果输出如下所示:

23. 另一个数值型操作就是计算两列的相关性。例如可以通过两列的Pearson相关系数来查看是否东西越便宜买的就越多。可以通过函数以及DataFrame统计方法实现此操作:

// in Scala
import org.apache.spark.sql.functions.{corr}
df.stat.corr("Quantity","UnitPrice")
df.select(corr("Quantity","UnitPrice")).show()
--in SQL
SELECT corr(Quantity,UnitPrice) FROM dfTable

输出结果如下所示:

另一个常见操作是计算一列或一组列的汇总统计信息,可以用describe方法实现。它会计算所有数值列的计数、均值、标准差、最小值和最大值,如下所示:

df.describe().show()

输出结果如下表所示:

如果需要这些精确的数字,也可以import函数并在所需列上应用来实现聚合操作:

import org.apache.spark.sql.functions.{count,mean,stddev_pop,min,max}

StatFunctions包中封装了许多可供使用的统计函数(可以使用stat来访问),比如可以使用approxQuantile方法来计算数据的精确分位数或近似分位数:

val colName = "UnitPrice"
val quantileProbs = Array(0.5)
val relError = 0.05
df.stat.approxQuantile("UnitPrice",quantileProbs,relError) // 2.51

也可以使用它来查看交叉列表或频繁项对,如下所示:

df.stat.crosstab("StockCode","Quantity").show()
df.stat.freqItems(Seq("StockCode","Quantity")).show()

最后一点,还可以使用monotonically_increasing_id函数为每行添加一个唯一的ID。它会从0开始,为每行生成一个唯一值:

import org.apache.spark.sql.functions.monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)

24. 字符串操作几乎在每个数据流中都有,可能会对正在操作的日志文件执行正则表达式提取或替换,或者检查其中是否包含简单的字符串,或者使所有字符串都变成大写或小写。先从字符串大小写转换开始,Initcap函数会将给定字符串中空格分隔的每个单词首字母大写,如下所示:

// in Scala
import org.apache.spark.sql.functions.{initcap}
df.select(initcap(col("Description"))).show(2,false)
--in SQL
SELECT initcap(Description) FROM dfTable

输出结果如下所示:

正如刚提到的,还可以将字符串转为大写或小写:

// in Scala
import org.apache.spark.sql.functions.{lower,upper}
df.select(col("Description"), 
          lower(col("Description")), 
          upper(lower(col("Description")))).show(2)
--in SQL
SELECT Description,lower(Description),Upper(lower(Description)) FROM dfTable

输出结果如下所示:

另一个简单的任务是删除字符串周围的空格或者在其周围添加空格,可以使用lpad、ltrim、rpad 以及rtrim、trim来实现:

// in Scala
import org.apache.spark.sql.functions.{lit,ltrim,rtrim,rpad,lpad,trim}
df.select(
          ltrim(lit(" HELLO ")).as("ltrim"), 
          rtrim(lit(" HELLO ")).as("rtrim"), 
          trim(lit(" HELLO ")).as("trim"), 
          lpad(lit("HELLO"),3," ").as("lp"), 
          rpad(lit("HELLO"),10," ").as("rp")).show(2)
--in SQL
SELECT ltrim(' HELLLOOOO '), 
       rtrim(' HELLLOOOO '), 
       trim(' HELLLOOOO '), 
       lpad('HELLOOOO ',3,' '), 
       rpad('HELLOOOO ',10,' ') 
FROM dfTable

输出结果如下所示:

需要注意的是,如果lpad或rpad方法输入的数值参数小于字符串长度,它将从字符串的右侧删除字符。

25. 最常见的任务之一是在一个字符串中搜索子串,替换被选中的字符串等。想要执行正则表达式操作,需要用到Spark中两个关键函数:regexp_extract和regexp_replace,这两个函数分别用于提取值和替换值。接下来说明如何使用regexp_replace函数来替换掉Description列中的颜色名:

// in Scala
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// “|”在正则表达式中是“或“的意思
df.select(regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"), col("Description")).show(2)
--in SQL
SELECT regexp_replace(Description, 'BLACK|WHITE|RED|GREEN|BLUE', 'COLOR') as color_clean, 
       Description
FROM dfTable

输出结果如下所示:

另一个任务是用其他字符替换给定的字符。构建正则表达式来实现该操作可能会有些冗长,所以Spark还提供了translate函数来实现该替换操作。这是在字符级上完成的操作,并用于给定字符串中替换掉所有出现的某字符串:

// in Scala
import org.apache.spark.sql.functions.translate
df.select(translate(col("Description"), "LEET", "1337"), col("Description")).show(2)
--in SQL
SELECT translate(Description, 'LEET', '1337'), Description FROM dfTable

输出结果为:

也可以执行其他类似的任务,比如取出第一个被提到的颜色:

// in Scala
import org.apache.spark.sql.functions.regexp_extract
val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
// “|”是正则表达式中的“或“的意思
df.select(
    regexp_extract(col("Description"), regexString, 1).alias("color_clean"), 
    col("Description")).show(2)
--in SQL
SELECT regexp_extract(Description, '(BLACK|WHITE|RED|GREEN|BLUE)', 1), 
       DescriptionFROM dfTable

输出结果如下所示:

26. 有时并不是要提取字符串,而是只想检查它们是否存在。此时可以在每列上用contains方法来实现这个操作。该方法将返回一个布尔值,它表示指定的值是否在该列的字符串中:

val containsBlack = col("Description").contains("BLACK")
val containsWhite = col("DESCRIPTION").contains("WHITE")
df.withColumn("hasSimpleColor", containsBlack.or(containsWhite))
  .where("hasSimpleColor")
  .select("Description").show(3, false)

SQL中可以使用instr函数,如下所示:

SELECT Description FROM dfTable
WHERE instr(Description, 'BLACK') >= 1 OR instr(Description, 'WHITE') >= 1

输出结果如下所示:

仅仅两个参数值时看起来很简单,但当有很多值时会变的更复杂。利用Spark可以接收不定数量参数的能力,来解决这个问题。当将一列值转换为一组参数并将它们传递到函数中时使用了var args,这可以有效地解析任意长度的数组,并将它作为参数传递给函数。结合select方法,就可以动态地创建任意数量的列:

val simpleColors = Seq("black","white","red","green","blue")
val selectedColumns = simpleColors.map(color => {
    col("Description").contains(color.toUpperCase).alias(s"is_$color")
}):+expr("*") // 也可以添加该值
df.select(selectedColumns:_*).where(col("is_white").or(col("is_red")))
  .select("Description").show(3,false)
+----------------------------------+
|Description |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN |
|RED WOOLLY HOTTIE WHITE HEART. |
+----------------------------------+

27. 当设置inferSchema为true的时候Spark可以自动推理出日期和时间戳数据类型。常见的难题是Spark的TimestampType类只支持二级精度,这意味着如果要处理毫秒或微秒,可能需要将数据作为long类型操作才能解决该问题。在强制转换为TimestampType时,任何更高的精度都被删除。到目前为止Spark仍在使用Java 日期和时间戳,因此要确保符合这些标准。先从基础知识开始,获取当前日期和当前时间戳:

import org.apache.spark.sql.functions.{current_date,current_timestamp}
val dateDF = spark.range(10)
    .withColumn("today",current_date())
    .withColumn("now",current_timestamp())
dateDF.createOrReplaceTempView("dateTable")

该代码的结果如下所示:

|--id: long (nullable = false) |--today: date (nullable = false) |--now: timestamp (nullable = false)

现在有一个简单的DataFrame 可以使用,从今天起增加和减去5天,这些函数读取一列,然后将添加或减去的天数作为参数:

// in Scala
import org.apache.spark.sql.functions.{date_add,date_sub}
dateDF.select(date_sub(col("today"),5),date_add(col("today"),5)).show(1)
--in SQL
SELECT date_sub(today,5),date_add(today,5) FROM dateTable
+------------------+------------------+
|date_sub(today,5)|date_add(today,5)|
+------------------+------------------+
| 2017-06-12| 2017-06-22|
+------------------+------------------+

另一项常见任务是查看两个日期之间的间隔时间。可以使用datediff函数来完成,该函数将返回两个日期之间的天数。大多数情况下只关心天数,由于每个月的天数不同,还有一个months_between函数,它可以给出两个日期之间相隔的月数:

import org.apache.spark.sql.functions.{datediff,months_between,to_date}
dateDF.withColumn("week_ago",date_sub(col("today"),7))
      .select(datediff(col("week_ago"),col("today"))).show(1)
dateDF.select(
        to_date(lit("2016-01-01")).alias("start"),
        to_date(lit("2017-05-22")).alias("end"))
      .select(months_between(col("start"),col("end"))).show(1)
+-------------------------+
|datediff(week_ago,today)|
+-------------------------+
| -7|
+-------------------------+
+--------------------------+
|months_between(start,end)|
+--------------------------+
| -16.67741935|
+--------------------------+

这里引入了一个新函数to_date()。该函数以指定的格式将字符串转换为日期数据。如果使用这个函数则要在Java SimpleDateFormat中指定想要的格式,这一步非常重要:

import org.apache.spark.sql.functions.{to_date,lit}
spark.range(5).withColumn("date",lit("2017-01-01"))
     .select(to_date(col("date"))).show(1)

28. 如果Spark无法解析日期,它不会抛出错误而只是返回null。如果想获取某种格式的日期数据,再将其转化成另一种格式,可能就带来麻烦。为了解释这一点,来看看从“年-月-日”切换到到“年-日-月”的日期格式。Spark将无法解析此日期,并默认返回null:

dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)
+-------------------+-------------------+
|to_date(2016-20-12)|to_date(2017-12-11)|
+-------------------+-------------------+
| null| 2017-12-11|
+-------------------+-------------------+

发现这是一个比较棘手的bug,因为某些日期可能与正确的格式相匹配,而某些日期则可能不匹配。在上面示例中,第二个日子显示为12月11日,而不是正确日期11月12日。Spark不会抛出错误,因为它无法知道是日期格式用错了,还是这行数据本身就是不正确的。

接下来逐步修复这个操作,并提出一个可靠的方法来完全避免这些问题。第一步,记住需要根据Java SimpleDateFormat 标准指定日期格式,将使用两个函数来解决此问题:to_date和to_timestamp。前者可选择一种日期格式,而后者则强制要求使用一种日期格式:

// in Scala
import org.apache.spark.sql.functions.to_date
val dateFormat = "yyyy-dd-MM"
val cleanDateDF = spark.range(1).select(
    to_date(lit("2017-12-11"), dateFormat).alias("date"), 
    to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")
--in SQL
SELECT to_date(date, 'yyyy-dd-MM'), 
       to_date(date2, 'yyyy-dd-MM'), 
       to_date(date)
FROM dateTable2
+----------+----------+
| date| date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+

现在举一个to_timestamp的例子,它要求指定一种格式:

// in Scala
import org.apache.spark.sql.functions.to_timestamp
cleanDateDF.select(to_timestamp(col("date"),dateFormat)).show()
--in SQL
SELECT to_timestamp(date,'yyyy-dd-MM'),to_timestamp(date2,'yyyy-dd-MM')
FROM dateTable2

日期和时间戳之间的转换在所有语言中都很简单——在SQL 中,可以按以下方式实现此操作:

SELECT cast(to_date("2017-01-01","yyyy-dd-MM") as timestamp)

在以正确的格式和类型获取了日期或时间戳之后,它们之间的比较实际上很简单,只需要确保使用同一种日期/时间戳类型格式,或者根据yyyy-MM-dd这种正确格式来指定字符串:

cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()

因此,使用隐式类型转换可能会在无意中搬起石头砸自己的脚,尤其是在处理null值或不同时区或格式的日期时,建议使用显式的类型转换,不要采取隐式转换的方法

29. 实践中,建议始终使用null来表示DataFrame中缺少或空的数据。相较于使用空字符串或其他值来说,使用null值更有利于Spark进行优化。基于DataFrame处理null值主要的方式是使用.na子包,还有一些用于执行操作并显式指定Spark应如何处理null值的函数。显式处理空值要比隐式处理要好一些。例如将列定义为允许null类型,这会带来一个问题,当声明列没有空值时,这并不是实际意义上的强制无空值,此时Spark不会强制拒绝空值插入如果在不该有空值的列中有空值,则可能会得到不正确的结果或很奇怪的表达式,这给调试带来很大困难。

对于null值可以执行以下两项操作: 显式删除null值,也可以用某实值来代替空值。例如合并,通过使用coalesce函数实现从一组列中选择第一个非空值。在下面例子中,因为没有null值,所以它只是返回第一列:

import org.apache.spark.sql.functions.coalesce
df.select(coalesce(col("Description"),col("CustomerId"))).show()

还有一些其他SQL函数可用于实现类似的操作。Ifnull()的功能是如果第一个值为空,则允许选择第二个值,并将其默认为第一个。或者可以使用nullif(),如果两个值相等,则返回null,否则返回第二个值。nvl()功能是如果第一个值为null,则返回第二个值,否则返回第一个。最后nvl2()功能是如果第一个不为null,返回第二个值;否则它将返回最后一个指定值(下面示例中的else_value):

SELECT
ifnull(null, 'return_value'), 
nullif('value', 'value'), 
nvl(null, 'return_value'), 
nvl2('not_null', 'return_value', "else_value")
FROM dfTable LIMIT 1

drop是最简单的函数,它用于删除包含null的行,默认删除包含null值的行:

df.na.drop()
df.na.drop("any")

在SQL中,必须逐列进行:

SELECT * FROM dfTable WHERE Description IS NOT NULL

若指定“any”作为参数,当存在一个值是null时,就删除改行;若指定“all”为参数,只有当所有的值为null或者NaN时才能删除该行:

df.na.drop("all")

也可以通过指定某几列,来对这些列进行删除空值操作:

df.na.drop("all", Seq("StockCode","InvoiceNo"))

30. fill函数可以用一组值填充一列或多列,它可以通过指定一个映射(即一个特定值和一组列)来完成此操作。例如,要替换某字符串类型列中的所有null值为某一字符串,可以指定以下内容:

df.na.fill("All Null values become this string")

对于Integer类型的列,可以使用df.na.fill(5:Integer)来实现;对于Doubles类型的列,则使用df.na.fill(5:Double)。想要指定多列,需传入一个列名的数组,如同前面的例子中所示:

df.na.fill(5, Seq("StockCode","InvoiceNo"))

还可以使用Scala的Map映射来实现,其中主键是列名,而值是想用来替换null的值:

val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
df.na.fill(fillColValues)

除了像使用drop和fill函数来替换null 值之外,还有不单单针对空值的灵活操作。最常见的用例是根据当前值替换掉某列中的所有值,唯一的要求是替换值与原始值的类型相同:

df.na.replace("Description", Map("" -> "UNKNOWN"))

当然也可以使用asc_nulls_first,desc_nulls_first,asc_nulls_last或desc_nulls_last,来指定希望null值出现在有序DataFrame中的位置。

31. 可以把结构体视为DataFrame中的DataFrame,下面这个例子帮助更清晰的理解这一点。通过在查询中用圆括号括起一组列来创建一个结构体:

import org.apache.spark.sql.functions.struct
val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")

现在有一个包含complex列的DataFrame,可以像查询另一个DataFrame一样查询它,唯一的区别是使用“.”来访问或列方法getField来实现:

complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))

还可以使用*来查询结构体中的所有值,这将调出顶层DataFrame的所有列:

//in Scala
df.selectExpr("(Description, InvoiceNo) as complex", "*")
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")
complexDF.select("complex.*")
--in SQL
SELECT complex.* FROM complexDF

还有一种结构化数据是数组。定义数组之前先来看一个用例。使用当前数据,目标是读取Description列中的每个单词并将其转换成DataFrame中的一行。第一个操作是将Description列转换为一个复杂类型,即数组。使用split函数并指定分隔符来执行此操作:

// in Scala
import org.apache.spark.sql.functions.split
df.select(split(col("Description"), " ")).show(2)
--in SQL
SELECT split(Description, ' ') FROM dfTable
+---------------------+
|split(Description,)|
+---------------------+
| [WHITE,HANGING,...|
| [WHITE,METAL,LA...|
+---------------------+

这个功能非常有用,因为Spark允许将这种类型作为一个列来操作,用来查询数组的值:

// in Scala
df.select(split(col("Description"), " ").alias("array_col"))
.selectExpr("array_col[0]").show(2)
--in SQL
SELECT split(Description, ' ')[0] FROM dfTable
+------------+
|array_col[0]|
+------------+
| WHITE|
| WHITE|
+------------+

32. 还可以通过查询数组的大小来确定数组的长度:

import org.apache.spark.sql.functions.size
df.select(size(split(col("Description"), " "))).show(2) // 打印5和3

array_contains可以查询此数组是否包含某个值:

// in Scala
import org.apache.spark.sql.functions.array_contains
df.select(array_contains(split(col("Description"), " "),"WHITE")).show(2)
--in SQL
SELECT array_contains(split(Description, ' '), 'WHITE') FROM dfTable
+--------------------------------------------+
|array_contains(split(Description,),WHITE)|
+--------------------------------------------+
|true|
| true|
+--------------------------------------------+

但若想将复杂类型真正转换为一系列行(数组中的每个值为一行),则需要使用explode函数,它的输入参数为一个包含数组的列,并为该数组中的每个值创建一行(每行重复其他值),如下所示:

// in Scala
import org.apache.spark.sql.functions.{split, explode}
df.withColumn("splitted", split(col("Description"), " "))
  .withColumn("exploded", explode(col("splitted")))
  .select("Description", "InvoiceNo", "exploded").show(2)
--in SQL
SELECT Description, InvoiceNo, exploded
FROM (SELECT *, split(Description, " ") as splitted FROM dfTable)
LATERAL VIEW explode(splitted) as exploded
+--------------------+---------+--------+
| Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...| 536365| WHITE|
|WHITE HANGING HEA...| 536365| HANGING|
+--------------------+---------+--------+

33. Map映射是通过map函数构建两列内容的键值对映射形式。然后便可以像在数组中一样去选择它们:

// in Scala
import org.apache.spark.sql.functions.map
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2)
--in SQL
SELECT map(Description, InvoiceNo) as complex_map FROM dfTable
WHERE Description IS NOTNULL
+--------------------+
| complex_map|
+--------------------+
|Map(WHITE HANGING...|
|Map(WHITE METAL L...|
+--------------------+

可以使用正确的键值(key)对它们进行查询,若键值(key)不存在则返回null:

df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
  .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
| null|
| 536365|
+--------------------------------+

还可以展开map类型,将其转换成列,如下所示:

df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
  .selectExpr("explode(complex_map)").show(2)
+--------------------+------+
| key| value|
+--------------------+------+
|WHITE HANGING HEA...|536365|
| WHITE METAL LANTERN|536365|
+--------------------+------+

34. Spark对处理JSON 数据有一些独特的支持,比如可以在Spark中直接操作JSON字符串,并解析JSON或提取JSON对象。首先创建一个JSON类型的列:

val jsonDF = spark.range(1).selectExpr("""
    '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

无论是字典还是数组,均可以使用get_json_object直接查询JSON对象。如果此查询的JSON对象仅有一层嵌套,则可使用json_tuple:

import org.apache.spark.sql.functions.{get_json_object, json_tuple}
jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column", 
    json_tuple(col("jsonString"), "myJSONKey")).show(2)

以下是SQL中的等价表示:

jsonDF.selectExpr(
    "json_tuple(jsonString, '$.myJSONKey.myJSONValue[1]') as column").show(2)

输出结果如下所示:

+------+--------------------+
|column| c0|
+------+--------------------+
| 2|{"myJSONValue":[1...|
+------+--------------------+

还可以使用to_json函数将StructType转换为JSON字符串:

import org.apache.spark.sql.functions.to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")
  .select(to_json(col("myStruct")))

这个函数也可以接受参数的映射作为输入,这些映射与JSON的数据源相同。还可以使用from_json函数将JSON数据解析出来。这需要指定一个模式(Schema),也可以指定其他的映射:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val parseSchema = new StructType(Array(
new StructField("InvoiceNo", StringType, true), 
new StructField("Description", StringType, true)))
df.selectExpr("(InvoiceNo, Description) as myStruct")
  .select(to_json(col("myStruct")).alias("newJSON"))
  .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)
+----------------------+--------------------+
|jsontostructs(newJSON)| newJSON|
+----------------------+--------------------+
| [536365,WHITE HAN...|{"InvoiceNo":"536...|
| [536365,WHITE MET...|{"InvoiceNo":"536...|
+----------------------+--------------------+

35. Spark最强大的功能之一就是用户自定义函数(UDF),让用户可以使用Python或Scala 编写自己的自定义转换操作,甚至可以使用外部库。UDF可以将一个或多个列作为输入,同时也可以返回一个或多个列。这些函数只是描述了(一个接一个地)处理数据记录的方法。默认情况下,这些函数被注册为SparkSession或者Context的临时函数。第一步是设计一个实际的函数,用一个简单例子来说明,编写一个power3函数,该函数接收一个数字并返回它的三次幂:

val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)

但是目前这个UDF函数的输入要求很高:它必须是特定类型(数值类型),并且不能为null。现在已经创建了这些函数并对它们进行了测试,接下来需要在Spark上注册UDF以便在所有的worker机器上使用它们。Spark将在driver进程上序列化该函数,并将它通过网络传递到所有executor进程

当使用该UDF函数时,基本上有两种不同的情况发生:如果该UDF函数是用Scala或Java编写的,则可以在JVM中使用它,这意味着不能使用spark为内置函数提供的代码生成能力,或导致性能的一些下降。如果函数是用Python编写的,则会出现截然不同的情况,Spark在worker上启动一个Python进程,将所有数据序列化为Python可解释的格式(注意数据之前在JVM 中),在Python进程中对该数据逐行执行UDF,最终将对每行的操作结果返回给JVM 和Spark,如下图所示:

但需要注意的是,启动此Python进程代价很高,但主要代价是将数据序列化为Python可理解的格式的这个过程。造成代价高的原因有两个:一个是计算昂贵,另一个是数据进入Python后Spark无法管理worker的内存。这意味着,如果某个worker因资源受限而失败(因为JVM和Python都在同一台机器上争夺内存),则可能会导致该worker出现故障。所以建议除了数据挖掘、机器学习等用pyspark,其他尽量使用Scala或Java编写UDF,不仅编写程序的时间少,还能提高性能

36. 现在来实现一个例子。首先需要注册该函数,以便其可用作DataFrame函数:

import org.apache.spark.sql.functions.udf
val power3udf = udf(power3(_:Double):Double)

可以像使用其他DataFrame函数一样使用UDF,如下所示:

udfExampleDF.select(power3udf(col("num"))).show()
+-----------+
|power3(num)|
+-----------+
+-----------+

此时只能将它用作DataFrame函数。也就是说不能在字符串表达式中使用它。但是也可以将此UDF注册为SparkSQL函数。这种做法很有用,因为它使得能在SQL语言中以及跨语言环境下使用此函数。接下来在Scala中注册该函数:

spark.udf.register("power3", power3(_:Double):Double)
udfExampleDF.selectExpr("power3(num)").show(2)

为了确保UDF函数正常工作,还要做的一件事是指定返回类型。Spark管理它自己的类型信息,它不完全与Python的类型相一致。因此最好的做法是在定义函数时定义该函数的返回值类型。指定返回值类型不是必需的,但建议指定。如果指定的返回值类型与函数返回的实际类型不匹配,则Spark不会抛出错误,但会返回null以表明失败。如果将以下函数中的返回值类型改为Double型,则可以看到返回null的情况:

# in Python
from pyspark.sql.types import IntegerType, DoubleType
# registered via Python
spark.udf.register("power3py", power3, DoubleType())
udfExampleDF.selectExpr("power3py(num)").show(2)

这是由于整型的表示范围造成的,当在Python中操作整数时,Python不会将它们转换为浮点型(对应于Spark的double类型),因此会看到返回null。可以确保Python函数返回浮点型而非整型来解决这个问题。在注册后,可以在SQL中使用任一个UDF函数:

SELECT power3(12), power3py(12) --doesn't work because of return type

当想从UDF中选择返回一个值时,应该在Python中返回None,并在Scala中返回一个Option类型。最后,还可以使用Hive语法来创建UDF/UDAF。为了实现这一点,首先必须在创建SparkSession时启用Hive支持(通过SparkSession.builder().enableHiveSupport()来启用)。然后可以在SQL中注册UDF。这仅支持预编译的Scala和Java包,因此需要将它们指定为依赖项:

CREATE TEMPORARY FUNCTION myFunc AS 'com.organization.hive.udf.FunctionName'

此外,还能通过删除TEMPORARY将其注册为Hive Metastore中的永久函数。

四、聚合操作

37. 在聚合操作中一件重要的事是考虑返回结果的精确度。在进行大数据计算时获得一个精确结果的开销会很大,但是计算出一个近似结果相对要容易得多。一些近似函数通常都会提高Spark作业执行速度和效率,特别是对交互式和ad hoc分析。下面的代码首先读取零售业的采购数据,然后对数据进行重分区以减少分区数量(事先知道仅有少量数据存储在大量小文件里),最后将这些数据缓存起来以便后续的快速访问:

val df = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/data/retail-data/all/*.csv")
    .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

下面是数据样本,以便参考:

+---------+---------+--------------------+--------+--------------+---------+-----
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|Cu...
+---------+---------+--------------------+--------+--------------+---------+-----
| 536365| 85123A|WHITE HANGING... | 6|12/1/2010 8:26| 2.55| ...
| 536365| 71053|WHITE METAL... | 6|12/1/2010 8:26| 3.39| ...
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| ...
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| ...
+---------+---------+--------------------+--------+--------------+---------+--

基本的聚合操作将作用于整个DataFrame。最简单的例子是count方法:

df.count()==541909

count操作与transformation操作不一样,它是action操作会立即返回计算结果。可以使用count来获得数据集的总体大小,但它还有一个作用是可以缓存整个DataFrame到内存里,当然用count这种方法实现缓存数据有点奇怪,主要是因为在本例中count立即执行,而不像转换操作那样惰性执行。下面还会看见如何把count作为惰性函数来使用,

在这种情况下可以执行以下两项操作之一:第一个是对指定的列进行计数,第二个是使用count(*)或count(1)对所有列进行计数,如下面例子所示:

import org.apache.spark.sql.functions.count
df.select(count("StockCode")).show() // 541909

关于对null值进行计数有一些注意的地方。例如当执行count(*)时,Spark会对null值进行计数,而当对某指定列计数时,则不会对null值进行计数。有时数据的总量不重要,而获得唯一(unique)组的数量才是需要的。要获得唯一组数量,可以使用countDistinct函数,而这个函数仅在统计针对某列的计数时才有意义:

// in Scala
import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("StockCode")).show() // 4070
--in SQL
SELECT COUNT(DISTINCT *) FROM DFTABLE

但如之前所说,为了避免OOM一般不建议用count distinct语句,而建议用group by

38. 通常在处理大数据集的时候,精确的统计计数并不那么重要,某种精度的近似值也是可以接受的,此时可以使用approx_count_distinct函数:

// in Scala
import org.apache.spark.sql.functions.approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() // 3364
--in SQL
SELECT approx_count_distinct(StockCode, 0.1) FROM DFTABLE

注意approx_count_distinct带了另一个参数,该参数指定可容忍的最大误差。本例中指定了一个相当大的误差率,因此得到的答案与正确值差距很大,但执行速度更快,比countDistinct函数执行耗时更少。当处理更大的数据集的时候,这种提升会更加明显。

first和last这两个函数可以得到DataFrame的第一个值和最后一个值,它是基于DataFrame中行的顺序而不是DataFrame中值的顺序

// in Scala
import org.apache.spark.sql.functions.{first, last}
df.select(first("StockCode"), last("StockCode")).show()
--in SQL
SELECT first(StockCode), last(StockCode) FROM dfTable
+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
| 85123A| 22138|
+-----------------------+----------------------+

除了用sum计算总和外,还可以使用sumDistinct函数来对一组去重(distinct)值进行求和:

import org.apache.spark.sql.functions.sumDistinct
df.select(sumDistinct("Quantity")).show() // 29310

Spark还提供了一种通过avg或mean函数获取平均值的方法。在这个例子中使用alias(别名),以便以后更方便地使用这些值:

import org.apache.spark.sql.functions.{sum, count, avg, expr}
df.select(
          count("Quantity").alias("total_transactions"),
          sum("Quantity").alias("total_purchases"),
          avg("Quantity").alias("avg_purchases"),
          expr("mean(Quantity)").alias("mean_purchases"))
  .selectExpr(
              "total_purchases/total_transactions",
              "avg_purchases",
              "mean_purchases").show()
+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)| avg_purchases| mean_purchases|
+--------------------------------------+----------------+----------------+
| 9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+

39. Spark既支持统计样本标准差也支持统计总体标准差,它们两个在统计学上是完全不同的概念,一定要区分它们。如果使用variance函数和stddev函数,默认是计算样本样本方差或标准差的。还可以显式指定这些值或引用总体标准差或方差:

// in Scala
import org.apache.spark.sql.functions.{var_pop, stddev_pop}
import org.apache.spark.sql.functions.{var_samp, stddev_samp}
df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()
--in SQL
SELECT var_pop(Quantity), var_samp(Quantity),
stddev_pop(Quantity), stddev_samp(Quantity)
+------------------+------------------+--------------------+-------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quan...|
+------------------+------------------+--------------------+-------------------+
|47559.303646609056|47559.391409298754| 218.08095663447796| 218.081157850...|
+------------------+------------------+--------------------+-------------------+

偏度系数(skewness)和峰度系数(kurtosis)都是对数据集中的极端数据点的衡量指标。偏度系数衡量数据相对于平均值的不对称程度,而峰度系数衡量数据分布形态陡缓程度。在将数据建模为随机变量的概率分布时,它们都很重要。使用以下函数可以计算偏度和峰度:

import org.apache.spark.sql.functions.{skewness, kurtosis}
df.select(skewness("Quantity"), kurtosis("Quantity")).show()
+-------------------+------------------+
| skewness(Quantity)|kurtosis(Quantity)|
+-------------------+------------------+
|-0.2640755761052562|119768.05495536952|
+-------------------+------------------+

前面讨论了单列聚合,不过有的函数是去比较两个不同列的值之间的相互关系。其中两个函数就是cov和corr,它们分别用于计算协方差和相关性。相关性采用Pearson相关系数来衡量,范围是-1到+1。协方差的范围由数据中的输入决定。跟var函数一样,协方差又分为样本协方差和总体协方差,因此在使用的时候需要指定。相关性没有这个概念,因此没有总体或样本的相关性之分。以下是它们的使用方式:

import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
          covar_pop("InvoiceNo", "Quantity")).show()
+-------------------------+-------------------------------+---------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceN...|
+-------------------------+-------------------------------+---------------------+
| 4.912186085635685E-4| 1052.7280543902734| 1052.7...|
+-------------------------+-------------------------------+---------------------

40. 在Spark中不仅可以在数值型上执行聚合操作,还能在复杂类型上执行聚合操作。例如,可以收集某列上的值到一个list列表里,或者将unique唯一值收集到一个set集合里。用户可以在流水线处理的后续操作中再访问该集合,或者将整个集合传递给UDF:

import org.apache.spark.sql.functions.{collect_set, collect_list}
df.agg(collect_set("Country"), collect_list("Country")).show()
+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+

到目前为止还只在DataFrame级别上进行聚合操作,更常见的任务是根据分组数据进行计算,典型应用是处理类别数据,根据某一列中的数据进行分组,然后基于分组情况来对其他列的数据进行计算。接下来执行一些分组操作,首先执行计数操作,按每个唯一的invoice编号进行分组并获取该invoice上的项目数。注意这将返回另一个DataFrame并会延迟执行。

可以分两个阶段进行分组:首先指定要进行分组的一列或多列,然后指定一个或多个聚合操作。第一步返回一个RelationalGroupedDataset,第二步返回DataFrame,可以指定任意数量的列进行分组,如下所示:

//in Scala
df.groupBy("InvoiceNo", "CustomerId").count().show()
--in SQL
SELECT count(*) FROM dfTable GROUP BY InvoiceNo, CustomerId

正如前面提到计数有点特殊,因为它作为一种方法存在。可以不用将该函数作为表达式传递到select语句中,而是在agg中指定它。这使得仅需使用agg(),里面即可传入任意表达式,甚至可以在转换某列之后给它取别名,以便在之后的数据流处理中使用:

import org.apache.spark.sql.functions.count
df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()
+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
| 536596| 6| 6|
| C542604| 8| 8|
+---------+----+---------------+

有时将transformation操作指定为一系列Map会更方便,其中键(key)为列,值为要执行的字符串形式聚合函数。如果以inline方式指定也可以重用多个列名:

// in Scala
df.groupBy("InvoiceNo").agg("Quantity"->"avg", "Quantity"->"stddev_pop").show()
--in SQL
SELECT avg(Quantity), stddev_pop(Quantity), InvoiceNo FROM dfTable
GROUP BY InvoiceNo
+---------+------------------+--------------------+
|InvoiceNo| avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
| 536596| 1.5| 1.1180339887498947|
| C542604| -8.0| 15.173990905493518|
+---------+------------------+--------------------+

41. 还可以使用window函数来执行某些特殊的聚合操作,具体就是在指定数据“窗口”上执行聚合操作,并使用对当前数据的引用来定义它,此窗口指定将哪些行传递给此函数。这么说有些抽象,它有点类似于一个标准的group by,所以来稍微对它们进行区分。

在用group by处理数据分组时,每一行只能进入一个分组。窗口函数基于称为框(frame)的一组行,计算表的每一输入行的返回值,每一行可以属于一个或多个框。常见用例就是查看某些值的滚动平均值,其中每一行代表一天,那么每行属于7个不同的框(某一周为一个frame)。稍后会定义框,Spark支持三种窗口函数:排名函数、解析函数和聚合,如下图所示:

为了更好演示,将添加一个date列,该列将发票日期转换为仅包含日期信息(不包括时间信息)的列:

import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),
                                               "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

配置窗口函数的第一步是创建一个窗口规范。请注意partition by与目前为止所接触的分组概念无关,它只是描述如何进行分区的一个类似概念。排序语句指定了在一个分区内如何对数据排序,最后的rowsBetween语句指定了frame配置。在本例中,设置了当前输入行之前的所有行都包含在这个frame里:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
    .partitionBy("CustomerId", "date")
    .orderBy(col("Quantity").desc)
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

现在,使用聚合函数来了解有关每个特定客户的更多信息。一个例子是计算一个客户有史以来的最大购买数量,为了获得该结果,可以使用之前介绍的聚合函数,并将某一列名或表达式作为输入参数。此外还指明了使用某个具体的窗口规范,它定义了此函数将应用于哪些frame:

import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

这将返回一列(或表达式)。现在可以在DataFrame的select语句中使用它。不过在这样做之前,先创建购买数量排名,使用dense_rank函数来确定每个用户在哪天购买数量最多。使用dense_rank而不是rank,是为了避免在有等值(在该例子中是重复行)的情况下避免排序结果不连续:

import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)

该代码会返回可在select语句中使用的列。可以执行select来查看计算出的窗口值:

// in Scala
import org.apache.spark.sql.functions.col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
.select(
        col("CustomerId"),
        col("date"),
        col("Quantity"),
        purchaseRank.alias("quantityRank"),
        purchaseDenseRank.alias("quantityDenseRank"),
        maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
--in SQL
SELECT CustomerId, date, Quantity,
rank(Quantity) OVER (PARTITION BY CustomerId, date
                     ORDER BY Quantity DESC NULLS LAST
                     ROWS BETWEEN
                     UNBOUNDED PRECEDING AND
                     CURRENT ROW) as rank,
dense_rank(Quantity) OVER (PARTITION BY CustomerId, date
                     ORDER BY Quantity DESC NULLS LAST
                     ROWS BETWEEN
                     UNBOUNDED PRECEDING AND
                     CURRENT ROW) as dRank,
max(Quantity) OVER (PARTITION BY CustomerId, date
                    ORDER BY Quantity DESC NULLS LAST
                    ROWS BETWEEN
                    UNBOUNDED PRECEDING AND
                    CURRENT ROW) as maxPurchase
FROM dfWithDate WHERE CustomerId IS NOT NULL ORDER BY CustomerId
+----------+----------+--------+------------+-----------------+---------------+
|CustomerId| date|Quantity|quantityRank|quantityDenseRank|maxP...Quantity|
+----------+----------+--------+------------+-----------------+---------------+
| 12346|2011-01-18| 74215| 1| 1| 74215|
| 12346|2011-01-18| -74215| 2| 2|74215|
| 12347|2010-12-07| 36| 1| 1| 36|
| 12347|2010-12-07| 30| 2| 2| 36|
| 12347|2010-12-07| 12| 4| 4| 36|
| 12347|2010-12-07| 6| 17| 5| 36|
| 12347|2010-12-07| 6| 17| 5| 36|
+----------+----------+--------+------------+-----------------+---------------+

42. 配合group by表达式可以对一组列上的值进行聚合操作。但是在某些情况下需要更完善的功能,比如跨多个组的聚合操作。这能通过分组集(Grouping Set)来实现。分组集是用于将多组聚合操作组合在一起的底层工具,使得能够在group by语句中创建任意的聚合操作。来通过一个例子更好的理解它,希望获得所有用户各种股票的数量,为此使用以下SQL表达式:

// in Scala
val dfNoNull= dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
--in SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC

可以使用grouping set实现完全相同的操作,如下所示:

SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC

输出结果如下所示:

+----------+---------+-------------+
|CustomerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
| 18287| 85173| 48|
| 18287| 85040A| 48|
| 18287| 85039B| 120|
| 18287| 23269| 36|
+----------+---------+-------------+

但需要注意的是,Grouping Set取决于聚合级别的null值。如果不过滤空值,则会得到不正确的结果。包括cube、rollup和grouping set都是这样。上面这个任务很简单,但是如果还想要统计股票总数,而不区分客户和股票,使用传统group by语句是不可能实现的,但是使用grouping set将会很简单。用户仅需要在grouping set中指定所希望执行聚合操作的级别,其实grouping set就是实现了将各种分组统计得到的结果union在一起:

SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
+----------+---------+-------------+
|customerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
| 18287| 85173| 48|
| 18287| 85040A| 48|
| 18287| 85039B| 120|
| 18287| 23269| 36|
+----------+---------+-------------+

43. GROUPING SETS操作仅在SQL中可用。若想在DataFrame中执行相同的操作,使用rollup和cube操作可以得到完全相同的结果,接下来看看如何使用这两个操作。

rollup分组聚合是一种多维聚合操作,可以执行group by维度范围从右到左的多个计算,例如group by A,B,C with rollup首先会对(A、B、C)进行group by,然后对(A、B)进行group by,然后是(A)进行group by,最后对全表进行group by操作(没有分组条件,所有的分组字段均为null),最后将结果进行union。接下来根据时间(Date)和地点(Country)来创建一个rollup分组,并且创建一个新的DataFrame,它将包括所有日期交易的总股票数、每个日期交易的所有股票数、以及在每个日期中每个国家产生的股票交易数:

val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
    .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
    .orderBy("Date")
rolledUpDF.show()
+----------+--------------+--------------+
| Date| Country|total_quantity|
+----------+--------------+--------------+
| null| null| 5176450|
|2010-12-01|United Kingdom| 23949|
|2010-12-01| Germany| 117|
|2010-12-01| France| 449|
|2010-12-03| France| 239|
|2010-12-03| Italy| 164|
|2010-12-03| Belgium| 528|
+----------+--------------+--------------+

每列的null值表示不区分该列的总数(比如Country列为null值表示该日期所有地点的总数),而如果在两列中都是null值则表示所有日期和地点的总数。

上面rollup相当于cube的子集,cube分组聚合则更进一步,是对所有参与的列值进行所有维度的全组合聚合,例如首先对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),( C),最后对全表进行group by操作(没有分组条件,所有的分组字段均为null),最后将结果进行union。也就是说,它不仅基于任一日期对各地点进行汇总聚合,也会基于任一地点对各日期进行汇总聚合。cube分组聚合可以计算如下的分组聚合统计:

(1)在所有日期和所有国家发生的交易总数。

(2)在每个日期发生于所有国家的交易总数。

(3)在每个日期发生于每个国家的交易总数。

(4)在所有日期发生于每个国家的交易总数。

方法调用非常相似,但不是调用rollup而是调用cube这是一个快速简单获得数据表几乎所有汇总信息的好方法,这个汇总信息可以为以后的数据处理继续使用:

dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
        .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()
+----+--------------------+-------------+
|Date| Country|sum(Quantity)|
+----+--------------------+-------------+
|null| Japan| 25218|
|null| Portugal| 16180|
|null| Unspecified|3300|
|null| null| 5176450|
|null| Australia| 83653|
|null| Norway| 19247|
|null| Hong Kong| 4769|
|null| Spain| 26824|
|null| Czech Republic| 592|
+----+--------------------+-------------+

44. 有时当使用cube和rollup时,希望能够显示聚合的维度级别,以便可以轻松地找到自己想要的信息。可以使用grouping_id来完成此操作,这会在返回结果集中多增加一列。以下示例中的查询将返回四个不同的分组级别ID:

代码示例如下所示:

import org.apache.spark.sql.functions.{grouping_id, sum, expr}
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
        .orderBy(expr("grouping_id()").desc)
        .show()
+----------+---------+-------------+-------------+
|customerId|stockCode|grouping_id()|sum(Quantity)|
+----------+---------+-------------+-------------+
| null| null| 3| 5176450|
| null| 23217| 2| 1309|
| null| 90059E| 2| 19|
+----------+---------+-------------+-------------+

透视转换(pivot,即行列转置)可以根据某列中的不同行转换为多个列。例如在当前数据中有一个Country列,通过一个透视转换可以对每个Country执行聚合操作,并且以易于查看的方式显示它们:

val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

在使用了透视转换后,现在DataFrame会为每一个Country和数值型列组合产生一个新列,以及之前的date列。例如对于USA,就有USA_sum(Quantity),USA_sum(UnitPrice),USA_sum(CustomerID)这些列,这对应于数据集中的每个数值型列。以下是这个数据的一个示例查询和结果:

pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
+----------+-----------------+
| date|USA_sum(Quantity)|
+----------+-----------------+
|2011-12-06| null|
|2011-12-09| null|
|2011-12-08| -196|
|2011-12-07| null|
+----------+-----------------+

45. 用户自定义聚合函数(UDAF)是用户根据自定义公式或业务逻辑定义自己聚合函数的一种方法。可以使用UDAF来计算输入数据组(与单行相对) 的自定义计算。Spark维护单个AggregationBuffer,它用于存储每组输入数据的中间结果。若要创建UDAF,必须继承UserDefinedAggregateFunction基类并实现以下方法

(1)inputSchema用于指定输入参数,输入参数类型为StructType。

(2)bufferSchema用于指定UDAF中间结果,中间结果类型为StructType。

(3)dataType用于指定返回结果,返回结果的类型为DataType。

(4)deterministic是一个布尔值,它指定此UDAF对于某个输入是否会返回相同的结果。

(5)initialize初始化聚合缓冲区的初始值。

(6)update描述应如何根据给定行更新内部缓冲区。

(7)merge描述应如何合并两个聚合缓冲区。

(8)evaluate将生成聚合最终结果。

下面的例子实现了一个BoolAnd,它将返回(给定列)所有的行是否为true; 如果不是则返回false:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  def evaluate(buffer: Row): Any = {
    buffer(0)

现在简单地实例化自定义的类,也可以将其注册为一个函数:

val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
     .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
     .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
     .select(ba(col("t")), expr("booland(f)"))
     .show()
+----------+----------+
|booland(t)|booland(f)|
+----------+----------+
| true| false|
+----------+----------+

UDAF目前仅在Scala或Java中可用。但是在Spark2.3中,还可以通过注册该函数来调用Scala或Jav的UDF和UDAF。