閱讀871 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Spark Catalyst 源碼分析

Architecture



Ø 把輸入的SQL,parse成unresolved logical plan,這一步參考SqlParser的實現


Ø 把unresolved logical plan轉化成resolved logical plan,這一步參考analysis的實現


Ø 把resolved logical plan轉化成optimized logical plan,這一步參考optimize的實現


Ø 把optimized logical plan轉化成physical plan,這一步參考QueryPlanner Strategy的實現


Source Code Module


Rule

Rule是一個抽象類,擁有一個名字,默認為類名。Rule的實現有很多,滲透在不同的處理過程(analyze, optimize)裏。

RuleExecutor是規則執行類,下麵兩個實現會具體講:
Analyzer
Optimizer

RuleExecutor 支持的策略:一次或多次。用來控製converge結束的條件。這裏的Strategy名字感覺有點誤導人。

/**
   * An execution strategy for rules that indicates the maximum number of executions. If the
   * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
   */
  abstract class Strategy { def maxIterations: Int }

  /** A strategy that only runs once. */
  case object Once extends Strategy { val maxIterations = 1 }

  /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
  case class FixedPoint(maxIterations: Int) extends Strategy

RuleExecutor的Batch類和batches變量:
/** A batch of rules. */
  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

  /** Defines a sequence of rule batches, to be overridden by the implementation. */
  protected val batches: Seq[Batch]
一個batch有多個Rule

batches在apply()裏執行的時候,把一個plan丟進來後,用左折疊處理每個batch,最後吐出來一個plan。
converge的條件是達到最大策略次數或者兩個TreeNode相等。apply()處理過程如下:
/**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def apply(plan: TreeType): TreeType = {
    var curPlan = plan

    batches.foreach { batch =>
      var iteration = 1 
      var lastPlan = curPlan
      curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => rule(plan) }

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
        lastPlan = curPlan
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val result = rule(plan)
            if (!result.fastEquals(plan)) {
              logger.debug(...)
            }
            result
        }
        iteration += 1
      }
    }
    curPlan
  }

下麵具體介紹RuleExecutor的實現。

Analyzer

Analyzer使用於對最初的unresolved logical plan轉化成為logical plan。這部分的分析會涵蓋整個analysis package。

作用是把未確定的屬性和關係,通過Schema信息(來自於Catalog類)和方法注冊類來確定下來,這個過程中有三步,第三步會包含許多次的迭代。

/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
 * a [[FunctionRegistry]].
 */
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

首先,Catalog類是一個記錄表信息的類,專門提供給Analyzer用。
trait Catalog {
  def lookupRelation(
    databaseName: Option[String],
    tableName: String,
    alias: Option[String] = None): LogicalPlan

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
}

看一個SimpleCatalog的實現,該類在SQLContext裏使用,把表名和LogicalPlan存在HashMap裏維護起來,生命周期隨上下文。提供注冊表、刪除表、查找表的功能。
class SimpleCatalog extends Catalog {
  val tables = new mutable.HashMap[String, LogicalPlan]()

  def registerTable(databaseName: Option[String],tableName: String, plan: LogicalPlan): Unit = {
    tables += ((tableName, plan))
  }

  def dropTable(tableName: String) = tables -= tableName

  def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String] = None): LogicalPlan = {
    val table = tables.get(tableName).getOrElse(sys.error(s"Table Not Found: $tableName"))

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a.toLowerCase, table)).getOrElse(table)
  }
}

在查找的時候可以代入一個別名,會把他包裝成一個Subquery。Subquery是個簡單的case class。
case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
  def output = child.output.map(_.withQualifiers(alias :: Nil))
  def references = Set.empty
}

FunctionRegistry類似於Catalog,記錄的是函數,在hive package裏,處理的是Hive的UDF
trait FunctionRegistry {
  def lookupFunction(name: String, children: Seq[Expression]): Expression
}

FunctionRegistry的實現在Catalyst裏目前隻有一個(在Hive模塊裏有實現,具體在最後一節Hive內),如下,如果你要查找方法,就會拋異常。
/**
 * A trivial catalog that returns an error when a function is requested.  Used for testing when all
 * functions are already filled in and the analyser needs only to resolve attribute references.
 */
object EmptyFunctionRegistry extends FunctionRegistry {
  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
    throw new UnsupportedOperationException
  }
}

回到Analyzer,SQLContext在使用Analyzer前,這樣生成:
@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog
protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)

接下來看Catalyst現在的Analyzer作為一個RuleExecutor,已經實現的功能:
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )

下麵分別分析三個batch裏麵的Rule做的事情。

Batch One

首先是第一個batch裏的NewRelationInstance這條Rule,他的作用就是避免一個邏輯計劃上同一個實例出現多次,如果出現就生成一個新的plan,保證每個表達式id都唯一。

/**
 * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
 * that each instance has unique expression ids for the attributes produced.
 */
object NewRelationInstances extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val localRelations = plan collect { case l: MultiInstanceRelation => l} // 這一步是搜集所有的多實例關係
    val multiAppearance = localRelations
      .groupBy(identity[MultiInstanceRelation])
      .filter { case (_, ls) => ls.size > 1 }
      .map(_._1)
      .toSet // 這一步是做過濾

    plan transform { // 這一步是把原來plan裏的多實例關係,凡是出現多個,就變成一個新的單一實例
      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
    }
  }
}

LogicalPlan本身是TreeNode的子類,TreeNode具備collect等一些scala collection操作的能力,這個例子裏第一步搜集的過程中體現了collect能力。

TreeNode是Catalyst裏的重要基礎類,後麵有小節會具體講。

Batch Two

第二個batch是大小寫相關的,如果對大小寫不敏感,那麼就執行LowercaseAttributeReferences這條Rule,會把所有的屬性都變成小寫

/**
   * Makes attribute naming case insensitive by turning all UnresolvedAttributes to lowercase.
   */
  object LowercaseAttributeReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) => // 第一類:未確定的關係
        UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))
      case Subquery(alias, child) => Subquery(alias.toLowerCase, child) // 第二類:子查詢
      case q: LogicalPlan => q transformExpressions { // 第三類: 其他類型
        case s: Star => s.copy(table = s.table.map(_.toLowerCase))  // 指的是 * 號
        case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase) // 未確定的屬性
        case Alias(c, name) => Alias(c, name.toLowerCase)() // 別名
      }
    }
  }

transform,transformExpressions是TreeNode提供的方法,用於前序遍曆樹(pre-order)。

從這個處理可以看到logicalPlan裏麵包含的種類。後續Expression這一塊具體還要展開介紹。

Alias的一點注釋:

/**
 * Used to assign a new name to a computation.
 * For example the SQL expression "1 + 1 AS a" could be represented as follows:
 *  Alias(Add(Literal(1), Literal(1), "a")()
 *

Batch Three

Resulotion是第三類batch,定義的結束條件是循環100次。下麵是我加的注釋,大致介紹Rule的作用,並挑選幾個Rule的實現介紹。

Batch("Resolution", fixedPoint,
      ResolveReferences :: // 確定屬性
      ResolveRelations :: // 確定關係(從catalog裏)
      NewRelationInstances :: // 去掉同一個實例出現多次的情況
      ImplicitGenerate :: // 把包含Generator且隻有一條的表達式轉化成Generate操作
      StarExpansion :: // 擴張 * 
      ResolveFunctions :: // 確定方法(從FunctionRegistry裏)
      GlobalAggregates :: // 把包含Aggregate的表達式轉化成Aggregate操作
      typeCoercionRules :_*) // 來自於HiveTypeCoercion,主要針對Hive語法做強製轉換,包含多種規則

用post-order遍曆樹,把未確定的屬性確定下來。如果沒有做成功,未確定的屬性依然會留下來,留給下一次迭代的時候再確定。

/**
   * Replaces [[UnresolvedAttribute]]s with concrete
   * [[expressions.AttributeReference AttributeReferences]] from a logical plan node's children.
   */
  object ResolveReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
      case q: LogicalPlan if q.childrenResolved =>
        logger.trace(s"Attempting to resolve ${q.simpleString}")
        q transformExpressions {
          case u @ UnresolvedAttribute(name) =>
            // Leave unchanged if resolution fails.  Hopefully will be resolved next round.
            val result = q.resolve(name).getOrElse(u)
            logger.debug(s"Resolving $u to $result")
            result
        }
    }
  }

確定是通過LogicalPlan的resolve方法做的。這個具體在LogicalPlan裏介紹,resolve方法是LogicalPlan的唯一且重要方法。


從catalog裏查找關係

/**
   * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
   */
  object ResolveRelations extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) =>
        catalog.lookupRelation(databaseName, name, alias)
    }
  }

Generator是表達式的一種,根據一種inputrow產生0個或多個rows。

/**
   * When a SELECT clause has only a single expression and that expression is a
   * [[catalyst.expressions.Generator Generator]] we convert the
   * [[catalyst.plans.logical.Project Project]] to a [[catalyst.plans.logical.Generate Generate]].
   */
  object ImplicitGenerate extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case Project(Seq(Alias(g: Generator, _)), child) =>
        Generate(g, join = false, outer = false, None, child)
    }
  }

確定方法類似確定關係。

/**
   * Replaces [[UnresolvedFunction]]s with concrete [[expressions.Expression Expressions]].
   */
  object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan =>
        q transformExpressions {
          case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
            registry.lookupFunction(name, children)
        }
    }
  }


換針對Hive語法做強製轉換,規則如下

trait HiveTypeCoercion {
  val typeCoercionRules = List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts, StringToIntegralCasts, FunctionArgumentConversion)

舉個簡單的例子來看下表達式的使用和替換:

/**
   * Converts string "NaN"s that are in binary operators with a NaN-able types (Float / Double) * to the appropriate numeric equivalent.
   */
  object ConvertNaNs extends Rule[LogicalPlan] {
    val stringNaN = Literal("NaN", StringType)

    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan => q transformExpressions {
        // Skip nodes who's children have not been resolved yet.
        case e if !e.childrenResolved => e

        /* Double Conversions */
        case b: BinaryExpression if b.left == stringNaN && b.right.dataType == DoubleType =>
          b.makeCopy(Array(b.right, Literal(Double.NaN)))
        case b: BinaryExpression if b.left.dataType == DoubleType && b.right == stringNaN =>
          b.makeCopy(Array(Literal(Double.NaN), b.left))
        case b: BinaryExpression if b.left == stringNaN && b.right == stringNaN =>
          b.makeCopy(Array(Literal(Double.NaN), b.left))

        /* Float Conversions */
        case b: BinaryExpression if b.left == stringNaN && b.right.dataType == FloatType =>
          b.makeCopy(Array(b.right, Literal(Float.NaN)))
        case b: BinaryExpression if b.left.dataType == FloatType && b.right == stringNaN =>
          b.makeCopy(Array(Literal(Float.NaN), b.left))
        case b: BinaryExpression if b.left == stringNaN && b.right == stringNaN =>
          b.makeCopy(Array(Literal(Float.NaN), b.left))
      }
    }
  }

Optimizer

Optimizer用於把analyzedplan轉化成為optimized plan。目前Catalyst的optimizer包下就這一個類,SQLContext也是直接使用的這個類。

同樣,我們看一下裏麵包括了哪些處理過程:

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once,
      EliminateSubqueries) ::
    Batch("ConstantFolding", Once,
      ConstantFolding,
      BooleanSimplification,
      SimplifyCasts) ::
    Batch("Filter Pushdown", Once,
      EliminateSubqueries,
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughInnerJoin) :: Nil
}


Batch One

和子查詢相關的一批規則,包含一條消除子查詢的規則:EliminateSubqueries

/**
 * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan.  Subqueries are
 * only required to provide scoping information for attributes and can be removed once analysis is
 * complete.
 */
object EliminateSubqueries extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child // 處理方式是凡是帶child的,都用child替換自己
  }
}

注釋提到,過了analysis這一步之後,子查詢就可以移除了。


Batch Two

第二批規則,常量折疊

Batch("ConstantFolding", Once,
      ConstantFolding, // 常量折疊
      BooleanSimplification, // 提早短路掉布爾表達式
      SimplifyCasts) // 去掉多餘的Cast操作

具體看:
/**
 * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
 * equivalent [[catalyst.expressions.Literal Literal]] values.
 */
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.apply(null), e.dataType)
    }
  }
}

這裏不得不提一下foldable字段在Expression類裏的定義:

/**
   * Returns true when an expression is a candidate for static evaluation before the query is
   * executed.
   *
   * The following conditions are used to determine suitability for constant folding:
   *  - A [[expressions.Coalesce Coalesce]] is foldable if all of its children are foldable
   *  - A [[expressions.BinaryExpression BinaryExpression]] is foldable if its both left and right
   *    child are foldable
   *  - A [[expressions.Not Not]], [[expressions.IsNull IsNull]], or
   *    [[expressions.IsNotNull IsNotNull]] is foldable if its child is foldable.
   *  - A [[expressions.Literal]] is foldable.
   *  - A [[expressions.Cast Cast]] or [[expressions.UnaryMinus UnaryMinus]] is foldable if its
   *    child is foldable.
   */
  // TODO: Supporting more foldable expressions. For example, deterministic Hive UDFs.
  def foldable: Boolean = false
隻有Literal表達式是foldable的,其餘表達式必須表達式中每個元素都滿足foldable。

第二種規則也好理解,簡化布爾表達式。也就是早早地給表達式做一個短路判斷。

/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), r) => r
          case (l, Literal(true, BooleanType)) => l
          case (Literal(false, BooleanType), _) => Literal(false)
          case (_, Literal(false, BooleanType)) => Literal(false)
          case (_, _) => and
        }

      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)
          case (_, Literal(true, BooleanType)) => Literal(true)
          case (Literal(false, BooleanType), r) => r
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}

把Cast操作全部移走。

/**
 * Removes [[catalyst.expressions.Cast Casts]] that are unnecessary because the input is already
 * the correct type.
 */
object SimplifyCasts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Cast(e, dataType) if e.dataType == dataType => e
  }
}

Batch Three

一批 過濾下推 規則,

Batch("Filter Pushdown", Once,
      EliminateSubqueries, // 消除子查詢
      CombineFilters, // 過濾操作取合集
      PushPredicateThroughProject, // 為映射操作下推謂詞
      PushPredicateThroughInnerJoin) // 為inner join下推謂詞

具體不一一列舉了。

SQLContext

SQLContext的這一個RuleExecutor實現已經到了物理執行計劃SparkPlan的處理了。也是一種實現,注冊了自己的batch,如下:
/**
   * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
   * inserting shuffle operations as needed.
   */
  @transient
  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
    val batches =
      Batch("Add exchange", Once, AddExchange) ::
      Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
  }


以上就是Rule包,及RuleExecutor在各處的實現。其中Analyze和Optimize是Catalyst目前提供的,SQL組件直接拿來使用。

TreeNode

TreeNode Library支持的三個特性:

    · Scala collection like methods (foreach, map, flatMap, collect, etc)

    · transform accepts a partial function that is used to generate a newtree.

    · debugging support pretty printing, easy splicing of trees, etc.

Collection操作能力

偏函數


繼承結構


全局唯一id

object TreeNode {
  private val currentId = new java.util.concurrent.atomic.AtomicLong
  protected def nextId() = currentId.getAndIncrement()
}

幾種節點

/**
 * A [[TreeNode]] that has two children, [[left]] and [[right]].
 */
trait BinaryNode[BaseType <: TreeNode[BaseType]] {
  def left: BaseType
  def right: BaseType

  def children = Seq(left, right)
}

/**
 * A [[TreeNode]] with no children.
 */
trait LeafNode[BaseType <: TreeNode[BaseType]] {
  def children = Nil
}

/**
 * A [[TreeNode]] with a single [[child]].
 */
trait UnaryNode[BaseType <: TreeNode[BaseType]] {
  def child: BaseType
  def children = child :: Nil
}

每個node唯一id,導致在比較的時候,不同分支上長得一樣結構的node也不相同,比較如下:

  def sameInstance(other: TreeNode[_]): Boolean = {
    this.id == other.id
  }

  def fastEquals(other: TreeNode[_]): Boolean = {
    sameInstance(other) || this == other
  }

foreach的時候,先做自己,再把孩子們做一遍
def foreach(f: BaseType => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }

map的時候是按前序對每個節點都做一次處理

def map[A](f: BaseType => A): Seq[A] = {
    val ret = new collection.mutable.ArrayBuffer[A]()
    foreach(ret += f(_))
    ret
  }

其他的很多變化都類似,接收的是函數或偏函數,把他們作用到匹配的節點上去執行

變化總共有這些,按類別分:

map, flatMap, collect,

mapChildren,  withNewChildren,

transform, transformDown, transformChildrenDown 前序

                    transformUp,  transformChildrenUp          後序

基本上就這些,其實就是提供對這棵樹及其子節點的順序遍曆和處理能力


Plan

QueryPlan的繼承結構



QueryPlan提供了三個東西,

Ø  其一是定義了output,是對外輸出的一個屬性序列

    def output:Seq[Attribute]


Ø  其二是借用TreeNode的那套transform方法,實現了一套transformExpression方法,用途是把partialfunction遍曆到各個子節點上。

 

Ø  其三是一個expressions方法,返回Seq[expression],用於搜集本query裏所有的表達式。

 

QueryPlanCatalyst裏的實現是LogicalPlan,在SQL組件裏的實現是SparkPlan,前者主要要被處理、分析和優化,後者是真正被處理執行的,下麵簡單介紹兩者。


Logical Plan

在QueryPlan上增加的幾個屬性:

1.      references 用於生成output屬性列表的參考屬性列表

          def references: Set[Attribute]

 

2.      lazy val inputSet: Set[Attribute] = children.flatMap(_.output).toSet

 

3.      自己及children是否resolved

 

4.      resolve方法,重要,看起來費勁

def resolve(name: String): Option[NamedExpression] = {
    val parts = name.split("\\.")
    // Collect all attributes that are output by this nodes children where either the first part
    // matches the name or where the first part matches the scope and the second part matches the
    // name.  Return these matches along with any remaining parts, which represent dotted access to
    // struct fields.
    val options = children.flatMap(_.output).flatMap { option =>
      // If the first part of the desired name matches a qualifier for this possible match, drop it.
      val remainingParts = if (option.qualifiers contains parts.head) parts.drop(1) else parts
      if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
    }

    options.distinct match {
      case (a, Nil) :: Nil => Some(a) // One match, no nested fields, use it.
      // One match, but we also need to extract the requested nested field.
      case (a, nestedFields) :: Nil =>
        a.dataType match {
          case StructType(fields) =>
            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
          case _ => None // Don't know how to resolve these field references
        }
      case Nil => None         // No matches.
      case ambiguousReferences =>
        throw new TreeNodeException(
          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
    }
  }

三種抽象子類:

/**
 * A logical plan node with no children.
 */
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  self: Product =>
  // Leaf nodes by definition cannot reference any input attributes.
  def references = Set.empty
}

/**
 * A logical plan node with single child.
 */
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  self: Product =>
}

/**
 * A logical plan node with a left and right child.
 */
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  self: Product =>
}

分別看LogicalPlan的三種Node的實現結構:LeafNode,UnaryNode,BinaryNode


LeafNode

/**
 * A logical node that represents a non-query command to be executed by the system.  For example,
 * commands can be used by parsers to represent DDL operations.
 */
abstract class Command extends LeafNode {
  self: Product => 
  def output = Seq.empty
}

/**
 * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
 * commands that are passed directly to another system.
 */
case class NativeCommand(cmd: String) extends Command

/**
 * Returned by a parser when the users only wants to see what query plan would be executed, without
 * actually performing the execution.
 */
case class ExplainCommand(plan: LogicalPlan) extends Command

case object NoRelation extends LeafNode {
  def output = Nil
}

對於Command和BaseRelation,在sql.hive包內有更多實現


MetastoreRelation的作用在Hive一節會說明。


Command略。

UnaryNode


BinaryNode


Spark Plan

SparkPlan類繼承結構如下圖:


在SQL模塊的execution package的basicOperator類裏,有許多SparkPlan的實現,包括

Project,Filter,Sample,Union,StopAfter,TopK,Sort,ExsitingRdd

 

這些實現和Catalyst的basicOperator類裏有很多重了,區別在於,SparkPlanQueryPlan的實現,同logical plan不同的是,SparkPlan會被Spark實現的Strategy真正執行,所以SQL模塊裏的basicOperator內的這些caseclass,比Catalyst多了execute()方法

 

具體Spark策略的實現參考下一小節。


Planning


Query Planner

QueryPlanner的職責是把邏輯執行計劃轉化成為物理執行計劃,具備一係列Strategy的實現



abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  /** A list of execution strategies that can be used by the planner */
  def strategies: Seq[Strategy]

  /**
   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
   * be used for execution. If this strategy does not apply to the give logical operation then an
   * empty list should be returned.
   */
  abstract protected class Strategy extends Logging {
    def apply(plan: LogicalPlan): Seq[PhysicalPlan]
  }

  /**
   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
   * filled in automatically by the QueryPlanner using the other execution strategies that are
   * available.
   */
  protected def planLater(plan: LogicalPlan) = apply(plan).next()

  def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...
    val iter = strategies.view.flatMap(_(plan)).toIterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }
}

QueryPlanner impl

目前的實現是SparkStrategies

在SQLContext裏的使用是SparkPlanner:

protected[sql] class SparkPlanner extends SparkStrategies {
    val sparkContext = self.sparkContext

    val strategies: Seq[Strategy] =
      TopK ::
      PartialAggregation ::
      SparkEquiInnerJoin ::
      BasicOperators ::
      CartesianProduct ::
      BroadcastNestedLoopJoin :: Nil
  }

在HiveContext裏的使用是帶了hive策略的SparkPlanner:

val hivePlanner = new SparkPlanner with HiveStrategies {
    val hiveContext = self

    override val strategies: Seq[Strategy] = Seq(
      TopK,
      ColumnPrunings,
      PartitionPrunings,
      HiveTableScans,
      DataSinks,
      Scripts,
      PartialAggregation,
      SparkEquiInnerJoin,
      BasicOperators,
      CartesianProduct,
      BroadcastNestedLoopJoin
    )
  }

Strategy & impl

Strategy的實現主要包含Spark Strategy和Hive Strategy。前者基本上對應了sql.execution包裏的類。後者是在Spark策略的基礎上附加的一些策略。

Expression

Expression幾個屬性:

1.      帶DataType,並且自帶一些inline方法幫助一些dataType的轉換

2.      帶reference,reference是Seq[Attribute],Attribute是NamedExpression子類。

3.      foldable ,即靜態可以直接執行的表達式

Expression裏隻有Literal可折疊,Literal是LeafExpression,根據dataType生成不同類型表達式

object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case null => Literal(null, NullType)
  }
}

case class Literal(value: Any, dataType: DataType) extends LeafExpression {

  override def foldable = true
  def nullable = value == null
  def references = Set.empty

  override def toString = if (value != null) value.toString else "null"

  type EvaluatedType = Any
  override def apply(input: Row):Any = value // 執行這個葉子表達式的話就是返回value值
}

4.      resolved 具體關心children是否都resolved。

childeren是TreeNode裏的概念,在TreeNode裏是一個Seq[BaseType],而BaseType是TreeNode[T]裏的範型。在Expression這裏,即TreeNode[Expression],BaseType就是Expression。


Expression繼承結構



抽象子類如下:

abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] {
  self: Product =>
  def symbol: String
  override def foldable = left.foldable && right.foldable
  def references = left.references ++ right.references
  override def toString = s"($left $symbol $right)"
}

abstract class LeafExpression extends Expression with trees.LeafNode[Expression] {
  self: Product =>
}

abstract class UnaryExpression extends Expression with trees.UnaryNode[Expression] {
  self: Product =>
  def references = child.references
}

Expression impl


SchemaRDD

SchemaRDD是一個RDD[Row],Row在Catalyst對應的是Table裏的一行,定義是

trait Row extends Seq[Any] with Serializable

SchemaRDD就兩部分實現,還有幾個SQLContext的方法調用

一是RDD的Function的實現

  // =========================================================================================
  // RDD functions: Copy the interal row representation so we present immutable data to users.
  // =========================================================================================
  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    firstParent[Row].compute(split, context).map(_.copy())

  override def getPartitions: Array[Partition] = firstParent[Row].partitions

  override protected def getDependencies: Seq[Dependency[_]] =
    List(new OneToOneDependency(queryExecution.toRdd))  // 該SchemaRDD與優化後的RDD是窄依賴

二是DSL function的實現,如

def select(exprs: NamedExpression*): SchemaRDD =
    new SchemaRDD(sqlContext, Project(exprs, logicalPlan))

每次DSL的操作會轉化成為新的SchemaRDD,

SchemaRDD的DSL操作與Catalyst組件提供的操作的對應關係為



DSL Operator的實現都依賴Catalyst的basicOperator,basicOperator裏的操作都是LogicalPlan的繼承類,主要分兩類,一元UnaryNode和二元BinaryNode操作。而UnaryNode和BinaryNode都是TreeNode的實現,TreeNode裏還有一種就是LeafNode。

basicOperator的各種實現都是caseclass,都是LogicalPlan,不具備execute能力




Hive


Hive Context

HiveContext是Spark SQL執行引擎之一,將hive數據結合到Spark環境中,讀取的配置在hive-site.xml裏指定。

繼承關係



HiveContext裏的sql parser使用的是HiveQl,

 

執行hql的時候,runHive方法接收cmd,且設置了最大返回行數

protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String]

調用的方法是hive裏的類,返回結果存在java的ArrayList裏

 

錯誤日誌會記錄在outputBuffer裏,用於打印輸出


邏輯執行計劃的幾個步驟仍然類似SqlContext,因為QueryExecution也繼承了過來

abstract class QueryExecution extends super.QueryExecution {

區別在於使用的實例不一樣,且toRdd操作邏輯不一樣


Hive Catalog


使用HiveMetastoreCatalog存表信息

 

HiveMetastoreCatalog內,通過HiveContext的hiveconf,創建了hiveclient,所以可以進行getTable,getPartition,createTable操作

 

HiveMetastoreCatalog內的MetastoreRelation,繼承結構如下


通過hive的接口創建了Table,Partition,TableDesc,並帶一個隱式轉換HiveMetastoreTypes類,因為在把Schema裏的Field轉成Attribute的過程中,借助HiveMetastoreTypes的toDataType把Catalyst支持的DataType parse成hive支持的類型


Hive QL

參考HiveQl類

Hive UDF

object HiveFunctionRegistry
  extends analysis.FunctionRegistry with HiveFunctionFactory with HiveInspectors {

繼承FunctionRegistry,實現的是lookupFunction方法


HiveFunctionFactory主要做反射的事情,以及把hive的類型轉化成為catalyst type

包括

  def getFunctionInfo(name: String) = FunctionRegistry.getFunctionInfo(name)
  def getFunctionClass(name: String) = getFunctionInfo(name).getFunctionClass
  def createFunction[UDFType](name: String) =
    getFunctionClass(name).newInstance.asInstanceOf[UDFType]

HiveInspectors是Catalyst DataType和Hive ObjectInspector的轉化


Java類到Catalyst dataType的轉化

def javaClassToDataType(clz: Class[_]): DataType = clz match 

Hive Strategy

val hivePlanner = new SparkPlanner with HiveStrategies {
    val hiveContext = self

    override val strategies: Seq[Strategy] = Seq(
      TopK,
      ColumnPrunings,
      PartitionPrunings,
      HiveTableScans,
      DataSinks,
      Scripts,
      PartialAggregation,
      SparkEquiInnerJoin,
      BasicOperators,
      CartesianProduct,
      BroadcastNestedLoopJoin
    )
  }


Summary

之前的那篇Spark SQL組件源碼分析走讀了SQLContext的整個執行過程,有很多內容不夠具體。本文結合Catalyst,做了更詳細的說明。


全文完 :)

最後更新:2017-04-03 12:55:52

  上一篇:go 使用SQL Server Profiler跟蹤數據庫
  下一篇:go Java運算符