

Catalyst: Optimizer Rules for the Logical Execution Plan

Optimizer

This article analyzes the rules that the Catalyst Optimizer applies to the logical execution plan (LogicalPlan).


The Optimizer operates on LogicalPlan objects.

The Optimizer's batches are as follows:
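object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("ConstantFolding", Once,
      ConstantFolding, // fold constant expressions that can be evaluated statically
      BooleanSimplification, // short-circuit boolean expressions early
      SimplifyFilters, // simplify filters whose condition is a literal (false, true, null)
      SimplifyCasts) :: // remove casts whose input already has the target type
    Batch("Filter Pushdown", Once,
      CombineFilters, // merge adjacent (parent/child) Filter operators
      PushPredicateThroughProject, // push Filter predicates down through a Project
      PushPredicateThroughInnerJoin) :: Nil // push predicates down through an inner join
}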

This is the Catalyst Optimizer code as of April 1, which was the latest version at the time of writing.
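To make the batch and strategy structure concrete, here is a minimal sketch of how a RuleExecutor-style driver could run such batches. It is a toy model, not Catalyst's code. The plan type is a generic T, rules are plain functions, and Strategy, Once, FixedPoint, Batch and execute are illustrative names modeled on (not copied from) Catalyst's RuleExecutor. Once means a single pass over the batch, while a fixed-point strategy repeats the batch until the plan stops changing or an iteration cap is reached.

```scala
// Toy model of a rule executor, modeled on (not copied from) Catalyst's RuleExecutor.
sealed trait Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
final case class FixedPoint(maxIterations: Int) extends Strategy

// A named group of rules plus the strategy that says how often to run it.
final case class Batch[T](name: String, strategy: Strategy, rules: (T => T)*)

// Run the batches in order. Within a batch, apply every rule in sequence,
// and repeat the batch until the plan stops changing or the cap is reached.
def execute[T](plan: T, batches: Seq[Batch[T]]): T =
  batches.foldLeft(plan) { (current, batch) =>
    var result = current
    var iteration = 0
    var changed = true
    while (changed && iteration < batch.strategy.maxIterations) {
      val next = batch.rules.foldLeft(result)((p, rule) => rule(p))
      changed = next != result
      result = next
      iteration += 1
    }
    result
  }

// Usage with Int "plans" standing in for LogicalPlan trees.
val once  = execute(0, Seq(Batch[Int]("inc", Once, (x: Int) => x + 1)))
val fixed = execute(0, Seq(Batch[Int]("cap", FixedPoint(10), (x: Int) => math.min(x + 1, 5))))
```

Both batches in the Optimizer above use Once, so each rule gets exactly one pass per batch. This is why the order of rules inside a batch matters.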


ConstantFolding 

Replaces expressions whose result can be determined statically with Literal expressions.

object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.apply(null), e.dataType)
    }
  }
}

Literal can represent values of type Int, Long, Double, Float, Byte, Short, String, Boolean, and null. These correspond to the Catalyst DataTypes IntegerType, LongType, DoubleType, FloatType, ByteType, ShortType, StringType, BooleanType, and NullType, respectively.

A regular Literal is immutable. There is also a mutable MutableLiteral class, whose update method changes the value it holds.
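Here is a minimal sketch of the same idea on a hypothetical three-node expression tree. Lit, Attr and Add are illustrative stand-ins, not Catalyst classes. As in the rule above, a literal is left alone. Any other foldable expression (one that needs no input row) is evaluated once and replaced by its result. Because the traversal is top-down, the largest foldable subtree is folded in a single step.

```scala
// Hypothetical mini expression tree; not Catalyst's Expression hierarchy.
sealed trait Expr { def foldable: Boolean }
final case class Lit(value: Int) extends Expr { val foldable = true }
final case class Attr(name: String) extends Expr { val foldable = false } // needs an input row
final case class Add(l: Expr, r: Expr) extends Expr {
  def foldable: Boolean = l.foldable && r.foldable
}

// Evaluate an expression that needs no input row (the role e.apply(null) plays above).
def eval(e: Expr): Int = e match {
  case Lit(v)    => v
  case Add(l, r) => eval(l) + eval(r)
  case Attr(n)   => throw new IllegalArgumentException(s"$n needs an input row")
}

// Top-down rewrite, in the spirit of transformExpressionsDown.
def constantFold(e: Expr): Expr = e match {
  case l: Lit          => l            // skip redundant folding of literals
  case x if x.foldable => Lit(eval(x)) // replace the whole foldable subtree
  case Add(l, r)       => Add(constantFold(l), constantFold(r))
  case other           => other
}

// age + (1 + 2) becomes age + 3
val folded = constantFold(Add(Attr("age"), Add(Lit(1), Lit(2))))
```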


BooleanSimplification 

Simplifies, ahead of execution, boolean expressions whose result can be short-circuited.

object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), r) => r
          case (l, Literal(true, BooleanType)) => l
          case (Literal(false, BooleanType), _) => Literal(false)
          case (_, Literal(false, BooleanType)) => Literal(false)
          case (_, _) => and
        }

      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)
          case (_, Literal(true, BooleanType)) => Literal(true)
          case (Literal(false, BooleanType), r) => r
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}
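Here is a minimal sketch of the same short-circuit table on a hypothetical boolean expression tree. BLit, Col, And and Or are stand-ins, not Catalyst classes. Children are simplified first, mirroring transformExpressionsUp. As a result, a literal produced lower in the tree can short-circuit its parent in the same pass.

```scala
// Hypothetical boolean expression tree; not Catalyst's classes.
sealed trait BExpr
final case class BLit(value: Boolean) extends BExpr
final case class Col(name: String) extends BExpr
final case class And(l: BExpr, r: BExpr) extends BExpr
final case class Or(l: BExpr, r: BExpr) extends BExpr

// Bottom-up: simplify both children, then short-circuit on a literal side.
def simplify(e: BExpr): BExpr = e match {
  case And(l0, r0) =>
    (simplify(l0), simplify(r0)) match {
      case (BLit(true), r)  => r
      case (l, BLit(true))  => l
      case (BLit(false), _) => BLit(false)
      case (_, BLit(false)) => BLit(false)
      case (l, r)           => And(l, r)
    }
  case Or(l0, r0) =>
    (simplify(l0), simplify(r0)) match {
      case (BLit(true), _)  => BLit(true)
      case (_, BLit(true))  => BLit(true)
      case (BLit(false), r) => r
      case (l, BLit(false)) => l
      case (l, r)           => Or(l, r)
    }
  case other => other
}

// a OR (false AND b): the inner AND becomes false, then the OR reduces to a
val simplified = simplify(Or(Col("a"), And(BLit(false), Col("b"))))
```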

SimplifyFilters 

Resolves, ahead of execution, filter operations whose outcome is already known.

object SimplifyFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, BooleanType), child) =>
      child
    case Filter(Literal(null, _), child) =>
      LocalRelation(child.output)
    case Filter(Literal(false, BooleanType), child) =>
      LocalRelation(child.output)
  }
}
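Here is a sketch of the same three cases on a hypothetical plan tree. Scan, Filter, LocalRelation and the Cond literals are stand-ins. Here LocalRelation simply stands for an empty relation with the child's output columns. A filter that is always false or null returns no rows, but the schema must survive for the operators above it.

```scala
// Hypothetical filter conditions and plan nodes; not Catalyst's classes.
sealed trait Cond
case object TrueLit extends Cond
case object FalseLit extends Cond
case object NullLit extends Cond
final case class Gt(col: String, v: Int) extends Cond

sealed trait Plan { def output: Seq[String] }
final case class Scan(output: Seq[String]) extends Plan
final case class LocalRelation(output: Seq[String]) extends Plan // no rows, same schema
final case class Filter(cond: Cond, child: Plan) extends Plan {
  def output: Seq[String] = child.output
}

def simplifyFilters(p: Plan): Plan = p match {
  case Filter(TrueLit, child)  => simplifyFilters(child)     // keeps every row: drop the filter
  case Filter(NullLit, child)  => LocalRelation(child.output) // keeps no row
  case Filter(FalseLit, child) => LocalRelation(child.output) // keeps no row
  case Filter(c, child)        => Filter(c, simplifyFilters(child))
  case other                   => other
}

val emptied = simplifyFilters(Filter(Gt("a", 1), Filter(NullLit, Scan(Seq("a")))))
```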

SimplifyCasts 

Removes Cast expressions whose input already has the target type.

object SimplifyCasts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Cast(e, dataType) if e.dataType == dataType => e
  }
}
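Here is a sketch on a hypothetical typed expression tree. IntLit, Ref, Cast and the two-member DType are assumptions for illustration. For simplicity the sketch rewrites bottom-up. A Cast survives only when its child's type really differs from the target type.

```scala
// Hypothetical data types and expressions; not Catalyst's classes.
sealed trait DType
case object IntT extends DType
case object StrT extends DType

sealed trait Ex { def dataType: DType }
final case class IntLit(v: Int) extends Ex { val dataType: DType = IntT }
final case class Ref(name: String, dataType: DType) extends Ex
final case class Cast(child: Ex, dataType: DType) extends Ex

// Drop any Cast whose (already simplified) child has the target type.
def simplifyCasts(e: Ex): Ex = e match {
  case Cast(c, t) =>
    val inner = simplifyCasts(c)
    if (inner.dataType == t) inner else Cast(inner, t)
  case other => other
}

// CAST(CAST(1 AS INT) AS STRING): only the redundant inner cast disappears
val cleaned = simplifyCasts(Cast(Cast(IntLit(1), IntT), StrT))
```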

CombineFilters 

When two adjacent operators are both filters, merges them into a single filter. "Adjacent" here means parent and child.

object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}
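Here is a sketch on a hypothetical plan tree. Scan, Filter, Cmp and And are stand-ins. Like the rule above, it builds And(childCondition, parentCondition). The recursive call after a merge is a choice made in this sketch so that a longer chain of filters collapses in one call. It is not something the rule above does explicitly.

```scala
// Hypothetical predicates and plan nodes; not Catalyst's classes.
sealed trait Pred
final case class Cmp(col: String, op: String, v: Int) extends Pred
final case class And(l: Pred, r: Pred) extends Pred

sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Filter(cond: Pred, child: Plan) extends Plan

// Merge a Filter sitting directly on another Filter into one Filter.
def combineFilters(p: Plan): Plan = p match {
  case Filter(fc, Filter(nc, grandChild)) => combineFilters(Filter(And(nc, fc), grandChild))
  case Filter(c, child)                   => Filter(c, combineFilters(child))
  case other                              => other
}

val c1 = Cmp("a", ">", 1)
val c2 = Cmp("b", "<", 2)
val c3 = Cmp("c", "=", 3)
val merged = combineFilters(Filter(c1, Filter(c2, Filter(c3, Scan("t")))))
```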

PushPredicateThroughProject 

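Pushes a Filter that sits on top of a Project down below the Project. Along the way, it inlines any aliases defined in the projection. This heuristic assumes that evaluating the aliased expressions again is cheap.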

object PushPredicateThroughProject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
      val sourceAliases = fields.collect { case a @ Alias(c, _) =>
        (a.toAttribute: Attribute) -> c
      }.toMap // collect every aliased attribute defined in fields
      project.copy(child = filter.copy( // build the new Filter below the Project
        replaceAlias(condition, sourceAliases), // replace aliased attributes in the condition
        grandChild))
  }

  def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]): Expression = {
    condition transform {
      case a: AttributeReference => sourceAliases.getOrElse(a, a)
    }
  }
}
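Here is a sketch on hypothetical expression and plan types. AttrRef, Plus, Gt, Alias, Project and the rest are stand-ins. One simplification: aliases are keyed by name here, whereas Catalyst keys them by Attribute. The filter moves below the Project, and every reference to an alias is replaced by the expression it names. This is the inlining that is assumed to be cheap.

```scala
// Hypothetical expressions, projected fields and plan nodes; not Catalyst's classes.
sealed trait E
final case class AttrRef(name: String) extends E
final case class Lit(v: Int) extends E
final case class Plus(l: E, r: E) extends E
final case class Gt(l: E, r: E) extends E

sealed trait Field
final case class Plain(attr: AttrRef) extends Field
final case class Alias(child: E, name: String) extends Field // `child AS name`

sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Project(fields: Seq[Field], child: Plan) extends Plan
final case class Filter(cond: E, child: Plan) extends Plan

// Replace each reference to an alias with the expression it stands for.
def replaceAlias(e: E, aliases: Map[String, E]): E = e match {
  case a @ AttrRef(n) => aliases.getOrElse(n, a)
  case Plus(l, r)     => Plus(replaceAlias(l, aliases), replaceAlias(r, aliases))
  case Gt(l, r)       => Gt(replaceAlias(l, aliases), replaceAlias(r, aliases))
  case lit: Lit       => lit
}

def pushThroughProject(p: Plan): Plan = p match {
  case Filter(cond, Project(fields, grandChild)) =>
    val aliases = fields.collect { case Alias(c, n) => n -> c }.toMap
    Project(fields, Filter(replaceAlias(cond, aliases), grandChild))
  case other => other
}

// SELECT age, age + 1 AS x FROM people, then WHERE x > 10
val fields = Seq(Plain(AttrRef("age")), Alias(Plus(AttrRef("age"), Lit(1)), "x"))
val pushed = pushThroughProject(Filter(Gt(AttrRef("x"), Lit(10)), Project(fields, Scan("people"))))
```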

PushPredicateThroughInnerJoin 

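First, find a Filter operator. If its child is an inner join, extract all conditions from both the Filter and the inner join. Then push the conditions that reference only the left side, or only the right side, down below the join onto that side. Finally, put the remaining conditions, which cannot be pushed down, into the join's condition.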

object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
      // Extract the conjuncts of both the filter condition and the join condition
      val allConditions = splitConjunctivePredicates(filterCondition) ++
        joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)

      // Conditions whose referenced attributes all belong to the right side's output go into rightConditions
      val (rightConditions, leftOrJoinConditions) =
        allConditions.partition(_.references subsetOf right.outputSet)
      // Likewise, of the remaining conditions, those whose referenced attributes all belong to the
      // left side's output go into leftConditions; whatever is left becomes joinConditions
      val (leftConditions, joinConditions) =
        leftOrJoinConditions.partition(_.references subsetOf left.outputSet)

      // Build the new left and right: AND together each side's conditions, then wrap the original
      // left/right logical plan in a new Filter logical plan with that combined expression.
      // This pushes those predicates down into left/right, i.e. below this inner join.
      val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
      val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
      Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
  }
}
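Here is a sketch of the partitioning logic on a hypothetical plan model. It makes three assumptions. A condition is represented only by its text and the set of columns it references. A Filter or join holds an already-split list of conjuncts, standing in for splitConjunctivePredicates and reduceLeftOption(And). Column names are unique across both sides. The test data reproduces case one from the examples below.

```scala
// Hypothetical model: a condition is its text plus the columns it references.
final case class Cond(text: String, refs: Set[String])

sealed trait Plan { def output: Set[String] }
final case class Scan(output: Set[String]) extends Plan
// conds are implicitly ANDed together
final case class Filter(conds: Seq[Cond], child: Plan) extends Plan {
  def output: Set[String] = child.output
}
final case class InnerJoin(left: Plan, right: Plan, conds: Seq[Cond]) extends Plan {
  def output: Set[String] = left.output ++ right.output
}

def pushThroughInnerJoin(p: Plan): Plan = p match {
  case Filter(filterConds, InnerJoin(left, right, joinConds)) =>
    val all = filterConds ++ joinConds
    // right-only conditions first, then left-only; the rest stay on the join
    val (rightConds, leftOrJoin) = all.partition(_.refs subsetOf right.output)
    val (leftConds, remaining) = leftOrJoin.partition(_.refs subsetOf left.output)
    // wrap a side in a Filter only if something was pushed to it
    def wrap(conds: Seq[Cond], side: Plan): Plan = if (conds.isEmpty) side else Filter(conds, side)
    InnerJoin(wrap(leftConds, left), wrap(rightConds, right), remaining)
  case other => other
}

val people = Scan(Set("name", "age"))
val num = Scan(Set("v1", "v2"))
val caseOne = Filter(Seq(Cond("num.v2 < 50", Set("v2"))),
  InnerJoin(people, num, Seq(Cond("people.age > 20", Set("age")), Cond("num.v1 > 0", Set("v1")))))
val pushed = pushThroughInnerJoin(caseOne)
```

With every condition pushed to one side, the join is left with no condition. This matches the CartesianProduct in the case-one plan below.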

The following definitions help explain the code above.

The Join operator (a binary LogicalPlan node)

case class Join(
  left: LogicalPlan,
  right: LogicalPlan,
  joinType: JoinType,
  condition: Option[Expression]) extends BinaryNode {

  def references = condition.map(_.references).getOrElse(Set.empty)
  def output = left.output ++ right.output
}

The Filter operator (a unary LogicalPlan node)

case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  def output = child.output
  def references = condition.references
}

reduceLeftOption works as follows:

def reduceLeftOption[B >: A](op: (B, A) => B): Option[B] =
    if (isEmpty) None else Some(reduceLeft(op))

The result of reduceLeft(op) is op(op(... op(x_1, x_2) ..., x_{n-1}), x_n), where x_1, ..., x_n are the elements of the collection in order.
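Here is a small illustration of reduceLeftOption as the join rule uses it. Strings stand in for Catalyst expressions, and the and function is hypothetical. The point is the left-nested shape of the result and the None case for an empty list. The rule turns that None back into the unfiltered plan via getOrElse.

```scala
// Hypothetical stand-in for And: combines two predicate strings.
val and: (String, String) => String = (l, r) => s"($l AND $r)"

val three = Seq("a > 1", "b < 2", "c = 3").reduceLeftOption(and)
// Some("((a > 1 AND b < 2) AND c = 3)")
val none = Seq.empty[String].reduceLeftOption(and)
// None

// The same pattern as leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
val wrapped = three.map(c => s"Filter($c, left)").getOrElse("left")
val unwrapped = none.map(c => s"Filter($c, left)").getOrElse("left")
```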


The PredicateHelper trait splits the conditions nested inside And operators into separate expressions and returns them as a Seq.

trait PredicateHelper {
  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
    case other => other :: Nil
  }
}
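Here is a sketch of the helper on a hypothetical expression tree. Pred, And and Or are stand-ins. Only And is split. An Or stays whole, because its branches cannot be pushed down independently of each other.

```scala
// Hypothetical expression tree; not Catalyst's classes.
sealed trait Expr
final case class Pred(text: String) extends Expr
final case class And(l: Expr, r: Expr) extends Expr
final case class Or(l: Expr, r: Expr) extends Expr

// Flatten nested ANDs into a list of conjuncts; anything else is one conjunct.
def splitConjunctivePredicates(condition: Expr): Seq[Expr] = condition match {
  case And(c1, c2) => splitConjunctivePredicates(c1) ++ splitConjunctivePredicates(c2)
  case other       => other :: Nil
}

// (a AND b) AND (c OR d)
val parts = splitConjunctivePredicates(And(And(Pred("a"), Pred("b")), Or(Pred("c"), Pred("d"))))
```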


Example

case class Person(name: String, age: Int)

case class Num(v1: Int, v2: Int)


case one

SELECT people.age, num.v1, num.v2
FROM
    people
    JOIN num
    ON people.age > 20 and num.v1 > 0
WHERE num.v2 < 50


== QueryPlan ==
Project [age#1:1,v1#2:2,v2#3:3]
  CartesianProduct
    Filter(age#1:1 > 20)
      ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124
    Filter((v2#3:1 < 50) && (v1#2:0 > 0))
      ExistingRdd [v1#2,v2#3],MappedRDD[10] at map at basicOperators.scala:124

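Analysis: the WHERE condition num.v2 < 50 is pushed down through the join onto the num side, where it is combined with num.v1 > 0 in a single Filter. Likewise, people.age > 20 becomes a Filter on the people side. Since no condition references both tables, the join keeps no condition and is executed as a CartesianProduct.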


case two

SELECT people.age, 1+2
FROM
    people
    JOIN num
    ON people.name <> 'abc' and num.v1 > 0
WHERE num.v2 < 50

== QueryPlan ==
Project [age#1:1,3 AS c1#14]
  CartesianProduct
    Filter NOT(name#0:0 = abc)
      ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124
    Filter((v2#3:1 < 50) && (v1#2:0 > 0))
      ExistingRdd[v1#2,v2#3], MappedRDD[10] at map at basicOperators.scala:124

Analysis: 1+2 is constant-folded ahead of time into the literal 3, which is given an alias (3 AS c1#14).



The end :)



Last updated: 2017-04-03 12:55:58
