整理對Spark SQL的理解
Catalyst
Catalyst是與Spark解耦的一個獨立庫,是一個impl-free的執行計劃的生成和優化框架。
目前與Spark Core還是耦合的,對此user郵件組裏有人對此提出疑問,見mail。
以下是Catalyst較早時候的架構圖,展示的是代碼結構和處理流程。

Catalyst定位
其他係統如果想基於Spark做一些類sql、標準sql甚至其他查詢語言的查詢,需要基於Catalyst提供的解析器、執行計劃樹結構、邏輯執行計劃的處理規則體係等類體係來實現執行計劃的解析、生成、優化、映射工作。
對應上圖中,主要是左側的TreeNodelib及中間三次轉化過程中涉及到的類結構都是Catalyst提供的。至於右側物理執行計劃映射生成過程,物理執行計劃基於成本的優化模型,具體物理算子的執行都由係統自己實現。
Catalyst現狀
在解析器方麵提供的是一個簡單的scala寫的sql parser,支持語義有限,而且應該是標準sql的。
在規則方麵,提供的優化規則是比較基礎的(和Pig/Hive比沒有那麼豐富),不過一些優化規則其實是要涉及到具體物理算子的,所以部分規則需要在係統方那自己製定和實現(如spark-sql裏的SparkStrategy)。
Catalyst也有自己的一套數據類型。
下麵介紹Catalyst裏幾套重要的類結構。
TreeNode體係
TreeNode是Catalyst執行計劃表示的數據結構,是一個樹結構,具備一些scala collection的操作能力和樹遍曆能力。這棵樹一直在內存裏維護,不會dump到磁盤以某種格式的文件存在,且無論在映射邏輯執行計劃階段還是優化邏輯執行計劃階段,樹的修改是以替換已有節點的方式進行的。
TreeNode,內部帶一個children: Seq[BaseType]表示孩子節點,具備foreach、map、collect等針對節點操作的方法,以及transformDown(默認,前序遍曆)、transformUp這樣的遍曆樹上節點,對匹配節點實施變化的方法。
提供UnaryNode,BinaryNode, LeafNode三種trait,即非葉子節點允許有一個或兩個子節點。
TreeNode提供的是範型。
TreeNode有兩個子類繼承體係,QueryPlan和Expression。QueryPlan下麵是邏輯和物理執行計劃兩個體係,前者在Catalyst裏有詳細實現,後者需要在係統自己實現。Expression是表達式體係,後麵章節都會展開介紹。

Tree的transformation實現:
傳入PartialFunction[TreeType,TreeType],如果與操作符匹配,則節點會被結果替換掉,否則節點不會變動。整個過程是對children遞歸執行的。
執行計劃表示模型
邏輯執行計劃
QueryPlan繼承自TreeNode,內部帶一個output: Seq[Attribute],具備transformExpressionDown、transformExpressionUp方法。
在Catalyst中,QueryPlan的主要子類體係是LogicalPlan,即邏輯執行計劃表示。其物理執行計劃表示由使用方實現(spark-sql項目中)。
LogicalPlan繼承自QueryPlan,內部帶一個reference:Set[Attribute],主要方法為resolve(name:String): Option[NamedeExpression],用於分析生成對應的NamedExpression。
LogicalPlan有許多具體子類,也分為UnaryNode, BinaryNode, LeafNode三類,具體在org.apache.spark.sql.catalyst.plans.logical路徑下。

邏輯執行計劃實現
LeafNode主要子類是Command體係:
各command的語義可以從子類名字看出,代表的是係統可以執行的non-query命令,如DDL。
UnaryNode的子類:

BinaryNode的子類:

物理執行計劃
另一方麵,物理執行計劃節點在具體係統裏實現,比如spark-sql工程裏的SparkPlan繼承體係。

物理執行計劃實現
每個子類都要實現execute()方法,大致有以下實現子類(不全)。
LeadNode的子類:

UnaryNode的子類:

BinaryNode的子類:
提到物理執行計劃,還要提一下Catalyst提供的分區表示模型。
執行計劃映射
Catalyst還提供了一個QueryPlanner[Physical <: TreeNode[PhysicalPlan]]抽象類,需要子類製定一批strategies: Seq[Strategy],其apply方法也是類似根據製定的具體策略來把邏輯執行計劃算子映射成物理執行計劃算子。由於物理執行計劃的節點是在具體係統裏實現的,所以QueryPlanner及裏麵的strategies也需要在具體係統裏實現。

在spark-sql項目中,SparkStrategies繼承了QueryPlanner[SparkPlan],內部製定了LeftSemiJoin, HashJoin,PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct等幾種策略,每種策略接受的都是一個LogicalPlan,生成的是Seq[SparkPlan],每個SparkPlan理解為具體RDD的算子操作。
比如在BasicOperators這個Strategy裏,以match-case匹配的方式處理了很多基本算子(可以一對一直接映射成RDD算子),如下:
case logical.Project(projectList, child) => execution.Project(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => execution.Filter(condition, planLater(child)) :: Nil case logical.Aggregate(group, agg, child) => execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil case logical.Sample(fraction, withReplacement, seed, child) => execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
Expression體係
Expression,即表達式,指不需要執行引擎計算,而可以直接計算或處理的節點,包括Cast操作,Projection操作,四則運算,邏輯操作符運算等。
具體可以參考org.apache.spark.sql.expressionspackage下的類。
Rules體係
凡是需要處理執行計劃樹(Analyze過程,Optimize過程,SparkStrategy過程),實施規則匹配和節點處理的,都需要繼承RuleExecutor[TreeType]抽象類。
RuleExecutor內部提供了一個Seq[Batch],裏麵定義的是該RuleExecutor的處理步驟。每個Batch代表著一套規則,配備一個策略,該策略說明了迭代次數(一次還是多次)。
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
Rule[TreeType <: TreeNode[_]]是一個抽象類,子類需要複寫apply(plan: TreeType)方法來製定處理邏輯。
RuleExecutor的apply(plan: TreeType): TreeType方法會按照batches順序和batch內的Rules順序,對傳入的plan裏的節點迭代處理,處理邏輯為由具體Rule子類實現。
Hive相關
Hive支持方式
Spark SQL對hive的支持是單獨的spark-hive項目,對Hive的支持包括HQL查詢、hive metaStore信息、hive SerDes、hive UDFs/UDAFs/ UDTFs,類似Shark。
隻有在HiveContext下通過hive api獲得的數據集,才可以使用hql進行查詢,其hql的解析依賴的是org.apache.hadoop.hive.ql.parse.ParseDriver類的parse方法,生成Hive AST。
實際上sql和hql,並不是一起支持的。可以理解為hql是獨立支持的,能被hql查詢的數據集必須讀取自hive api。下圖中的parquet、json等其他文件支持隻發生在sql環境下(SQLContext)。

Hive on Spark
Spark SQL裏現在對Hive的支持,體現在複用了Hive的meta store數據、hql解析、UDFs、SerDes,在執行DDL和某些簡單命令的時候,調的是hive客戶端。hql翻譯前會處理一些與query主體無關的set, cache, addfile等命令,然後調用ParserDriver翻譯hql,並把AST轉換成Catalyst的LogicalPlan,後續優化、物理執行計劃翻譯及執行過程,與Sql一樣使用的是Catalyst提供的內容,執行引擎是Spark。在整個結合過程中,ASTNode映射成LogicalPlan是重點。
而Hive社區的Hive on Spark會怎樣實現,具體參考jira裏的設計文檔。
與Shark對比,
Shark多依賴了Hive的執行計劃相關模塊以及CLI。CLI和JDBC部分是Spark SQL後續打算支持的。Shark額外提供的對Table數據行轉列、序列化、壓縮存內存的模塊,也被拿到了Spark Sql的sql工程裏。
以上說明了Shark與Spark SQL Hive的區別,對Shark這個項目繼承性的理解。
而Spark SQL Hive與Hive社區 Hive on Spark的區別需要具體參考jira裏的設計文檔,我也還沒有讀過。
spark-hive工程
解析過程
HiveQl.parseSql()把hql解析成logicalPlan。解析過程,提取出一些command,包括:
² set key=value
² cache table
² uncache table
² add jar
² add file
² dfs
² source
然後由Hive的ParseDriver把hql解析成AST,得到ASTNode,
def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))
把Node轉化為Catalyst的LogicalPlan,轉化邏輯較複雜,也是Sparksql對hql支持的最關鍵部分。詳見HiveQl.nodeToPlan(node: Node):LogicalPlan方法。
大致轉換邏輯包括:
處理TOK_EXPLAIN和TOK_DESCTABLE
處理TOK_CREATETABLE,包括創建表時候一係列表的設置TOK_XXX
處理TOK_QUERY,包括TOK_SELECT,TOK_WHERE,TOK_GROUPBY,TOK_HAVING,TOK_SORTEDBY,TOK_LIMIT等等,對FROM後麵跟的語句進行nodeToRelation處理。
處理TOK_UNION
對Hive AST樹結構和表示不熟悉,所以此處略過。Analyze過程
metadata交互
Catalog類為HiveMetastoreCatalog,通過hive的conf生成client(org.apache.hadoop.hive.ql.metadata.Hive,用於與MetaStore通信,獲得metadata以及進行DDL操作),catalog的lookupRelation方法裏麵,client.getTable()得到表信息,client.getAddPartitionsForPruner()得到分區信息。
udf相關
FunctionRegistry類為HiveFunctionRegistry,根據方法名,通過hive的相關類去查詢該方法,檢查是否具有該方法,是UDF,還是UDAF(aggregation),或是UDTF(table)。這裏隻做已有udf的查詢,不做新方法的include。
與Catalyst的Expression對應繼承關係如下:

Inspector相關
HiveInspectors提供了幾個映射數據類型和ObjectInspetor子類的方法,包括PrimitiveObjectInspector,ListObjectInspector,MapObjectInspector,StructObjectInspector四種
Optimizer過程
在做優化前,會嚐試對之前生成的邏輯執行計劃進行createtabl操作,因為執行的hql可能是“CREATE TABLE XXX”,這部分處理在HiveMetastoreCatalog的CreateTables單例裏,繼承了Rule[LogicalPlan]。
以及PreInsertionCasts處理,也是HiveMetastoreCatalog裏的單例,繼承了Rule[LogicalPlan]。
之後的optimizer過程同SQLContext裏,用的是同一個Catalyst提供的Optimizer類。
Planner及執行過程
HiveContext繼承自SQLContext,其QueryExecution也繼承自SQLContext的QueryExecution。後續執行計劃優化、物理執行計劃翻譯、處理及執行過程同SQL的處理邏輯是一致的。
翻譯物理執行計劃的時候,hive planner裏製定了些特定的策略,與SparkPlanner稍有不同。

多了Scripts,DataSinks,HiveTableScans和HiveCommandStrategy四種處理物理執行計劃的策略(見HiveStrategies)。
1. Scripts,用於處理那種hive命令行執行腳本的情況。實現方式是使用ProcessBuilder新起一個JVM進程的方式,用”/bin/bash –c scripts”的方式執行腳本並獲取輸出流數據,轉化為Catalyst Row數據格式。
2. DataSinks,用於把數據寫入到Hive表的情況。裏麵涉及到一些hive讀寫的數據格式轉化、序列化、讀配置等工作,最後通過SparkContext的runJob接口,提交作業。
3. HiveTableScans,用於對hive table進行掃描,支持使用謂詞的分區裁剪(Partition pruning predicates are detected and applied)。
4. HiveCommandStrategy,用於執行native command和describe command。我理解是這種命令是直接調hive客戶端單機執行的,因為可能隻與meta data打交道。
toRDD: RDD[Row]處理也有少許區別,返回RDD[Row]的時候,對每個元素做了一次拷貝。
SQL Core
Spark SQL的核心是把已有的RDD,帶上Schema信息,然後注冊成類似sql裏的”Table”,對其進行sql查詢。這裏麵主要分兩部分,一是生成SchemaRD,二是執行查詢。
生成SchemaRDD
如果是spark-hive項目,那麼讀取metadata信息作為Schema、讀取hdfs上數據的過程交給Hive完成,然後根據這倆部分生成SchemaRDD,在HiveContext下進行hql()查詢。
對於Spark SQL來說,
數據方麵,RDD可以來自任何已有的RDD,也可以來自支持的第三方格式,如json file、parquet file。
SQLContext下會把帶case class的RDD隱式轉化為SchemaRDD
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
ExsitingRdd單例裏會反射出case class的attributes,並把RDD的數據轉化成Catalyst的GenericRow,最後返回RDD[Row],即一個SchemaRDD。這裏的具體轉化邏輯可以參考ExsitingRdd的productToRowRdd和convertToCatalyst方法。
之後可以進行SchemaRDD提供的注冊table操作、針對Schema複寫的部分RDD轉化操作、DSL操作、saveAs操作等等。
Row和GenericRow是Catalyst裏的行表示模型
Row用Seq[Any]來表示values,GenericRow是Row的子類,用數組表示values。Row支持數據類型包括Int, Long, Double, Float, Boolean, Short, Byte, String。支持按序數(ordinal)讀取某一個列的值。讀取前需要做isNullAt(i: Int)的判斷。
各自都有Mutable類,提供setXXX(i: int, value: Any)修改某序數上的值。
層次結構

下圖大致對比了Pig,Spark SQL,Shark在實現層次上的區別,僅做參考。


查詢流程
SQLContext裏對sql的一個解析和執行流程:
1. 第一步parseSql(sql: String),simple sql parser做詞法語法解析,生成LogicalPlan。
2. 第二步analyzer(logicalPlan),把做完詞法語法解析的執行計劃進行初步分析和映射,
目前SQLContext內的Analyzer由Catalyst提供,定義如下:
new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)
catalog為SimpleCatalog,catalog是用來注冊table和查詢relation的。
而這裏的FunctionRegistry不支持lookupFunction方法,所以該analyzer不支持Function注冊,即UDF。
Analyzer內定義了幾批規則:
val batches: Seq[Batch] = Seq( Batch("MultiInstanceRelations", Once, NewRelationInstances), Batch("CaseInsensitiveAttributeReferences", Once, (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*), Batch("Resolution", fixedPoint, ResolveReferences :: ResolveRelations :: NewRelationInstances :: ImplicitGenerate :: StarExpansion :: ResolveFunctions :: GlobalAggregates :: typeCoercionRules :_*), Batch("Check Analysis", Once, CheckResolution), Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) )
3. 從第二步得到的是初步的logicalPlan,接下來第三步是optimizer(plan)。
Optimizer裏麵也是定義了幾批規則,會按序對執行計劃進行優化操作。
val batches = Batch("Combine Limits", FixedPoint(100), CombineLimits) :: Batch("ConstantFolding", FixedPoint(100), NullPropagation, ConstantFolding, LikeSimplification, BooleanSimplification, SimplifyFilters, SimplifyCasts, SimplifyCaseConversionExpressions) :: Batch("Filter Pushdown", FixedPoint(100), CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, ColumnPruning) :: Nil
4. 優化後的執行計劃,還要丟給SparkPlanner處理,裏麵定義了一些策略,目的是根據邏輯執行計劃樹生成最後可以執行的物理執行計劃樹,即得到SparkPlan。
val strategies: Seq[Strategy] = CommandStrategy(self) :: TakeOrdered :: PartialAggregation :: LeftSemiJoin :: HashJoin :: InMemoryScans :: ParquetOperations :: BasicOperators :: CartesianProduct :: BroadcastNestedLoopJoin :: Nil
5. 在最終真正執行物理執行計劃前,最後還要進行兩次規則,SQLContext裏定義這個過程叫prepareForExecution,這個步驟是額外增加的,直接new RuleExecutor[SparkPlan]進行的。
val batches = Batch("Add exchange", Once, AddExchange(self)) :: Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
6. 最後調用SparkPlan的execute()執行計算。這個execute()在每種SparkPlan的實現裏定義,一般都會遞歸調用children的execute()方法,所以會觸發整棵Tree的計算。
其他特性
內存列存儲
SQLContext下cache/uncache table的時候會調用列存儲模塊。
該模塊借鑒自Shark,目的是當把表數據cache在內存的時候做行轉列操作,以便壓縮。
實現類
InMemoryColumnarTableScan類是SparkPlan LeafNode的實現,即是一個物理執行計劃。傳入一個SparkPlan(確認了的物理執行計)和一個屬性序列,內部包含一個行轉列、觸發計算並cache的過程(且是lazy的)。
ColumnBuilder針對不同的數據類型(boolean, byte, double, float, int, long, short, string)由不同的子類把數據寫到ByteBuffer裏,即包裝Row的每個field,生成Columns。與其對應的ColumnAccessor是訪問column,將其轉回Row。
CompressibleColumnBuilder和CompressibleColumnAccessor是帶壓縮的行列轉換builder,其ByteBuffer內部存儲結構如下
* .--------------------------- Column type ID (4 bytes) * | .----------------------- Null count N (4 bytes) * | | .------------------- Null positions (4 x N bytes, empty if null count is zero) * | | | .------------- Compression scheme ID (4 bytes) * | | | | .--------- Compressed non-null elements * V V V V V * +---+---+-----+---+---------+ * | | | ... | | ... ... | * +---+---+-----+---+---------+ * \-----------/ \-----------/ * header body
CompressionScheme子類是不同的壓縮實現
都是scala實現的,未借助第三方庫。不同的實現,指定了支持的column data類型。在build()的時候,會比較每種壓縮,選擇壓縮率最小的(若仍大於0.8就不壓縮了)。
這裏的估算邏輯,來自子類實現的gatherCompressibilityStats方法。
Cache邏輯
cache之前,需要先把本次cache的table的物理執行計劃生成出來。
在cache這個過程裏,InMemoryColumnarTableScan並沒有觸發執行,但是生成了以InMemoryColumnarTableScan為物理執行計劃的SparkLogicalPlan,並存成table的plan。
其實在cache的時候,首先去catalog裏尋找這個table的信息和table的執行計劃,然後會進行執行(執行到物理執行計劃生成),然後把這個table再放回catalog裏維護起來,這個時候的執行計劃已經是最終要執行的物理執行計劃了。但是此時Columner模塊相關的轉換等操作都是沒有觸發的。
真正的觸發還是在execute()的時候,同其他SparkPlan的execute()方法觸發場景是一樣的。
Uncache邏輯
UncacheTable的時候,除了刪除catalog裏的table信息之外,還調用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,並進行了unpersist()操作。cacheColumnBuffers主要做了把RDD每個partition裏的ROW的每個Field存到了ColumnBuilder內。
UDF(暫不支持)
如前麵對SQLContext裏Analyzer的分析,其FunctionRegistry沒有實現lookupFunction。
在spark-hive項目裏,HiveContext裏是實現了FunctionRegistry這個trait的,其實現為HiveFunctionRegistry,實現邏輯見org.apache.spark.sql.hive.hiveUdfs
Parquet支持
待整理
Specific Docs and Codes:
https://github.com/apache/incubator-parquet-format
https://github.com/apache/incubator-parquet-mr
https://www.slideshare.net/julienledem/parquet-hadoop-summit-2013
JSON支持
SQLContext下,增加了jsonFile的讀取方法,而且目前看,代碼裏實現的是hadoop textfile的讀取,也就是這份json文件應該是在HDFS上的。具體這份json文件的載入,InputFormat是TextInputFormat,key class是LongWritable,value class是Text,最後得到的是value部分的那段String內容,即RDD[String]。
除了jsonFile,還支持jsonRDD,例子:
https://spark.apache.org/docs/latest/sql-programming-guide.html
讀取json文件之後,轉換成SchemaRDD。JsonRDD.inferSchema(RDD[String])裏有詳細的解析json和映射出schema的過程,最後得到該json的LogicalPlan。
Json的解析使用的是FasterXML/jackson-databind庫,GitHub地址,wiki
把數據映射成Map[String, Any]
Json的支持豐富了Spark SQL數據接入場景。
JDBC支持
Jdbc support branchis under going
SQL92
Spark SQL目前的SQL語法支持情況見SqlParser類。目標是支持SQL92??
1. 基本應用上,sql server 和oracle都遵循sql 92語法標準。
2. 實際應用中大家都會超出以上標準,使用各家數據庫廠商都提供的豐富的自定義標準函數庫和語法。
3. 微軟sql server的sql 擴展叫T-SQL(Transcate SQL).
4. Oracle 的sql 擴展叫PL-SQL.
存在問題
https://apache-spark-developers-list.1001551.n3.nabble.com/sparkSQL-thread-safe-td7263.html
https://apache-spark-user-list.1001560.n3.nabble.com/Supported-SQL-syntax-in-Spark-SQL-td9538.html
總結
最後更新:2017-04-03 05:39:17