/** Spark SQL源码分析系列文章*/   前几篇文章介绍了Spark SQL的Catalyst的核心运行流程、SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识。   /** Spark SQL源码分析系列文章 */

前几篇文章介绍了 Spark SQL的Catalyst的 核心运行流程 SqlParser ,和 Analyzer 以及核心类库 TreeNode ,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识。

Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以及表达式(Expression),也是转换成物理执行计划的前置。如下图:

一、Optimizer Optimizer这个类是在catalyst里的optimizer包下的唯一一个类,Optimizer的工作方式其实类似 Analyzer ,因为它们都继承自RuleExecutor[LogicalPlan],都是执行一系列的Batch操作:

Optimizer里的batches包含了3类优化策略:1、 Combine Limits 合并Limits 2、 ConstantFolding 常量合并 3、 Filter Pushdown 过滤器下推 ,每个Batch里定义的优化伴随对象都定义在Optimizer里了:

另外提一点,Optimizer里不但对Logical Plan进行了优化,而且对Logical Plan中的Expression也进行了优化,所以有必要了解一下Expression相关类,主要是用到了references和outputSet, references主要是Logical Plan或Expression节点的所依赖的那些Expressions,而outputSet是Logical Plan所有的Attribute的输出

如:Aggregate是一个Logical Plan, 它的 references 就是group by的表达式 和 aggreagate的表达式的并集去重。

[java] view plain copy 二、优化策略详解
Optimizer的优化策略不仅有对plan进行transform的,也有对expression进行transform的,究其原理就是遍历树,然后应用优化的Rule,但是注意一点,对Logical Plantransfrom的是 先序遍历(pre-order) ,而对Expression transfrom的时候是 后序遍历(post-order)
2.1、Batch: Combine Limits
如果出现了2个Limit,则将2个Limit合并为一个,这个要求一个Limit是另一个Limit的grandChild。
case ll @ Limit(le, nl @ Limit(ne, grandChild)) => //ll为当前Limit,le为其expression, nl是ll的grandChild,ne是nl的expression Limit(If(LessThan(ne, le), ne, le), grandChild) //expression比较,如果ne比le小则表达式为ne,否则为le 给定SQL:val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")
[java] view plain copy 子查询里limit100,外层查询limit10,这里我们当然可以在子查询里不必查那么多,因为外层只需要10个,所以这里会合并Limit10,和Limit100 为 Limit 10。

2.2、 Batch: ConstantFolding 这个Batch里包含了Rules:NullPropagation,ConstantFolding,BooleanSimplification,SimplifyFilters,SimplifyCasts,SimplifyCaseConversionExpressions。

2.2.1、Rule:NullPropagation   这里先提一下Literal字面量,它其实是一个能匹配任意基本类型的类。(为下文做铺垫)

case e @ Count(Literal( null , _)) => Cast(Literal(0L), e.dataType) //如果count(null)则转化为count(0) case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)<span style= "font-family: Arial;" > //如果sum(null)则转化为sum(0)</span> case e @ Average(Literal(c, _)) if c == 0 => Literal( 0.0 , e.dataType) case e @ IsNull(c) if !c.nullable => Literal( false , BooleanType) case e @ IsNotNull(c) if !c.nullable => Literal( true , BooleanType) case e @ GetItem(Literal( null , _), _) => Literal( null , e.dataType) case e @ GetItem(_, Literal( null , _)) => Literal( null , e.dataType) case e @ GetField(Literal( null , _), _) => Literal( null , e.dataType) case e @ Coalesce(children) => { val newChildren = children.filter(c => c match { case Literal( null , _) => false case _ => true if (newChildren.length == 0 ) { Literal( null , e.dataType) } else if (newChildren.length == 1 ) { newChildren( 0 ) } else { Coalesce(newChildren) case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true ) trueValue else falseValue case e @ In(Literal(v, _), list) if (list.exists(c => c match { case Literal(candidate, _) if candidate == v => true case _ => false })) => Literal( true , BooleanType) // Put exceptional cases above if any case e: BinaryArithmetic => e.children match { case Literal( null , _) :: right :: Nil => Literal( null , e.dataType) case left :: Literal( null , _) :: Nil => Literal( null , e.dataType) case _ => e case e: BinaryComparison => e.children match { case Literal( null , _) :: right :: Nil => Literal( null , e.dataType) case left :: Literal( null , _) :: Nil => Literal( null , e.dataType) case _ => e case e: StringRegexExpression => e.children match { case Literal( null , _) :: right :: Nil => Literal( null , e.dataType) case left :: Literal( null , _) :: Nil => Literal( null , e.dataType) case _ => e 给定SQL: val query = sql("select count(null) from temp_shengli where key is not null") 这个是对布尔表达式的优化,有点像
Java 布尔表达式中的短路判断,不过这个写的倒是很优雅。

看看布尔表达式2边能不能通过只计算1边,而省去计算另一边而提高效率,称为简化布尔表达式。

解释请看我写的注释:

* Simplifies boolean expressions where the answer can be determined without evaluating both sides. * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus * is only safe when evaluations of expressions does not result in side effects. object BooleanSimplification extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsUp { case and @ And(left, right) => //如果布尔表达式是AND操作,即exp1 and exp2 (left, right) match { //(左边表达式,右边表达式) case (Literal( true , BooleanType), r) => r // 左边true,返回右边的<span style="font-family: Arial;">bool</span><span style="font-family: Arial;">值</span> case (l, Literal( true , BooleanType)) => l //右边true,返回左边的bool值 case (Literal( false , BooleanType), _) => Literal( false ) //左边都false,右边随便,反正是返回false case (_, Literal( false , BooleanType)) => Literal( false ) //只要有1边是false了,都是false case (_, _) => and 2.3 Batch: Filter Pushdown
Filter Pushdown下包含了CombineFilters、PushPredicateThroughProject、PushPredicateThroughJoin、ColumnPruning
Ps:感觉Filter Pushdown的名字起的有点不能涵盖全部比如ColumnPruning列裁剪。
2.3.1、Combine Filters
合并两个相邻的Filter,这个和上述Combine Limit差不多。合并2个节点,就可以 减少树的深度从而减少重复执行过滤的代价
给定SQL:val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")

优化前:我们看到一个filter 是另一个filter的grandChild

原理就是更早的过滤掉不需要的元素来减少开销。

给定SQL:val query = sql("select key from (select * from temp_shengli)a where key>100")

生成的逻辑计划为:

case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => ////如果project的outputSet中减去a.references的元素如果不同,那么就将Aggreagte的child替换为a.references a.copy(child = Project(a.references.toSeq, child)) case Project(projectList, Join(left, right, joinType, condition)) => // 消除join的left 和 right孩子的不必要属性,将join的左右子树的列进行裁剪 // Collect the list of off references required either above or to evaluate the condition. val allReferences: Set[Attribute] = projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)). val aliasMap = projectList2.collect { case a @ Alias(e, _) => (a.toAttribute: Expression, a) }.toMap // Substitute any attributes that are produced by the child projection, so that we safely // eliminate it. // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...' // TODO: Fix TransformBase to avoid the cast below. val substitutedProjection = projectList1.map(_.transform { case a if aliasMap.contains(a) => aliasMap(a) }).asInstanceOf[Seq[NamedExpression]]
分别举三个例子来对应三种情况进行说明:
1、在聚合操作中,可以做列裁剪
给定SQL:val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")
给定SQL:val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b  on a.key =b.key ")
没有优化之前:

本文介绍了Optimizer在Catalyst里的作用即将Analyzed Logical Plan 经过对Logical Plan和Expression进行Rule的应用transfrom,从而达到树的节点进行合并和优化。其中主要的优化的策略总结起来是合并、列裁剪、过滤器下推几大类。

Catalyst应该在不断迭代中,本文只是基于spark1.0.0进行研究,后续如果新加入的优化策略也会在后续补充进来。

欢迎大家讨论,共同进步!

——EOF——

原创文章,转载请注明:

转载自:
OopsOutOfMemory盛利的Blog ,作者: OopsOutOfMemory

本文链接地址: http://blog.csdn.net/oopsoom/article/details/38121259

注:本文基于 署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN) 协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。