Spark SQL Columnar模塊源碼分析
概述
Columnar
InMemoryColumnarTableScan
實現
InMemoryColumnarTableScan類是SparkPlan LeafNode的實現,即是一個物理執行計劃。

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
extends LeafNode {
傳入的child是一個SparkPlan(確認了的物理執行計劃)和一個屬性序列。
行轉列並cache的過程如下:
lazy val cachedColumnBuffers = {
val output = child.output
// 遍曆每個RDD的partiti on
val cached = child.execute().mapPartitions { iterator =>
// 把屬性Seq轉換成為ColumnBuilder數組
val columnBuilders = output.map { attribute =>
// 都是基本ColumnBuilder,默認ByteBuffer大小
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
}.toArray
var row: Row = null
// RDD每個Partition的Rows,每個Row的所有field信息存到ColumnBuilder裏
while (iterator.hasNext) {
row = iterator.next()
var i = 0
while (i < row.length) {
columnBuilders(i).appendFrom(row, i)
i += 1
}
}
Iterator.single(columnBuilders.map(_.build()))
}.cache()
cached.setName(child.toString)
// Force the materialization of the cached RDD.
cached.count()
cached
}
ColumnType類用於表示Column的類型,他的typeId變量用來區分數據類型,生成對應的ColumnBuilder(typeId, initialSize=0, columnName)。ColumnBuilder的生成如下:
def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
val builder = (typeId match {
case INT.typeId => new IntColumnBuilder
case LONG.typeId => new LongColumnBuilder
case FLOAT.typeId => new FloatColumnBuilder
case DOUBLE.typeId => new DoubleColumnBuilder
case BOOLEAN.typeId => new BooleanColumnBuilder
case BYTE.typeId => new ByteColumnBuilder
case SHORT.typeId => new ShortColumnBuilder
case STRING.typeId => new StringColumnBuilder
case BINARY.typeId => new BinaryColumnBuilder
case GENERIC.typeId => new GenericColumnBuilder
}).asInstanceOf[ColumnBuilder]
builder.initialize(initialSize, columnName)
builder
}
他的繼承結構如下,主要有三大體係:

這裏涉及到的是Basic這個體係,繼承結構如下:

BasicColumnBuilder裏,initialSize = 0,指使用ByteBuffer的默認大小,即10*1024*104。然後在initialize()方法,會初始化ByteBuffer。
接下來,針對RDD每個partition,
var row: Row = null
while (iterator.hasNext) {
row = iterator.next()
var i = 0
while (i < row.length) {
columnBuilders(i).appendFrom(row, i)
i += 1
}
}
進行了appendFrom操作:
override def appendFrom(row: Row, ordinal: Int) {
val field = columnType.getField(row, ordinal)
buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
columnType.append(field, buffer)
}
用於把一個Row的每一個field,都存到一個ColumnBuilder裏。在這裏指BasicColumnBuilder這個類,維護了一個自己的ByteBuffer,把row裏的各個field信息都存在了buffer裏。
最後ColumnBuilders裏的每個ColumnBuilder進行build(),即BasicColumnBuilder.build()方法,進行了一次ByteBuffer的rewind()方法。
這個方法的結果是一個RDD集合。由於在結束前調用了.count()方法,所以RDD的計算是被執行了的,返回的是新的RDD。
在Spark SQL裏,外部調用cachedColumnBuffers方法隻有在uncache table的時候,進行了unpersisit()操作。
下麵看execute()方法:
override def execute() = {
cachedColumnBuffers.mapPartitions { iterator =>
// 在RDD partition裏,iterator.next()返回的是一個ByteBuffer
// 也就是說,cachedColumnBuffers返回的結果RDD,類型是ByteBuffer
val columnBuffers = iterator.next()
assert(!iterator.hasNext)
new Iterator[Row] {
// 訪問每一個ByteBuffer裏的列信息
val columnAccessors = columnBuffers.map(ColumnAccessor(_))
val nextRow = new GenericMutableRow(columnAccessors.length)
override def next() = {
var i = 0
// 把column裏的信息再轉到Row裏
while (i < nextRow.length) {
columnAccessors(i).extractTo(nextRow, i)
i += 1
}
nextRow
}
override def hasNext = columnAccessors.head.hasNext
}
}
}
使用
在SqlContext裏選擇cache table的時候,會使用該類。
其實在cache的時候,首先去catalog裏尋找這個table的信息和table的執行計劃,然後會進行執行(執行到物理執行計劃生成),然後把這個table再放回catalog裏維護起來,這個時候的執行計劃已經是最終要執行的物理執行計劃了。但是此時Columner模塊相關的轉換等操作都是沒有觸發的。
真正的觸發還是在execute()的時候,同其他SparkPlan的execute()方法觸發場景是一樣的。
ColumnBuilder 與 ColumnAccessor
一個包裝Row的每個field成Column;一個訪問column,然後可以轉回Row
關於壓縮
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
override val columnStats: NativeColumnStats[T],
override val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
with NullableColumnBuilder
with AllCompressionSchemes
with CompressibleColumnBuilder[T]
private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)

從繼承結構看,壓縮的builder和Accessor都以trait的方式繼承了ColumnBuilder,而子類比如IntColumnBuilder,不但繼承了BaseColumnBuilder,同時也具備壓縮處理能力。
具體壓縮處理可以參考CompressibleColumnBuilder類裏的實現。
是否壓縮會做一次判斷,壓縮比在0.8以下才執行壓縮。
在build()的時候實施壓縮,並且按照以下結構存在bytebuffer內。
* .--------------------------- Column type ID (4 bytes) * | .----------------------- Null count N (4 bytes) * | | .------------------- Null positions (4 x N bytes, empty if null count is zero) * | | | .------------- Compression scheme ID (4 bytes) * | | | | .--------- Compressed non-null elements * V V V V V * +---+---+-----+---+---------+ * | | | ... | | ... ... | * +---+---+-----+---+---------+ * \-----------/ \-----------/ * header body
CompressionScheme子類是不同的壓縮實現

都是scala實現的,未借助第三方庫。不同的實現,指定了支持的column data類型。在build()的時候,會比較每種壓縮,選擇壓縮率最小的(若仍大於0.8就不壓縮了)。
這裏的估算能力,在子類實現裏,好像是由gatherCompressibilityStats方法實現的。
SqlContext
分析SqlContext內目前cache和uncache table的實現細節與Columnar的關係。
Cache Table
/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
// 得到的是一個logicalPlan
val currentTable = catalog.lookupRelation(None, tableName)
// 物理執行計劃生成之後交給InMemoryColumnarTableScan
val asInMemoryRelation =
InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
// SparkLogicalPlan接受的Plan必須是已經確定plan好的SparkPlan
catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
}
從上麵那段代碼可以看到,cache之前,需要先把本次cache的table的物理執行計劃生成出來。上述的currentTable其實是一個logicalPlan,來自catalog的lookupRelation。
最後注冊表的時候,涉及到的SparkLogicalPlan類是LogicalPlan的實現類(但是本身其實是一個SparkPlan),它接受的是SparkPlan,並且是已經確定Plan好了的邏輯執行計劃,目前接受兩類:ExistingRdd和InMemoryColumnarTableScan。
在cache這個過程裏,InMemoryColumnarTableScan並沒有執行,但是生成了以InMemoryColumnarTableScan為物理執行計劃的SparkLogicalPlan,並存成table的plan。
Uncache Table
在這一步,除了刪除catalog裏的table信息之外,還調用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,並進行了unpersist()操作。cacheColumnBuffers方法具體見Columner內,主要做了把RDD每個partition裏的ROW的每個Field存到了ColumnBuilder內。
最後更新:2017-04-03 12:56:11