Tech Community [Yunqi]
Catalyst Rules for Optimizing Logical Plans
Optimizer
This article walks through the rules that Catalyst's Optimizer applies to logical plans (LogicalPlan).
The Optimizer operates on LogicalPlan objects.
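The Optimizer's batches are as follows:
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("ConstantFolding", Once,
      ConstantFolding,        // constant expressions that can be evaluated statically
      BooleanSimplification,  // short-circuit boolean expressions early
      SimplifyFilters,        // simplify filters (false, true, null)
      SimplifyCasts) ::       // simplify casts (expression already has the Cast target type)
    Batch("Filter Pushdown", Once,
      CombineFilters,                        // merge adjacent (parent/child) Filter operators
      PushPredicateThroughProject,           // push Filter predicates down through a Project
      PushPredicateThroughInnerJoin) :: Nil  // push predicates down through an inner join
}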
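To make the batch structure concrete, here is a minimal sketch of what running batches with a "Once" strategy amounts to: every rule of every batch is applied one time, in declaration order. This is not Catalyst's RuleExecutor; `Plan`, `Rule`, `Batch`, and `execute` are hypothetical stand-ins in which a plan is just a String.

```scala
object BatchSketch {
  // Hypothetical stand-ins: a "plan" is a String, a rule is a function on it.
  type Plan = String
  type Rule = Plan => Plan

  final case class Batch(name: String, rules: Rule*)

  // Apply every rule of every batch exactly once, in declaration order.
  def execute(batches: Seq[Batch], plan: Plan): Plan =
    batches.foldLeft(plan) { (p, batch) =>
      batch.rules.foldLeft(p)((acc, rule) => rule(acc))
    }

  val trimSpaces: Rule = _.trim
  val upper: Rule = _.toUpperCase

  val batches = Seq(Batch("Cleanup", trimSpaces), Batch("Normalize", upper))
  val result: Plan = execute(batches, "  select 1 ") // "SELECT 1"
}
```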
This is the latest Catalyst Optimizer code as of April 1.
ConstantFolding
Replaces every expression whose result can be determined statically with a Literal expression.
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.apply(null), e.dataType)
    }
  }
}
Literal can hold values of type Int, Long, Double, Float, Byte, Short, String, Boolean, and null. These map to the Catalyst DataTypes IntegerType, LongType, DoubleType, FloatType, ByteType, ShortType, StringType, BooleanType, and NullType respectively.
An ordinary Literal is immutable. There is also a mutable MutableLiteral class, whose update method can change the value it holds.
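The folding idea can be tried out without Spark. The following is a minimal sketch on a toy expression tree, not Catalyst code: `Expr`, `Lit`, `Attr`, `Add`, `eval`, and `fold` are hypothetical names, and only integer addition is modelled.

```scala
object ConstantFoldingSketch {
  sealed trait Expr { def foldable: Boolean }
  final case class Lit(value: Int) extends Expr { val foldable = true }
  final case class Attr(name: String) extends Expr { val foldable = false }
  final case class Add(l: Expr, r: Expr) extends Expr {
    // An Add is foldable only when both operands are.
    def foldable: Boolean = l.foldable && r.foldable
  }

  // Evaluate an expression that is known to be foldable.
  def eval(e: Expr): Int = e match {
    case Lit(v)    => v
    case Add(l, r) => eval(l) + eval(r)
    case Attr(n)   => sys.error(s"cannot evaluate attribute $n statically")
  }

  // Top-down: replace a foldable subtree with a literal, otherwise recurse.
  def fold(e: Expr): Expr = e match {
    case l: Lit          => l // skip redundant folding of literals
    case x if x.foldable => Lit(eval(x))
    case Add(l, r)       => Add(fold(l), fold(r))
    case other           => other
  }

  // age + (1 + 2) becomes age + 3
  val folded: Expr = fold(Add(Attr("age"), Add(Lit(1), Lit(2))))
}
```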
BooleanSimplification
Short-circuits, ahead of time, boolean expressions whose result is already determined by a literal operand.
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), r) => r
          case (l, Literal(true, BooleanType)) => l
          case (Literal(false, BooleanType), _) => Literal(false)
          case (_, Literal(false, BooleanType)) => Literal(false)
          case (_, _) => and
        }
      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)
          case (_, Literal(true, BooleanType)) => Literal(true)
          case (Literal(false, BooleanType), r) => r
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}
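The same case analysis can be exercised on a toy tree. This is only a sketch under simplified assumptions: `Expr`, `BoolLit`, `Pred`, and `simplify` are hypothetical names, and the bottom-up traversal is written by hand instead of using transformExpressionsUp.

```scala
object BooleanSimplificationSketch {
  sealed trait Expr
  final case class BoolLit(value: Boolean) extends Expr
  final case class Pred(sql: String) extends Expr // an opaque predicate
  final case class And(l: Expr, r: Expr) extends Expr
  final case class Or(l: Expr, r: Expr) extends Expr

  // Bottom-up: simplify the children first, then the node itself.
  def simplify(e: Expr): Expr = e match {
    case And(l, r) => (simplify(l), simplify(r)) match {
      case (BoolLit(true), x)                        => x
      case (x, BoolLit(true))                        => x
      case (BoolLit(false), _) | (_, BoolLit(false)) => BoolLit(false)
      case (x, y)                                    => And(x, y)
    }
    case Or(l, r) => (simplify(l), simplify(r)) match {
      case (BoolLit(true), _) | (_, BoolLit(true)) => BoolLit(true)
      case (BoolLit(false), x)                     => x
      case (x, BoolLit(false))                     => x
      case (x, y)                                  => Or(x, y)
    }
    case other => other
  }

  val p: Expr = Pred("age > 20")
  // (p OR false) AND true collapses to p
  val simplified: Expr = simplify(And(Or(p, BoolLit(false)), BoolLit(true)))
}
```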
SimplifyFilters
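Resolves, ahead of time, Filter operators whose condition is already known: a Filter that is always true is dropped, and a Filter that is false or null is replaced by an empty relation.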
object SimplifyFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, BooleanType), child) =>
      child
    case Filter(Literal(null, _), child) =>
      LocalRelation(child.output)
    case Filter(Literal(false, BooleanType), child) =>
      LocalRelation(child.output)
  }
}
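A small stand-alone sketch of the three cases, assuming a toy plan model: `Plan`, `Table`, `Empty` (standing in for LocalRelation with no rows), and the `Cond` hierarchy are hypothetical names, not Catalyst classes.

```scala
object SimplifyFiltersSketch {
  sealed trait Cond
  case object TrueLit extends Cond
  case object FalseLit extends Cond
  case object NullLit extends Cond
  final case class Pred(sql: String) extends Cond

  sealed trait Plan { def output: Seq[String] }
  final case class Table(name: String, output: Seq[String]) extends Plan
  // Stands in for an empty LocalRelation that keeps the child's schema.
  final case class Empty(output: Seq[String]) extends Plan
  final case class Filter(cond: Cond, child: Plan) extends Plan {
    def output: Seq[String] = child.output
  }

  def simplify(p: Plan): Plan = p match {
    case Filter(TrueLit, child)            => simplify(child)      // always passes
    case Filter(FalseLit | NullLit, child) => Empty(child.output)  // never passes
    case Filter(c, child)                  => Filter(c, simplify(child))
    case leaf                              => leaf
  }

  val people: Plan = Table("people", Seq("name", "age"))
}
```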
SimplifyCasts
Removes a Cast whose child expression already has the target type, replacing the Cast with the child.
object SimplifyCasts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Cast(e, dataType) if e.dataType == dataType => e
  }
}
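As a rough illustration, the same check on a toy model looks like this. `DataType`, `IntType`, `StrType`, `Attr`, and `simplify` are hypothetical names chosen for the sketch.

```scala
object SimplifyCastsSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StrType extends DataType

  sealed trait Expr { def dataType: DataType }
  final case class Attr(name: String, dataType: DataType) extends Expr
  final case class Cast(child: Expr, dataType: DataType) extends Expr

  // Drop a Cast when its (already simplified) child has the target type.
  def simplify(e: Expr): Expr = e match {
    case Cast(child, to) =>
      val c = simplify(child)
      if (c.dataType == to) c else Cast(c, to)
    case other => other
  }

  val age: Expr = Attr("age", IntType)
}
```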
CombineFilters
When two adjacent operators are both Filters, they are merged into a single Filter. "Adjacent" here means parent and child.
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}
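The merge can be sketched on a toy plan as follows. The names `Pred`, `Table`, and `combine` are hypothetical, and the recursion is written by hand so that a chain of three or more Filters also collapses into one.

```scala
object CombineFiltersSketch {
  sealed trait Expr
  final case class Pred(sql: String) extends Expr
  final case class And(l: Expr, r: Expr) extends Expr

  sealed trait Plan
  final case class Table(name: String) extends Plan
  final case class Filter(cond: Expr, child: Plan) extends Plan

  def combine(p: Plan): Plan = p match {
    // Parent and child are both Filters: AND the conditions, inner one first.
    case Filter(outer, Filter(inner, grandChild)) =>
      combine(Filter(And(inner, outer), grandChild))
    case Filter(c, child) => Filter(c, combine(child))
    case leaf             => leaf
  }

  val merged: Plan =
    combine(Filter(Pred("age > 20"), Filter(Pred("v1 > 0"), Table("t"))))
}
```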
PushPredicateThroughProject
Pushes a Filter that sits on top of a Project down beneath the Project. Along the way, aliases referenced in the condition are replaced by the expressions they stand for (on the assumption that this is cheap).
object PushPredicateThroughProject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
      val sourceAliases = fields.collect { case a @ Alias(c, _) =>
        (a.toAttribute: Attribute) -> c
      }.toMap // collect every aliased attribute in fields
      project.copy(child = filter.copy( // build the new Filter operator
        replaceAlias(condition, sourceAliases), // substitute aliases used in condition
        grandChild))
  }
  def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]): Expression = {
    condition transform {
      case a: AttributeReference => sourceAliases.getOrElse(a, a)
    }
  }
}
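The alias substitution is the part that is easy to get wrong, so here is a minimal sketch of it in isolation. It assumes a toy expression tree (`Attr`, `Lit`, `Add`, `Gt` are hypothetical names) and a hypothetical query `SELECT age + 1 AS next_age ... WHERE next_age > 21`.

```scala
object AliasSubstitutionSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Gt(l: Expr, r: Expr) extends Expr

  // Replace every attribute that names an alias with the aliased expression.
  def replaceAlias(cond: Expr, aliases: Map[Attr, Expr]): Expr = cond match {
    case a: Attr   => aliases.getOrElse(a, a)
    case Add(l, r) => Add(replaceAlias(l, aliases), replaceAlias(r, aliases))
    case Gt(l, r)  => Gt(replaceAlias(l, aliases), replaceAlias(r, aliases))
    case lit: Lit  => lit
  }

  // The Project defines next_age as age + 1.
  val aliases: Map[Attr, Expr] = Map(Attr("next_age") -> Add(Attr("age"), Lit(1)))

  // The condition as it must read below the Project: (age + 1) > 21
  val pushed: Expr = replaceAlias(Gt(Attr("next_age"), Lit(21)), aliases)
}
```

Without this substitution the pushed-down Filter would refer to next_age, an attribute that does not exist below the Project.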
PushPredicateThroughInnerJoin
The rule first looks for a Filter whose child is an inner join, then collects all conjuncts from both the Filter condition and the join condition.
Conjuncts that only reference the left side or only the right side are pushed outside the join as Filters on that side; the remaining conjuncts, which cannot be pushed down, become the join's condition.
object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
      // Collect the conjuncts of both the filter condition and the join condition.
      val allConditions = splitConjunctivePredicates(filterCondition) ++
        joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
      // Conditions whose references all belong to the right side's output go to rightConditions.
      val (rightConditions, leftOrJoinConditions) =
        allConditions.partition(_.references subsetOf right.outputSet)
      // Likewise, of the rest, conditions whose references all belong to the left side's
      // output go to leftConditions; whatever remains is a join condition.
      val (leftConditions, joinConditions) =
        leftOrJoinConditions.partition(_.references subsetOf left.outputSet)
      // Build the new left and right: fold the conditions together with AND, then wrap the
      // original left/right logical plan in a new Filter logical plan with that expression.
      // This pushes the predicates down into left/right, i.e. "outside" this inner join.
      val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
      val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
      Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
  }
}
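The two partition steps can be tried on their own. In this sketch a predicate is modelled only by its SQL text and the set of attribute names it references. `Pred` and `classify` are hypothetical names, and the last predicate (`people.age = num.v1`) is made up for illustration and does not appear in the article's queries.

```scala
object JoinPushdownSketch {
  final case class Pred(sql: String, references: Set[String])

  // Returns (left-only, right-only, remaining join) conditions.
  def classify(conds: Seq[Pred], leftOut: Set[String], rightOut: Set[String]): (Seq[Pred], Seq[Pred], Seq[Pred]) = {
    val (right, leftOrJoin) = conds.partition(_.references.subsetOf(rightOut))
    val (left, join) = leftOrJoin.partition(_.references.subsetOf(leftOut))
    (left, right, join)
  }

  val conds = Seq(
    Pred("people.age > 20", Set("age")),
    Pred("num.v1 > 0", Set("v1")),
    Pred("num.v2 < 50", Set("v2")),
    Pred("people.age = num.v1", Set("age", "v1"))) // hypothetical cross-side predicate

  val (leftOnly, rightOnly, joinOnly) =
    classify(conds, leftOut = Set("name", "age"), rightOut = Set("v1", "v2"))
}
```

Only the predicate that touches both sides stays on the join; the other three end up in Filters directly above people and num.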
The following definitions help in reading the code above.
The Join operator (a binary LogicalPlan node)
case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression]) extends BinaryNode {
  def references = condition.map(_.references).getOrElse(Set.empty)
  def output = left.output ++ right.output
}
The Filter operator (a unary LogicalPlan node)
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  def output = child.output
  def references = condition.references
}
reduceLeftOption works like this:
def reduceLeftOption[B >: A](op: (B, A) => B): Option[B] =
  if (isEmpty) None else Some(reduceLeft(op))
The result of reduceLeft(op) is op( op( ... op(x_1, x_2) ..., x_{n-1}), x_n)
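A quick way to see both the left-nesting and the empty case is to run reduceLeftOption from the Scala standard library on plain strings. The `and` function below is a hypothetical stand-in for the And constructor.

```scala
object ReduceLeftOptionSketch {
  val conds = List("a", "b", "c")
  // Stand-in for And: builds a parenthesised string instead of an expression.
  val and: (String, String) => String = (l, r) => s"($l AND $r)"

  val combined: Option[String] = conds.reduceLeftOption(and)
  // Some("((a AND b) AND c)")

  val none: Option[String] = List.empty[String].reduceLeftOption(and)
  // None, which is why the rule falls back to getOrElse(left) / getOrElse(right)
}
```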
The PredicateHelper trait splits the conditions joined by And apart and returns them as a Seq of expressions.
trait PredicateHelper {
  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
    case other => other :: Nil
  }
}
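A stand-alone sketch of the same recursion on a toy tree shows that only And nodes are split, while an Or is kept as a single conjunct. `Pred`, `And`, `Or`, and `split` here are hypothetical names, not the Catalyst classes.

```scala
object SplitConjunctsSketch {
  sealed trait Expr
  final case class Pred(sql: String) extends Expr
  final case class And(l: Expr, r: Expr) extends Expr
  final case class Or(l: Expr, r: Expr) extends Expr

  // Flatten nested Ands into a list of conjuncts; anything else is one conjunct.
  def split(e: Expr): Seq[Expr] = e match {
    case And(l, r) => split(l) ++ split(r)
    case other     => other :: Nil
  }

  // a AND ((b OR c) AND d)
  val parts: Seq[Expr] =
    split(And(Pred("a"), And(Or(Pred("b"), Pred("c")), Pred("d"))))
}
```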
Example
case class Person(name: String, age: Int)
case class Num(v1: Int, v2: Int)
Case one
SELECT people.age, num.v1, num.v2
FROM
  people
  JOIN num
  ON people.age > 20 and num.v1 > 0
WHERE num.v2 < 50
== QueryPlan ==
Project [age#1:1,v1#2:2,v2#3:3]
 CartesianProduct
  Filter(age#1:1 > 20)
   ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124
  Filter((v2#3:1 < 50) && (v1#2:0 > 0))
   ExistingRdd [v1#2,v2#3],MappedRDD[10] at map at basicOperators.scala:124
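Analysis: the WHERE condition num.v2 < 50 is pushed down below the join and ends up in the Filter on the num side, together with num.v1 > 0.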
Case two
SELECT people.age, 1+2
FROM
  people
  JOIN num
  ON people.name <> 'abc' and num.v1 > 0
WHERE num.v2 < 50
== QueryPlan ==
Project [age#1:1,3 AS c1#14]
 CartesianProduct
  Filter NOT(name#0:0 = abc)
   ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124
  Filter((v2#3:1 < 50) && (v1#2:0 > 0))
   ExistingRdd[v1#2,v2#3], MappedRDD[10] at map at basicOperators.scala:124
Analysis: 1+2 is constant-folded to 3 ahead of time and given an alias.
End of article :)
Last updated: 2017-04-03 12:55:58