Spark SQL Columnar模塊源碼分析
概述
Columnar
InMemoryColumnarTableScan
實現
InMemoryColumnarTableScan類是SparkPlan LeafNode的實現,即是一個物理執行計劃。
private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan) extends LeafNode {
傳入的child是一個SparkPlan(確認了的物理執行計劃)和一個屬性序列。
行轉列並cache的過程如下:
lazy val cachedColumnBuffers = { val output = child.output // 遍曆每個RDD的partiti on val cached = child.execute().mapPartitions { iterator => // 把屬性Seq轉換成為ColumnBuilder數組 val columnBuilders = output.map { attribute => // 都是基本ColumnBuilder,默認ByteBuffer大小 ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name) }.toArray var row: Row = null // RDD每個Partition的Rows,每個Row的所有field信息存到ColumnBuilder裏 while (iterator.hasNext) { row = iterator.next() var i = 0 while (i < row.length) { columnBuilders(i).appendFrom(row, i) i += 1 } } Iterator.single(columnBuilders.map(_.build())) }.cache() cached.setName(child.toString) // Force the materialization of the cached RDD. cached.count() cached }
ColumnType類用於表示Column的類型,他的typeId變量用來區分數據類型,生成對應的ColumnBuilder(typeId, initialSize=0, columnName)。ColumnBuilder的生成如下:
def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = { val builder = (typeId match { case INT.typeId => new IntColumnBuilder case LONG.typeId => new LongColumnBuilder case FLOAT.typeId => new FloatColumnBuilder case DOUBLE.typeId => new DoubleColumnBuilder case BOOLEAN.typeId => new BooleanColumnBuilder case BYTE.typeId => new ByteColumnBuilder case SHORT.typeId => new ShortColumnBuilder case STRING.typeId => new StringColumnBuilder case BINARY.typeId => new BinaryColumnBuilder case GENERIC.typeId => new GenericColumnBuilder }).asInstanceOf[ColumnBuilder] builder.initialize(initialSize, columnName) builder }
他的繼承結構如下,主要有三大體係:

這裏涉及到的是Basic這個體係,繼承結構如下:

BasicColumnBuilder裏,initialSize = 0,指使用ByteBuffer的默認大小,即10*1024*104。然後在initialize()方法,會初始化ByteBuffer。
接下來,針對RDD每個partition,
var row: Row = null while (iterator.hasNext) { row = iterator.next() var i = 0 while (i < row.length) { columnBuilders(i).appendFrom(row, i) i += 1 } }
進行了appendFrom操作:
override def appendFrom(row: Row, ordinal: Int) { val field = columnType.getField(row, ordinal) buffer = ensureFreeSpace(buffer, columnType.actualSize(field)) columnType.append(field, buffer) }
用於把一個Row的每一個field,都存到一個ColumnBuilder裏。在這裏指BasicColumnBuilder這個類,維護了一個自己的ByteBuffer,把row裏的各個field信息都存在了buffer裏。
最後ColumnBuilders裏的每個ColumnBuilder進行build(),即BasicColumnBuilder.build()方法,進行了一次ByteBuffer的rewind()方法。
這個方法的結果是一個RDD集合。由於在結束前調用了.count()方法,所以RDD的計算是被執行了的,返回的是新的RDD。
在Spark SQL裏,外部調用cachedColumnBuffers方法隻有在uncache table的時候,進行了unpersisit()操作。
下麵看execute()方法:
override def execute() = { cachedColumnBuffers.mapPartitions { iterator => // 在RDD partition裏,iterator.next()返回的是一個ByteBuffer // 也就是說,cachedColumnBuffers返回的結果RDD,類型是ByteBuffer val columnBuffers = iterator.next() assert(!iterator.hasNext) new Iterator[Row] { // 訪問每一個ByteBuffer裏的列信息 val columnAccessors = columnBuffers.map(ColumnAccessor(_)) val nextRow = new GenericMutableRow(columnAccessors.length) override def next() = { var i = 0 // 把column裏的信息再轉到Row裏 while (i < nextRow.length) { columnAccessors(i).extractTo(nextRow, i) i += 1 } nextRow } override def hasNext = columnAccessors.head.hasNext } } }
使用
在SqlContext裏選擇cache table的時候,會使用該類。
其實在cache的時候,首先去catalog裏尋找這個table的信息和table的執行計劃,然後會進行執行(執行到物理執行計劃生成),然後把這個table再放回catalog裏維護起來,這個時候的執行計劃已經是最終要執行的物理執行計劃了。但是此時Columner模塊相關的轉換等操作都是沒有觸發的。
真正的觸發還是在execute()的時候,同其他SparkPlan的execute()方法觸發場景是一樣的。
ColumnBuilder 與 ColumnAccessor
一個包裝Row的每個field成Column;一個訪問column,然後可以轉回Row
關於壓縮
private[sql] abstract class NativeColumnBuilder[T <: NativeType]( override val columnStats: NativeColumnStats[T], override val columnType: NativeColumnType[T]) extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType) with NullableColumnBuilder with AllCompressionSchemes with CompressibleColumnBuilder[T] private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN) private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)

從繼承結構看,壓縮的builder和Accessor都以trait的方式繼承了ColumnBuilder,而子類比如IntColumnBuilder,不但繼承了BaseColumnBuilder,同時也具備壓縮處理能力。
具體壓縮處理可以參考CompressibleColumnBuilder類裏的實現。
是否壓縮會做一次判斷,壓縮比在0.8以下才執行壓縮。
在build()的時候實施壓縮,並且按照以下結構存在bytebuffer內。
* .--------------------------- Column type ID (4 bytes) * | .----------------------- Null count N (4 bytes) * | | .------------------- Null positions (4 x N bytes, empty if null count is zero) * | | | .------------- Compression scheme ID (4 bytes) * | | | | .--------- Compressed non-null elements * V V V V V * +---+---+-----+---+---------+ * | | | ... | | ... ... | * +---+---+-----+---+---------+ * \-----------/ \-----------/ * header body
CompressionScheme子類是不同的壓縮實現

都是scala實現的,未借助第三方庫。不同的實現,指定了支持的column data類型。在build()的時候,會比較每種壓縮,選擇壓縮率最小的(若仍大於0.8就不壓縮了)。
這裏的估算能力,在子類實現裏,好像是由gatherCompressibilityStats方法實現的。
SqlContext
分析SqlContext內目前cache和uncache table的實現細節與Columnar的關係。
Cache Table
/** Caches the specified table in-memory. */ def cacheTable(tableName: String): Unit = { // 得到的是一個logicalPlan val currentTable = catalog.lookupRelation(None, tableName) // 物理執行計劃生成之後交給InMemoryColumnarTableScan val asInMemoryRelation = InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan) // SparkLogicalPlan接受的Plan必須是已經確定plan好的SparkPlan catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation)) }
從上麵那段代碼可以看到,cache之前,需要先把本次cache的table的物理執行計劃生成出來。上述的currentTable其實是一個logicalPlan,來自catalog的lookupRelation。
最後注冊表的時候,涉及到的SparkLogicalPlan類是LogicalPlan的實現類(但是本身其實是一個SparkPlan),它接受的是SparkPlan,並且是已經確定Plan好了的邏輯執行計劃,目前接受兩類:ExistingRdd和InMemoryColumnarTableScan。
在cache這個過程裏,InMemoryColumnarTableScan並沒有執行,但是生成了以InMemoryColumnarTableScan為物理執行計劃的SparkLogicalPlan,並存成table的plan。
Uncache Table
在這一步,除了刪除catalog裏的table信息之外,還調用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,並進行了unpersist()操作。cacheColumnBuffers方法具體見Columner內,主要做了把RDD每個partition裏的ROW的每個Field存到了ColumnBuilder內。
最後更新:2017-04-03 12:56:11