閱讀343 返回首頁    go 微軟 go windows


Spark SQL Columnar模塊源碼分析

概述

本文介紹Spark SQL增加的Columnar模塊代碼實現。
首先介紹Columnar內的代碼結構和實現,然後介紹在SqlContext裏的使用方式。

Columnar

InMemoryColumnarTableScan

實現

InMemoryColumnarTableScan類是SparkPlan LeafNode的實現,即是一個物理執行計劃。

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
  extends LeafNode {

傳入的child是一個SparkPlan(確認了的物理執行計劃)和一個屬性序列。

 

行轉列並cache的過程如下:

  lazy val cachedColumnBuffers = {
    val output = child.output
    // 遍曆每個RDD的partiti	on
    val cached = child.execute().mapPartitions { iterator =>
      // 把屬性Seq轉換成為ColumnBuilder數組
      val columnBuilders = output.map { attribute =>
        // 都是基本ColumnBuilder,默認ByteBuffer大小
        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
      }.toArray

      var row: Row = null
      // RDD每個Partition的Rows,每個Row的所有field信息存到ColumnBuilder裏
      while (iterator.hasNext) {
        row = iterator.next()
        var i = 0
        while (i < row.length) {
          columnBuilders(i).appendFrom(row, i)
          i += 1
        }
      }

      Iterator.single(columnBuilders.map(_.build()))
    }.cache()

    cached.setName(child.toString)
    // Force the materialization of the cached RDD.
    cached.count()
    cached
  }

ColumnType類用於表示Column的類型,他的typeId變量用來區分數據類型,生成對應的ColumnBuilder(typeId, initialSize=0, columnName)。ColumnBuilder的生成如下:

  def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
    val builder = (typeId match {
      case INT.typeId     => new IntColumnBuilder
      case LONG.typeId    => new LongColumnBuilder
      case FLOAT.typeId   => new FloatColumnBuilder
      case DOUBLE.typeId  => new DoubleColumnBuilder
      case BOOLEAN.typeId => new BooleanColumnBuilder
      case BYTE.typeId    => new ByteColumnBuilder
      case SHORT.typeId   => new ShortColumnBuilder
      case STRING.typeId  => new StringColumnBuilder
      case BINARY.typeId  => new BinaryColumnBuilder
      case GENERIC.typeId => new GenericColumnBuilder
    }).asInstanceOf[ColumnBuilder]

    builder.initialize(initialSize, columnName)
    builder
  }

他的繼承結構如下,主要有三大體係:



這裏涉及到的是Basic這個體係,繼承結構如下:

BasicColumnBuilder裏,initialSize = 0,指使用ByteBuffer的默認大小,即10*1024*104。然後在initialize()方法,會初始化ByteBuffer。

 

接下來,針對RDD每個partition,

      var row: Row = null
      while (iterator.hasNext) {
        row = iterator.next()
        var i = 0
        while (i < row.length) {
          columnBuilders(i).appendFrom(row, i)
          i += 1
        }
      }

進行了appendFrom操作:

  override def appendFrom(row: Row, ordinal: Int) {
    val field = columnType.getField(row, ordinal)
    buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
    columnType.append(field, buffer)
  }

用於把一個Row的每一個field,都存到一個ColumnBuilder裏。在這裏指BasicColumnBuilder這個類,維護了一個自己的ByteBuffer,把row裏的各個field信息都存在了buffer裏。


最後ColumnBuilders裏的每個ColumnBuilder進行build(),即BasicColumnBuilder.build()方法,進行了一次ByteBuffer的rewind()方法。

 

這個方法的結果是一個RDD集合。由於在結束前調用了.count()方法,所以RDD的計算是被執行了的,返回的是新的RDD。

在Spark SQL裏,外部調用cachedColumnBuffers方法隻有在uncache table的時候,進行了unpersisit()操作。


下麵看execute()方法:

  override def execute() = {
cachedColumnBuffers.mapPartitions { iterator =>
  // 在RDD partition裏,iterator.next()返回的是一個ByteBuffer
  // 也就是說,cachedColumnBuffers返回的結果RDD,類型是ByteBuffer
      val columnBuffers = iterator.next()
      assert(!iterator.hasNext)

      new Iterator[Row] {
	    // 訪問每一個ByteBuffer裏的列信息
        val columnAccessors = columnBuffers.map(ColumnAccessor(_))
        val nextRow = new GenericMutableRow(columnAccessors.length)

        override def next() = {
          var i = 0
          // 把column裏的信息再轉到Row裏
          while (i < nextRow.length) {
            columnAccessors(i).extractTo(nextRow, i)
            i += 1
          }
          nextRow
        }

        override def hasNext = columnAccessors.head.hasNext
      }
    }
  }

使用

在SqlContext裏選擇cache table的時候,會使用該類。

其實在cache的時候,首先去catalog裏尋找這個table的信息和table的執行計劃,然後會進行執行(執行到物理執行計劃生成),然後把這個table再放回catalog裏維護起來,這個時候的執行計劃已經是最終要執行的物理執行計劃了。但是此時Columner模塊相關的轉換等操作都是沒有觸發的。

真正的觸發還是在execute()的時候,同其他SparkPlan的execute()方法觸發場景是一樣的。


ColumnBuilder 與 ColumnAccessor

一個包裝Row的每個field成Column;一個訪問column,然後可以轉回Row


關於壓縮

private[sql] abstract class NativeColumnBuilder[T <: NativeType](
    override val columnStats: NativeColumnStats[T],
    override val columnType: NativeColumnType[T])
  extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
  with NullableColumnBuilder
  with AllCompressionSchemes
  with CompressibleColumnBuilder[T]

private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)

private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)


從繼承結構看,壓縮的builder和Accessor都以trait的方式繼承了ColumnBuilder,而子類比如IntColumnBuilder,不但繼承了BaseColumnBuilder,同時也具備壓縮處理能力。

 

具體壓縮處理可以參考CompressibleColumnBuilder類裏的實現。

是否壓縮會做一次判斷,壓縮比在0.8以下才執行壓縮。

在build()的時候實施壓縮,並且按照以下結構存在bytebuffer內。

 *    .--------------------------- Column type ID (4 bytes)
 *    |   .----------------------- Null count N (4 bytes)
 *    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
 *    |   |   |     .------------- Compression scheme ID (4 bytes)
 *    |   |   |     |   .--------- Compressed non-null elements
 *    V   V   V     V   V
 *    +---+---+-----+---+---------+
 *    |   |   | ... |   | ... ... |
 *    +---+---+-----+---+---------+
 *    \-----------/ \-----------/
 *        header         body


CompressionScheme子類是不同的壓縮實現

都是scala實現的,未借助第三方庫。不同的實現,指定了支持的column data類型。在build()的時候,會比較每種壓縮,選擇壓縮率最小的(若仍大於0.8就不壓縮了)。

這裏的估算能力,在子類實現裏,好像是由gatherCompressibilityStats方法實現的。


SqlContext

分析SqlContext內目前cache和uncache table的實現細節與Columnar的關係。


Cache Table

  /** Caches the specified table in-memory. */
  def cacheTable(tableName: String): Unit = {
    // 得到的是一個logicalPlan
    val currentTable = catalog.lookupRelation(None, tableName)

    // 物理執行計劃生成之後交給InMemoryColumnarTableScan
    val asInMemoryRelation =
      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
    
    // SparkLogicalPlan接受的Plan必須是已經確定plan好的SparkPlan
    catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
  }

從上麵那段代碼可以看到,cache之前,需要先把本次cache的table的物理執行計劃生成出來。上述的currentTable其實是一個logicalPlan,來自catalog的lookupRelation。

最後注冊表的時候,涉及到的SparkLogicalPlan類是LogicalPlan的實現類(但是本身其實是一個SparkPlan),它接受的是SparkPlan,並且是已經確定Plan好了的邏輯執行計劃,目前接受兩類:ExistingRdd和InMemoryColumnarTableScan。

 

在cache這個過程裏,InMemoryColumnarTableScan並沒有執行,但是生成了以InMemoryColumnarTableScan為物理執行計劃的SparkLogicalPlan,並存成table的plan。


Uncache Table

在這一步,除了刪除catalog裏的table信息之外,還調用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,並進行了unpersist()操作。cacheColumnBuffers方法具體見Columner內,主要做了把RDD每個partition裏的ROW的每個Field存到了ColumnBuilder內。




全文完 :)

最後更新:2017-04-03 12:56:11

  上一篇:go Sql查詢原理與Select執行順序(詳細)
  下一篇:go 【JAVA大數訓練】Exponentiation