

Implementation of the Operators in the Spark SQL Physical Execution Plan

SparkStrategy: logical to physical

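Catalyst is an implementation-agnostic query optimization framework. For the step from the optimized logical plan to the actual physical execution plan it only provides interfaces; unlike the Analyzer and the Optimizer, it ships no concrete implementation.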

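This article describes how each physical operator in the Spark SQL component is implemented. Mapping the optimized logical plan onto physical operator classes is the job of the SparkStrategies class. Building on the Strategy interface provided by Catalyst, it implements a set of strategies that recognize LogicalPlan subclasses and replace them with suitable SparkPlan subclasses.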
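To make the strategy mechanism concrete, here is a toy planner in plain Scala. It is a sketch under simplifying assumptions, not Catalyst's code: every class below is hypothetical, a strategy returns Nil when it does not recognize a node, and the planner takes the first candidate produced by the strategies, planning children recursively through a helper called planLater.

```scala
// Toy logical and physical operators (hypothetical, not Catalyst's classes).
trait LogicalPlan
case class LogicalScan(table: String) extends LogicalPlan
case class LogicalFilter(condition: String, child: LogicalPlan) extends LogicalPlan
case class LogicalSort(column: String, child: LogicalPlan) extends LogicalPlan

trait PhysicalPlan
case class PhysicalScan(table: String) extends PhysicalPlan
case class PhysicalFilter(condition: String, child: PhysicalPlan) extends PhysicalPlan

// A strategy returns candidate physical plans, or Nil if it does not handle the node.
trait Strategy {
  def apply(plan: LogicalPlan): Seq[PhysicalPlan]
}

class ToyPlanner {
  // Strategies are tried in order; their candidates are concatenated lazily.
  def planAll(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    val candidates = strategies.iterator.flatMap(s => s(plan))
    require(candidates.hasNext, s"No strategy can plan $plan")
    candidates
  }

  // Children are planned recursively by taking the first candidate for them.
  def planLater(plan: LogicalPlan): PhysicalPlan = planAll(plan).next()

  object BasicOperators extends Strategy {
    def apply(plan: LogicalPlan): Seq[PhysicalPlan] = plan match {
      case LogicalScan(t)          => Seq(PhysicalScan(t))
      case LogicalFilter(c, child) => Seq(PhysicalFilter(c, planLater(child)))
      case _                       => Nil
    }
  }

  def strategies: Seq[Strategy] = Seq(BasicOperators)
}
```

Returning a Seq rather than a single plan leaves room for several strategies to propose alternatives; this toy simply takes the first one, and a node no strategy recognizes (LogicalSort here) fails the require check.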


The SparkPlan class hierarchy is outlined below, followed by the implementation of each subclass.



SparkPlan

It has three main branches: LeafNode, UnaryNode, and BinaryNode.

Each branch has its own implementation classes, which the sections below go through one by one.



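SparkPlan provides four methods for subclasses to override. Only execute() is abstract; the other three have default implementations: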

  // TODO: Move to `DistributedPlan`
  /** Specifies how data is partitioned across different nodes in the cluster. */
  def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!
  /** Specifies any partition requirements on the input data for this operator. */
  def requiredChildDistribution: Seq[Distribution] =
    Seq.fill(children.size)(UnspecifiedDistribution)

  def execute(): RDD[Row]
  def executeCollect(): Array[Row] = execute().collect()

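The Distribution and Partitioning classes describe how data is distributed. Their subclasses are largely self-explanatory from their names: Distribution includes UnspecifiedDistribution, AllTuples, ClusteredDistribution and OrderedDistribution, while Partitioning includes UnknownPartitioning, SinglePartition, HashPartitioning and RangePartitioning.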
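The relationship between the two families can be sketched as a check asking whether a child's current Partitioning already satisfies the Distribution an operator requires; when it does not, the data has to be repartitioned, which is what the Exchange operator does. This is a simplified model under stated assumptions: expressions are reduced to column names, the ordered/range variants are left out, and the satisfies rules are a plausible reading of the class names rather than Spark's exact code.

```scala
// Simplified model: expressions are plain column names (an assumption for brevity).
trait Distribution
case object UnspecifiedDistribution extends Distribution
case object AllTuples extends Distribution
case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

trait Partitioning {
  def numPartitions: Int
  // Does this physical layout already meet the required distribution?
  def satisfies(required: Distribution): Boolean
}

case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
  def satisfies(required: Distribution): Boolean = required == UnspecifiedDistribution
}

case object SinglePartition extends Partitioning {
  val numPartitions = 1
  // Everything lives in one partition, so every requirement is met.
  def satisfies(required: Distribution): Boolean = true
}

case class HashPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning {
  def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    // Rows hashed on a subset of the clustering keys are also co-located by the full key set.
    case ClusteredDistribution(clustering) => expressions.toSet.subsetOf(clustering.toSet)
    case _ => false
  }
}

// Hypothetical helper: a repartitioning step is needed when the requirement is not met.
def needsExchange(child: Partitioning, required: Distribution): Boolean =
  !child.satisfies(required)
```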


LeafNode


ExistingRdd

First, a word on the concepts of Row and GenericRow.

A Row holds the data of one output row and provides getXXX(i: Int) accessor methods.

trait Row extends Seq[Any] with Serializable

Supported data types include Int, Long, Double, Float, Boolean, Short, Byte and String. A column's value is read by its ordinal, and isNullAt(i: Int) should be checked before reading it.

There is a corresponding MutableRow class that provides setXXX(i: Int, value: Any) methods for modifying (setting) the value at a given ordinal.


GenericRow is a convenience implementation of Row that stores its values in an array:

class GenericRow(protected[catalyst] val values: Array[Any]) extends Row

As a result, reading a value and checking for null both become indexed lookups in that array.

It also has a corresponding GenericMutableRow class whose values can be modified (set).
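A minimal model of these row classes, assuming rows are plain arrays of Any, illustrates ordinal access, the null check, and in-place mutation. It is a simplified sketch: unlike Catalyst's Row it does not extend Seq[Any], and only a couple of getters and setters are included. It also shows one reason the isNullAt check matters: in Scala, casting a null Any to Int silently yields 0.

```scala
// Simplified, hypothetical model of array-backed rows (not Catalyst's actual classes).
trait Row {
  def length: Int
  def apply(i: Int): Any
  def isNullAt(i: Int): Boolean = apply(i) == null
  // Unboxing null to Int gives 0, so callers should check isNullAt first.
  def getInt(i: Int): Int = apply(i).asInstanceOf[Int]
  def getString(i: Int): String = apply(i).asInstanceOf[String]
  def copy(): Row
}

trait MutableRow extends Row {
  def update(i: Int, value: Any): Unit
  def setInt(i: Int, value: Int): Unit = update(i, value)
}

class GenericRow(protected val values: Array[Any]) extends Row {
  def length: Int = values.length
  def apply(i: Int): Any = values(i)                    // ordinal access is an array lookup
  def copy(): Row = new GenericRow(values.clone())      // independent snapshot
}

class GenericMutableRow(size: Int) extends GenericRow(new Array[Any](size)) with MutableRow {
  def update(i: Int, value: Any): Unit = values(i) = value  // overwrite in place
}
```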


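ExistingRdd converts an RDD whose elements are case class instances into an RDD[Row], and uses reflection to extract the case class's fields as its output attributes. The companion object that performs the conversion, and the case class itself, are shown below: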

object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }
  // Maps RDD[A] to RDD[Row] by converting each element of A into a row
  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    // TODO: Reuse the row, don't use map on the product iterator.  Maybe code gen?
    data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}

case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
  def execute() = rdd
}
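The recursion in convertToCatalyst can be tried without Spark. The sketch below uses a hypothetical SimpleRow case class in place of GenericRow and applies the same three cases: a sequence is converted element by element, any other Product (case class or tuple) becomes a row of its converted fields, and everything else is kept as is. Schema extraction via ScalaReflection.attributesFor is not modeled.

```scala
// Hypothetical stand-in for GenericRow: just holds the converted field values.
case class SimpleRow(values: Seq[Any])

// Same case analysis as convertToCatalyst; the Seq case must come first,
// because List is itself a Product.
def convertToCatalyst(a: Any): Any = a match {
  case s: Seq[_]  => s.map(convertToCatalyst)
  case p: Product => SimpleRow(p.productIterator.map(convertToCatalyst).toList)
  case other      => other
}

// The per-element conversion that productToRowRdd applies inside rdd.map.
def productToRow(p: Product): SimpleRow =
  SimpleRow(p.productIterator.map(convertToCatalyst).toList)

case class Address(city: String)
case class Person(name: String, age: Int, address: Address, tags: Seq[String])
```

Nested case classes become nested rows, while sequences stay sequences whose elements are converted recursively.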

UnaryNode


Aggregate

An implicit conversion is imported that adds partition-local operations to RDDs:

/* Implicit conversions */
import org.apache.spark.rdd.PartitionLocalRDDFunctions._

Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each group.

@param child the input data source.

case class Aggregate(
    partial: Boolean,
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: SparkPlan)(@transient sc: SparkContext)

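The partial flag, fixed at construction time, indicates whether this Aggregate runs purely locally within each partition, or whether the input must first be redistributed so that all rows with the same groupingExpressions values end up in the same partition. The requirement is computed as follows: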

override def requiredChildDistribution =
  if (partial) {
    // Partial aggregation: no requirement on how the input is distributed
    UnspecifiedDistribution :: Nil
  } else {
    if (groupingExpressions == Nil) {
      // No grouping: all tuples must be in a single partition
      AllTuples :: Nil
    } else {
      // Otherwise the input must be clustered by groupingExpressions
      ClusteredDistribution(groupingExpressions) :: Nil
    }
  }

The most important method is execute():
def execute() = attachTree(this, "execute") {
  // An implicit conversion to PartitionLocalRDDFunctions happens here (for groupByKeyLocally)
  val grouped = child.execute().mapPartitions { iter =>
    val buildGrouping = new Projection(groupingExpressions)
    iter.map(row => (buildGrouping(row), row.copy()))
  }.groupByKeyLocally()  // produces an RDD[(K, Seq[V])]

  val result = grouped.map { case (group, rows) =>
    // Instantiate the concrete implementation of every aggregate expression:
    // iterate over aggregateExpressions and call newInstance on each
    val aggImplementations = createAggregateImplementations()

    // Pull out all the functions so we can feed each row into them.
    val aggFunctions = aggImplementations.flatMap(_ collect { case f: AggregateFunction => f })

    rows.foreach { row =>
      aggFunctions.foreach(_.update(row))
    }
    buildRow(aggImplementations.map(_.apply(group)))
  }

  // TODO: THIS BREAKS PIPELINING, DOUBLE COMPUTES THE ANSWER, AND USES TOO MUCH MEMORY...
  if (groupingExpressions.isEmpty && result.count == 0) {
    // When there is no output to the Aggregate operator, we still output an empty row.
    val aggImplementations = createAggregateImplementations()
    sc.makeRDD(buildRow(aggImplementations.map(_.apply(null))) :: Nil)
  } else {
    result
  }
}
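The per-group logic of execute() can be modeled on plain collections. In this sketch, an illustration under assumptions rather than Spark's code, one partition is a Seq of rows (Vector[Any]), grouping keys are column indexes, and a hypothetical AggExpr plays the role of an aggregate expression whose newInstance() returns a fresh stateful function with update and result. It shows two points from the code above: each group gets fresh function instances, and a global aggregate over empty input still emits one row.

```scala
// Hypothetical stateful aggregate function, fed one row at a time.
trait AggFunction {
  def update(row: Vector[Any]): Unit
  def result: Any
}

// Hypothetical aggregate expression: a factory for fresh function instances.
trait AggExpr {
  def newInstance(): AggFunction
}

case class Sum(col: Int) extends AggExpr {
  def newInstance(): AggFunction = new AggFunction {
    private var total = 0L
    def update(row: Vector[Any]): Unit = total += row(col).asInstanceOf[Int]
    def result: Any = total
  }
}

case object Count extends AggExpr {
  def newInstance(): AggFunction = new AggFunction {
    private var n = 0L
    def update(row: Vector[Any]): Unit = n += 1
    def result: Any = n
  }
}

def aggregatePartition(
    rows: Seq[Vector[Any]],
    groupCols: Seq[Int],
    aggs: Seq[AggExpr]): Seq[Vector[Any]] = {
  // Like groupByKeyLocally: group rows by their projected key within this partition.
  val grouped = rows.groupBy(row => groupCols.map(row))
  val result = grouped.toSeq.map { case (key, groupRows) =>
    val fns = aggs.map(_.newInstance())             // fresh state for every group
    groupRows.foreach(r => fns.foreach(_.update(r)))
    key.toVector ++ fns.map(_.result)               // output row: key columns, then aggregates
  }
  // With no grouping and no input, still emit one row (e.g. a count of 0).
  if (groupCols.isEmpty && result.isEmpty) Seq(aggs.map(_.newInstance().result).toVector)
  else result
}
```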

The AggregateExpression class hierarchy is defined in aggregates.scala in the Catalyst expressions package.


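Its first kind of implementation, AggregateFunction, has an update(input: Row) method. Subclasses implement update to do the actual per-row work, folding each input Row into the function's running state.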


DebugNode

DebugNode calls execute() on the child SparkPlan it wraps, then prints the rows of the resulting childRdd one by one for inspection.

case class DebugNode(child: SparkPlan) extends UnaryNode

Exchange

case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode

Exchange applies a new partitioning scheme to the output of a SparkPlan.
Its execute() method:
def execute() = attachTree(this , "execute") {
    newPartitioning match {
      case HashPartitioning(expressions, numPartitions) =>
        // Evaluate the hash expressions on every row of every partition of the child RDD
        val rdd = child.execute().mapPartitions { iter =>
          val hashExpressions = new MutableProjection(expressions)
          val mutablePair = new MutablePair[Row, Row]() // reused for every row; works like a Tuple2
          iter.map(r => mutablePair.update(hashExpressions(r), r))
        }
        val part = new HashPartitioner(numPartitions)
        // Build a ShuffledRDD
        val shuffled = new ShuffledRDD[Row, Row, MutablePair[Row, Row]](rdd, part)
        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
        shuffled.map(_._2) // keep the second element of each pair: the original row

      case RangePartitioning(sortingExpressions, numPartitions) =>
        // TODO: RangePartitioner should take an Ordering.
        implicit val ordering = new RowOrdering(sortingExpressions)

        val rdd = child.execute().mapPartitions { iter =>
          val mutablePair = new MutablePair[Row, Null](null, null)
          iter.map(row => mutablePair.update(row, null))
        }
        val part = new RangePartitioner(numPartitions, rdd, ascending = true)
        val shuffled = new ShuffledRDD[Row, Null, MutablePair[Row, Null]](rdd, part)
        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
        shuffled.map(_._1)

      case SinglePartition =>
        child.execute().coalesce(1, shuffle = true)

      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      // TODO: Handle BroadcastPartitioning.
    }
  }
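The HashPartitioning branch can be simulated by modeling an RDD as a Seq of partitions. The sketch assumes rows are Vector[Any] and hash keys are column indexes. partitionFor follows the same idea as Spark's HashPartitioner (a non-negative modulo of the key's hashCode, with null keys sent to partition 0), and a groupBy stands in for the shuffle; the function names are hypothetical.

```scala
// Non-negative hashCode modulo numPartitions; null keys go to partition 0.
def partitionFor(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case k =>
    val mod = k.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
}

def hashExchange(
    input: Seq[Seq[Vector[Any]]],
    keyCols: Seq[Int],
    numPartitions: Int): Seq[Seq[Vector[Any]]] = {
  // Map side: pair every row with its projected key, like the MutablePair above.
  val keyed = input.flatten.map(row => (keyCols.map(row), row))
  // "Shuffle": route each pair to the partition chosen by its key.
  val buckets = keyed.groupBy { case (key, _) => partitionFor(key, numPartitions) }
  // Reduce side: keep only the original rows (the shuffled.map(_._2) step).
  (0 until numPartitions).map(p => buckets.getOrElse(p, Seq.empty).map(_._2))
}
```

After the exchange, all rows sharing a key sit in one partition, which is exactly the ClusteredDistribution requirement an Aggregate or an equi-join needs.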

Filter

case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode

def execute() = child.execute().mapPartitions { iter =>
  iter.filter(condition.apply(_).asInstanceOf[Boolean])
}

Generate

case class Generate(
    generator: Generator,
    join: Boolean,
    outer: Boolean,
    child: SparkPlan)
  extends UnaryNode

First, note that Generator is a subclass of Expression.


A Generator processes an input row and outputs zero or more rows; the makeOutput() strategy is implemented by subclasses.

Explode takes each value of its input collection (which may be an ArrayType or a MapType) and turns it into its own row, e.g. GenericRow(Array(v)), so the output contains one row per element.


Back to the Generate operator:

The join flag specifies whether each generated row is explicitly joined with the original input tuple in the final output.

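The outer flag only takes effect when join is true. When outer is true, every input row appears in the output at least once, padded with nulls if the generator produces nothing for it.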


Overall, Generate behaves much like flatMap in functional programming:

  def execute() = {
    if (join) {
      child.execute().mapPartitions { iter =>
        val nullValues = Seq.fill(generator.output.size)(Literal(null))
        // Used to produce rows with no matches when outer = true.
        val outerProjection =
          new Projection(child.output ++ nullValues, child.output)

        val joinProjection =
          new Projection(child.output ++ generator.output, child.output ++ generator.output)
        val joinedRow = new JoinedRow

        iter.flatMap {row =>
          val outputRows = generator(row)
          if (outer && outputRows.isEmpty) {
            outerProjection(row) :: Nil
          } else {
            outputRows.map(or => joinProjection(joinedRow(row, or)))
          }
        }
      }
    } else {
      child.execute().mapPartitions(iter => iter.flatMap(generator))
    }
  }
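The join/outer semantics can be checked with a small model. The sketch assumes rows are Vector[Any] and represents the generator as a plain function from a row to zero or more rows; explode is a hypothetical Explode-like generator over a column holding a Seq. Joined output appends the generated columns to the input row, and with outer = true an input row that generates nothing is padded with nulls.

```scala
def generate(
    input: Seq[Vector[Any]],
    generator: Vector[Any] => Seq[Vector[Any]],
    generatorWidth: Int,
    join: Boolean,
    outer: Boolean): Seq[Vector[Any]] =
  if (!join) {
    input.flatMap(generator)   // plain flatMap: only the generated columns survive
  } else {
    input.flatMap { row =>
      val generated = generator(row)
      if (outer && generated.isEmpty) Seq(row ++ Vector.fill(generatorWidth)(null))
      else generated.map(g => row ++ g)   // join each generated row with its input row
    }
  }

// Hypothetical Explode-like generator: one output row per element of the Seq in column `col`.
def explode(col: Int)(row: Vector[Any]): Seq[Vector[Any]] =
  row(col).asInstanceOf[Seq[Any]].map(v => Vector[Any](v))
```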

Project

case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode

Project's execute():

  def execute() = child.execute().mapPartitions { iter =>
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }

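MutableProjection extends Row => Row. Its constructor takes a Seq[Expression] and, optionally, an inputSchema: Seq[Attribute]. It maps each input Row to a new Row by evaluating the expressions (bound against the schema, if one is given), thereby changing the row's columns.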
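Since the projection is created once per partition (reusableProjection above), a MutableProjection-style implementation can write every result into one shared output row, as its name suggests. The sketch below is a hypothetical model with rows as Array[Any] and expressions as functions. It shows the consequence of such reuse: buffering the returned rows without copying leaves every buffered entry pointing at the same, last-written object, which is why operators that hold on to rows (Sort, Aggregate, the joins) call copy() first.

```scala
// Hypothetical projection that reuses a single output buffer for every input row.
final class ReusingProjection(exprs: Seq[Array[Any] => Any]) extends (Array[Any] => Array[Any]) {
  private val out = new Array[Any](exprs.length)   // allocated once, overwritten per row
  def apply(in: Array[Any]): Array[Any] = {
    var i = 0
    while (i < exprs.length) { out(i) = exprs(i)(in); i += 1 }
    out                                            // the same object every time
  }
}

// Like Project.execute: one projection instance per partition.
def project(partition: Iterator[Array[Any]], exprs: Seq[Array[Any] => Any]): Iterator[Array[Any]] = {
  val proj = new ReusingProjection(exprs)
  partition.map(proj)
}
```

Streaming the rows straight to the next operator is safe; only buffering them requires a copy.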


Sample

case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: SparkPlan)  extends UnaryNode

def execute() = child.execute().sample(withReplacement, fraction, seed)

RDD's sample operation:

  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] = {
    require(fraction >= 0.0, "Invalid fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), seed)
    }
  }

The resulting PartitionwiseSampledRDD draws samples from every partition of the RDD.

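PoissonSampler and BernoulliSampler are two implementations of RandomSampler: as the code above shows, PoissonSampler is used for sampling with replacement and BernoulliSampler for sampling without replacement.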
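The two sampling schemes can be sketched as follows, assuming an RDD is modeled as a Seq of partitions. Bernoulli sampling keeps each element independently with probability fraction; Poisson sampling emits each element k times, with k drawn from a Poisson distribution whose mean is fraction (here via Knuth's method, adequate for small means). These are not Spark's sampler classes, and seeding each partition with seed + partition index is an assumption made to keep the sketch reproducible.

```scala
import scala.util.Random

// Knuth's method: count uniform draws until their product falls to exp(-mean) or below.
def poisson(mean: Double, rng: Random): Int = {
  val limit = math.exp(-mean)
  var k = 0
  var p = rng.nextDouble()
  while (p > limit) { k += 1; p *= rng.nextDouble() }
  k
}

// Without replacement: keep each item independently with probability `fraction`.
def bernoulliSample[T](items: Iterator[T], fraction: Double, rng: Random): Iterator[T] =
  items.filter(_ => rng.nextDouble() < fraction)

// With replacement: emit each item k times, k ~ Poisson(fraction).
def poissonSample[T](items: Iterator[T], fraction: Double, rng: Random): Iterator[T] =
  items.flatMap(item => Iterator.fill(poisson(fraction, rng))(item))

// Partition-wise sampling; the per-partition seed scheme is an assumption.
def sampleRdd[T](partitions: Seq[Seq[T]], withReplacement: Boolean, fraction: Double, seed: Int): Seq[List[T]] = {
  require(fraction >= 0.0, "Invalid fraction value: " + fraction)
  partitions.zipWithIndex.map { case (part, idx) =>
    val rng = new Random(seed + idx)
    val sampled =
      if (withReplacement) poissonSample(part.iterator, fraction, rng)
      else bernoulliSample(part.iterator, fraction, rng)
    sampled.toList
  }
}
```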


Sort

case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode

Sort places a requirement on its child's distribution:

override def requiredChildDistribution =
  if (global) OrderedDistribution(sortOrder) :: Nil
  else UnspecifiedDistribution :: Nil

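SortOrder is an implementation of UnaryExpression that specifies how tuples are ordered (ascending or descending). It merely declares a sort direction for its child expression; it extends Expression so that it can take part in, and affect, the expression subtree beneath it.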

case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression

  // RowOrdering extends Ordering[Row]
  @transient
  lazy val ordering = new RowOrdering(sortOrder)

  def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    child.execute()
      .mapPartitions(iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator,
        preservesPartitioning = true)
  }

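An implicit conversion is involved here: .sorted is a standard array method, made available through Scala's implicit conversion of arrays to ArrayOps, and it accepts ordering because RowOrdering extends Ordering[Row], that is, scala.math.Ordering[Row].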
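A RowOrdering-like comparator over several sort orders can be sketched like this. Rows are modeled as Vector[Any] and a sort column is identified by its index, both assumptions for brevity, and the classes are hypothetical stand-ins. Columns are compared in order; the first non-zero comparison, negated for Descending, decides. As in Sort.execute, each partition is sorted locally; a global sort additionally relies on the OrderedDistribution requirement having been met upstream.

```scala
trait SortDirection
case object Ascending extends SortDirection
case object Descending extends SortDirection
case class SortOrder(col: Int, direction: SortDirection)

// Hypothetical RowOrdering: lexicographic comparison over the given sort orders.
class RowOrdering(orders: Seq[SortOrder]) extends Ordering[Vector[Any]] {
  def compare(a: Vector[Any], b: Vector[Any]): Int = {
    val it = orders.iterator
    while (it.hasNext) {
      val order = it.next()
      // Assumption: sort columns hold non-null Comparable values (e.g. Int, String).
      val c = a(order.col).asInstanceOf[Comparable[Any]].compareTo(b(order.col))
      if (c != 0) return (if (order.direction == Ascending) c else -c)
    }
    0
  }
}

// Like Sort.execute: buffer each partition into an array and sort it locally.
def sortPartitions(parts: Seq[Seq[Vector[Any]]], orders: Seq[SortOrder]): Seq[List[Vector[Any]]] = {
  val ordering = new RowOrdering(orders)
  parts.map(p => p.toArray.sorted(ordering).toList)
}
```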


StopAfter

case class StopAfter(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode

StopAfter is essentially a LIMIT operation:

  override def executeCollect() = child.execute().map(_.copy()).take(limit)
  def execute() = sc.makeRDD(executeCollect(), 1) // a single partition (parallelism 1)

makeRDD delegates to parallelize, which creates a new ParallelCollectionRDD[T]. Here seq is the Array[T] returned by take(), and numSlices is 1:

/** Distribute a local Scala collection to form an RDD. */
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

TopK

case class TopK(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
(@transient sc: SparkContext) extends UnaryNode

TopK can be understood as a combination of Sort and StopAfter:

  @transient
  lazy val ordering = new RowOrdering(sortOrder)

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)
  def execute() = sc.makeRDD(executeCollect(), 1)

takeOrdered(num)(ordering) actually invokes RDD's top()() operation, with the ordering reversed:
 def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
    mapPartitions { items =>
      val queue = new BoundedPriorityQueue[T](num)
      queue ++= items
      Iterator.single(queue)
    }.reduce { (queue1, queue2) =>
      queue1 ++= queue2
      queue1
    }.toArray.sorted(ord.reverse)
  }

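BoundedPriorityQueue is a data structure in Spark's util package that wraps a PriorityQueue. Its optimization is that it bounds the size of the priority queue: when an element is added to a full queue, it is compared with the head of the heap, which it replaces only if it ranks higher. This makes it well suited to top-K scenarios.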

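As a result, before any sorting, each partition produces just one BPQ holding at most num elements (only the top num are needed in the end). The real sort happens only after the queues are merged, and the first num elements are returned.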
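The top-K scheme can be reproduced with Scala's mutable.PriorityQueue. BoundedQueue below is a hypothetical stand-in for Spark's BoundedPriorityQueue: it keeps a heap of at most maxSize elements whose head is the smallest kept element (per the given ordering) and replaces that head when a larger element arrives. top builds one queue per partition (modeled as a Seq of Seqs), merges them, and sorts only the survivors; takeOrdered simply reverses the ordering.

```scala
import scala.collection.mutable

class BoundedQueue[A](maxSize: Int)(implicit ord: Ordering[A]) {
  // Reversing the ordering turns Scala's max-heap into a min-heap on `ord`.
  private val heap = mutable.PriorityQueue.empty[A](ord.reverse)

  def +=(a: A): this.type = {
    if (heap.size < maxSize) heap.enqueue(a)
    else if (maxSize > 0 && ord.gt(a, heap.head)) { heap.dequeue(); heap.enqueue(a) }
    this
  }

  def ++=(xs: Iterator[A]): this.type = { xs.foreach(x => this += x); this }

  def toList: List[A] = heap.toList
}

// One bounded queue per partition, merge the queues, then sort the survivors (largest first).
def top[A](partitions: Seq[Seq[A]], num: Int)(implicit ord: Ordering[A]): List[A] =
  partitions
    .map { part => val q = new BoundedQueue[A](num); q ++= part.iterator; q }
    .reduce { (q1, q2) => q1 ++= q2.toList.iterator; q1 }
    .toList
    .sorted(ord.reverse)

// The num smallest elements in ascending order.
def takeOrdered[A](partitions: Seq[Seq[A]], num: Int)(implicit ord: Ordering[A]): List[A] =
  top(partitions, num)(ord.reverse)
```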


BinaryNode


BroadcastNestedLoopJoin

case class BroadcastNestedLoopJoin(
    streamed: SparkPlan, broadcast: SparkPlan, joinType: JoinType, condition: Option[Expression])
    (@transient sc: SparkContext)
  extends BinaryNode

This is a relatively complex join. It works as follows:
  def execute() = {
    // First execute the SparkPlan to be broadcast, collect its rows, and broadcast them
    val broadcastedRelation =
      sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)

    val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
      val matchedRows = new mutable.ArrayBuffer[Row]
      val includedBroadcastTuples =  
        new mutable.BitSet(broadcastedRelation.value.size)
      val joinedRow = new JoinedRow
      
      streamedIter.foreach { streamedRow =>
        var i = 0
        var matched = false

        while (i < broadcastedRelation.value.size) {
          // TODO: One bitset per partition instead of per row.
          val broadcastedRow = broadcastedRelation.value(i)
          if (boundCondition(joinedRow(streamedRow, broadcastedRow)).asInstanceOf[Boolean]) {
            matchedRows += buildRow(streamedRow ++ broadcastedRow)
            matched = true
            includedBroadcastTuples += i
          }
          i += 1
        }

        if (!matched && (joinType == LeftOuter || joinType == FullOuter)) {
          matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null))
        }
      }
      Iterator((matchedRows, includedBroadcastTuples))
    }

    val includedBroadcastTuples = streamedPlusMatches.map(_._2)
    val allIncludedBroadcastTuples =
      if (includedBroadcastTuples.count == 0) {
        new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
      } else {
        streamedPlusMatches.map(_._2).reduce(_ ++ _)
      }

    val rightOuterMatches: Seq[Row] =
      if (joinType == RightOuter || joinType == FullOuter) {
        broadcastedRelation.value.zipWithIndex.filter {
          case (row, i) => !allIncludedBroadcastTuples.contains(i)
        }.map {
          // TODO: Use projection.
          case (row, _) => buildRow(Vector.fill(left.output.size)(null) ++ row)
        }
      } else {
        Vector()
      }

    // TODO: Breaks lineage.
    sc.union(
      streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches))
  }
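The control flow of this join, including the bitset that tracks matched broadcast rows for right and full outer joins, can be modeled on plain collections. The sketch assumes rows are Vector[Any], the join type is a plain String, and the condition is a Scala function; it mirrors the structure above (per-partition matches plus a bitset, a union of the bitsets, then unmatched broadcast rows padded with nulls) but is not Spark's code.

```scala
import scala.collection.mutable

def nestedLoopJoin(
    streamed: Seq[Seq[Vector[Any]]],          // streamed side, split into partitions
    broadcast: IndexedSeq[Vector[Any]],       // small side, visible to every partition
    streamedWidth: Int,
    broadcastWidth: Int,
    cond: (Vector[Any], Vector[Any]) => Boolean,
    joinType: String): Seq[Vector[Any]] = {
  // Per partition: matched (or left-padded) rows plus the indexes of matched broadcast rows.
  val perPartition = streamed.map { part =>
    val out = mutable.ArrayBuffer.empty[Vector[Any]]
    val used = mutable.BitSet.empty
    for (s <- part) {
      var matched = false
      for (i <- broadcast.indices if cond(s, broadcast(i))) {
        out += s ++ broadcast(i); matched = true; used += i
      }
      if (!matched && (joinType == "LeftOuter" || joinType == "FullOuter"))
        out += s ++ Vector.fill(broadcastWidth)(null)
    }
    (out.toList, used)
  }
  // Union of all bitsets: broadcast rows matched in at least one partition.
  val allUsed = perPartition.map(_._2).foldLeft(mutable.BitSet.empty)(_ ++= _)
  val rightOuter: Seq[Vector[Any]] =
    if (joinType == "RightOuter" || joinType == "FullOuter")
      broadcast.indices.filterNot(i => allUsed.contains(i))
        .map(i => Vector.fill(streamedWidth)(null) ++ broadcast(i))
    else Nil
  perPartition.flatMap(_._1) ++ rightOuter
}
```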

CartesianProduct

case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode

It calls RDD's cartesian (Cartesian product) operation:

def execute() = 
  left.execute().map(_.copy()).cartesian(right.execute().map(_.copy())).map {
    case (l: Row, r: Row) => buildRow(l ++ r)
  }

SparkEquiInnerJoin

case class SparkEquiInnerJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode

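This join applies when the left and right sides have the same number of partitions and each side supplies its own join keys. Because the join is partition-local, rows with equal keys must already reside in partitions with the same index.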

The code is largely self-explanatory. The local join relies on methods in PartitionLocalRDDFunctions.

  def execute() = attachTree(this, "execute") {
    val leftWithKeys = left.execute().mapPartitions { iter =>
      val generateLeftKeys = new Projection(leftKeys, left.output) // the input schema is passed in
      iter.map(row => (generateLeftKeys(row), row.copy()))
    }

    val rightWithKeys = right.execute().mapPartitions { iter =>
      val generateRightKeys = new Projection(rightKeys, right.output)
      iter.map(row => (generateRightKeys(row), row.copy()))
    }

    // Do the join.
    // joinLocally is a method of PartitionLocalRDDFunctions
    val joined = filterNulls(leftWithKeys).joinLocally(filterNulls(rightWithKeys))
    // Drop join keys and merge input tuples.
    joined.map { case (_, (leftTuple, rightTuple)) => buildRow(leftTuple ++ rightTuple) }
  }

  /**
   * Filters any rows where any of the join keys is null, ensuring three-valued
   * logic for the equi-join conditions.
   */
  protected def filterNulls(rdd: RDD[(Row, Row)]) =
    rdd.filter {
      case (key: Seq[_], _) => !key.exists(_ == null)
    }

The PartitionLocalRDDFunctions method is shown below. It does not introduce a shuffle, so the two RDDs must have the same number of partitions.

  def joinLocally[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
    cogroupLocally(other).flatMapValues {
      case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
    }
  }
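The behavior of joinLocally, together with filterNulls, can be modeled by treating an RDD as a Seq of partitions. In this hypothetical sketch, partition i of the left side is joined only with partition i of the right side; within a partition pair the values are grouped per key (the cogroup step) and every value combination is emitted (the flatMapValues step). The last test shows why filterNulls matters: without it, null keys would match each other, which SQL equality does not allow.

```scala
// Partition-local join: no data moves between partitions.
def joinLocally[K, V, W](left: Seq[Seq[(K, V)]], right: Seq[Seq[(K, W)]]): Seq[Seq[(K, (V, W))]] = {
  require(left.size == right.size, "both sides must have the same number of partitions")
  left.zip(right).map { case (lp, rp) =>
    // cogroup step: gather all values per key on each side of this partition pair
    val ls = lp.groupBy(_._1)
    val rs = rp.groupBy(_._1)
    // flatMapValues step: every (v, w) combination for keys present on both sides
    for {
      (k, vs) <- ls.toSeq
      ws      <- rs.get(k).toSeq
      (_, v)  <- vs
      (_, w)  <- ws
    } yield (k, (v, w))
  }
}

// Like filterNulls: drop rows whose key contains a null before joining.
def filterNulls[V](rows: Seq[(Seq[Any], V)]): Seq[(Seq[Any], V)] =
  rows.filter { case (key, _) => !key.contains(null) }
```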

Other

Union

Union extends SparkPlan directly:

case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan

It executes each child SparkPlan and combines the resulting RDDs into a single UnionRDD:

 def execute() = sc.union(children.map(_.execute()))




The end :)


Last updated: 2017-04-03 12:55:58
