Source Code Analysis of the Spark SQL Component
Features
The newly released Spark SQL component gives Spark a kind of SQL support that differs from Shark's Hive-based approach. According to the official manual, it covers three areas:
First, you can write SQL directly in Scala code, with simple SQL syntax checking, and register an RDD as a table. On top of that, part of the SQL syntax is also available as a DSL (a short sketch follows these three points).
Second, it can read and write Parquet files while preserving the schema.
Third, it can access Hive metadata from Scala code, execute Hive statements, and bring the results back as RDDs.
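As a rough illustration of that DSL, here is a hedged sketch based on the early Spark SQL programming guide. It assumes a SchemaRDD named people registered as in the example later in this post, and the exact methods shown (where, select, Symbol-based column references) may differ slightly in this pre-1.0 snapshot:

```scala
// Sketch only: the language-integrated DSL uses Scala Symbols ('age, 'name) as column references.
// Assumes `people` is the SchemaRDD built and registered in the example below.
val teenagersDsl = people.where('age >= 13).where('age <= 19).select('name)
teenagersDsl.map(row => "Name: " + row(0)).collect().foreach(println)
```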
The first point, SQL support, relies mainly on Catalyst, the new query optimization framework (a brief introduction to Catalyst is given below). After parsing the SQL into a logical plan, it uses classes and interfaces from the Catalyst package to perform some simple plan optimizations, and finally turns the plan into RDD computation. Although the current SQL parser is quite simple and the plan optimizations are fairly generic, the code is still worth studying, so I went through it. The PR was merged into the main branch yesterday; this implementation can be found in the sql module, and the Catalyst code in the catalyst module. The rest of this post walks through the implementation of the Spark SQL module.
I won't look at the second point, Parquet support, because our use cases don't involve columnar storage like Parquet; the applicable scenarios are different.
The third point, this way of integrating with Hive, isn't a fundamental advance. Compare it with Shark: Shark relies on Hive's Metastore, parser, and so on to turn HiveQL execution into computation on Spark, whereas this integration is not essentially different from pulling the Hive package into your code and executing HiveQL; it just makes the interaction between HiveQL data and RDDs friendlier.
Introducing Catalyst
See the Spark Summit material on Catalyst, "Catalyst: A Query Optimization Framework for Spark and Shark":
Query optimization can greatly improve both the productivity of developers and the performance of the queries that they write. A good query optimizer is capable of automatically rewriting relational queries to execute more efficiently, using techniques such as filtering data early, utilizing available indexes, and even ensuring different data sources are joined in the most efficient order. By performing these transformations, the optimizer not only improves the execution times of relational queries, but also frees the developer to focus on the semantics of their application instead of its performance. Unfortunately, building an optimizer is an incredibly complex engineering task and thus many open source systems perform only very simple optimizations. Past research [1][2] has attempted to combat this complexity by providing frameworks that allow the creators of optimizers to write possible optimizations as a set of declarative rules. However, the use of such frameworks has required the creation and maintenance of special "optimizer compilers" and forced the burden of learning a complex domain specific language upon those wishing to add features to the optimizer. Instead, we propose Catalyst, a query optimization framework embedded in Scala. Catalyst takes advantage of Scala's powerful language features such as pattern matching and runtime metaprogramming to allow developers to concisely specify complex relational optimizations. In this talk I will describe the framework and how it allows developers to express complex query transformations in very few lines of code. I will also describe our initial efforts at improving the execution time of Shark queries by greatly improving its query optimization capabilities.
Overall, Catalyst is an implementation-agnostic framework for manipulating trees of relational operators and expressions. It consists of three main parts:
- a TreeNode library for transforming trees that are expressed as Scala case classes,
- a logical plan representation for relational operators,
- an expression library.
Analysis
Let's analyze the internals by following the official Spark SQL example.
```scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new SQLContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._

// Define the schema using a case class.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people: RDD[Person] =
  sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt))
people.registerAsTable("people")

val teenagers = sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")

// The results of SQL queries are themselves RDDs and support all the normal operations
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```
SQLContext is the overall execution environment for the SQL module.
The line val people: RDD[Person] = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt)) would normally just read a file through sc and produce the people RDD via two MappedRDD transformations. But since import sqlContext._ brings the implicit function createSchemaRDD into scope:
```scala
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
  new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
```

and because people, declared as RDD[Person], is an RDD of a case class (a Product), it gets converted into a SchemaRDD.
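To make the mechanism concrete, here is a tiny standalone sketch of how an implicit conversion silently "upgrades" a type with new methods, which is exactly how import sqlContext._ upgrades RDD[Person]. This is not Spark code; Wrapped and toWrapped are made-up names, and the snippet is meant for a Scala REPL or worksheet:

```scala
// Toy illustration of the implicit-conversion mechanism only.
case class Person(name: String, age: Int)

class Wrapped[A <: Product](val rows: Seq[A]) {
  def registerAsTable(name: String): Unit =
    println(s"registered ${rows.size} rows as table '$name'")
}

// With this implicit in scope, any Seq of case-class instances gains registerAsTable,
// just as RDD[Person] gains SchemaRDD's methods after import sqlContext._
implicit def toWrapped[A <: Product](xs: Seq[A]): Wrapped[A] = new Wrapped(xs)

Seq(Person("alice", 15), Person("bob", 20)).registerAsTable("people")
```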
SchemaRDD is a new RDD implementation class added by the SQL module. Constructing a SchemaRDD requires two things:
```scala
class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val logicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) {
```

an SQLContext and a logical plan. Inside the createSchemaRDD method, ExistingRdd.fromProductRdd(rdd) does the following to the people RDD:
```scala
def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
  ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
}
```
First, it uses Scala reflection on A, i.e. the Person case class, to extract the class's attributes, which for a table means obtaining its columns. Second, productToRowRdd converts the RDD into an RDD[Row]:
```scala
def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
  // TODO: Reuse the row, don't use map on the product iterator.  Maybe code gen?
  data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
}
```

After these two steps we have, built on top of the people RDD, an instance of the case class:
```scala
case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row])
```

where output is the sequence of columns.
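To get a feel for what attributesFor does, here is a small standalone sketch that lists a case class's field names and types via Scala runtime reflection. This is not the Catalyst implementation (fieldNamesAndTypes is a made-up name); the real ScalaReflection.attributesFor additionally maps Scala types to Catalyst data types and produces attribute references:

```scala
import scala.reflect.runtime.universe._

// Sketch only: reflect a case class's accessor methods as (name, type) pairs.
def fieldNamesAndTypes[A <: Product: TypeTag]: Seq[(String, Type)] =
  typeOf[A].members.sorted.collect {
    case m: MethodSymbol if m.isCaseAccessor => (m.name.toString, m.returnType)
  }

case class Person(name: String, age: Int)
fieldNamesAndTypes[Person].foreach(println)   // (name,String), (age,Int)
```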
SparkLogicalPlan is a simple class; its implementation:
```scala
/**
 * Allows already planned SparkQueries to be linked into logical query plans.
 *
 * Note that in general it is not valid to use this class to link multiple copies of the same
 * physical operator into the same query plan as this violates the uniqueness of expression ids.
 * Special handling exists for ExistingRdd as these are already leaf operators and thus we can just
 * replace the output attributes with new copies of themselves without breaking any attribute
 * linking.
 */
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
  extends logical.LogicalPlan with MultiInstanceRelation {

  def output = alreadyPlanned.output
  def references = Set.empty
  def children = Nil

  override final def newInstance: this.type = {
    SparkLogicalPlan(
      alreadyPlanned match {
        case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
        case _ => sys.error("Multiple instance of the same relation detected.")
      }).asInstanceOf[this.type]
  }
}
```
Everything up to this point happens as part of the single line val people: RDD[Person] = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).toInt)), and the result is a SchemaRDD.
Next, people.registerAsTable("people") is a method on SchemaRDD, but as you can see the actual table registration is delegated to SQLContext:
```scala
def registerAsTable(tableName: String): Unit = {
  sqlContext.registerRDDAsTable(this, tableName)
}
```
```scala
/**
 * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
 * during the lifetime of this instance of SQLContext.
 *
 * @group userf
 */
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
  catalog.registerTable(None, tableName, rdd.logicalPlan)
}
```

When registering the table, SQLContext relies on SimpleCatalog, an implementation of the Catalog trait:
```scala
/**
 * An interface for looking up relations by name.  Used by an [[Analyzer]].
 */
trait Catalog {
  def lookupRelation(
    databaseName: Option[String],
    tableName: String,
    alias: Option[String] = None): LogicalPlan

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
}

class SimpleCatalog extends Catalog {
  val tables = new mutable.HashMap[String, LogicalPlan]()

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
    tables += ((tableName, plan))
  }

  def dropTable(tableName: String) = tables -= tableName

  def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String] = None): LogicalPlan = {
    val table = tables.get(tableName).getOrElse(sys.error(s"Table Not Found: $tableName"))

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a.toLowerCase, table)).getOrElse(table)
  }
}
```

The code above is from the Catalyst package. Catalog maintains table information: it stores newly registered tables and supports looking up and dropping existing ones. SimpleCatalog implements this by keeping the tableName -> logicalPlan pairs in a HashMap.
Now that the RDD has been converted and registered as a table, it's time to run SQL: val teenagers = sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")
sql() is likewise a method on SQLContext:
```scala
/**
 * Executes a SQL query using Spark, returning the result as a SchemaRDD.
 *
 * @group userf
 */
def sql(sqlText: String): SchemaRDD = {
  val result = new SchemaRDD(this, parseSql(sqlText))
  // We force query optimization to happen right away instead of letting it happen lazily like
  // when using the query DSL.  This is so DDL commands behave as expected.  This is only
  // generates the RDD lineage for DML queries, but do not perform any execution.
  result.queryExecution.toRdd
  result
}
```

Setting aside for a moment how result is produced, note that just before result is returned, the method calls result.queryExecution.toRdd. Before toRdd runs, none of the earlier work — converting the RDD, generating the logical plan, analyzing and optimizing it — has actually computed any data; only once toRdd is invoked does this chain of plans really execute, and the current sql() implementation triggers that right here.
The parseSql(sqlText) step parses the SQL with a simple parser, a Scala SqlParser provided by the Catalyst package. The source comment says:
```scala
/**
 * A very simple SQL parser.  Based loosly on:
 * https://github.com/stephentu/scala-sql-parser/blob/master/src/main/scala/parser.scala
 *
 * Limitations:
 *  - Only supports a very limited subset of SQL.
 *  - Keywords must be capital.
 *
 * This is currently included mostly for illustrative purposes.  Users wanting more complete support
 * for a SQL like language should checkout the HiveQL support in the sql/hive subproject.
 */
class SqlParser extends StandardTokenParsers
```

It is apparently based loosely on someone else's Scala SQL parser, supports only a very small subset of SQL, and requires keywords to be uppercase. From the implementation, the keywords currently supported appear to be:
```scala
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val AVG = Keyword("AVG")
protected val BY = Keyword("BY")
protected val CAST = Keyword("CAST")
protected val COUNT = Keyword("COUNT")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val FALSE = Keyword("FALSE")
protected val FIRST = Keyword("FIRST")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IF = Keyword("IF")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIMIT = Keyword("LIMIT")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
protected val OUTER = Keyword("OUTER")
protected val RIGHT = Keyword("RIGHT")
protected val SELECT = Keyword("SELECT")
protected val STRING = Keyword("STRING")
protected val SUM = Keyword("SUM")
protected val TRUE = Keyword("TRUE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")
```
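For readers unfamiliar with Scala's parser combinators, here is a hedged, minimal sketch of the StandardTokenParsers pattern the class above builds on. MiniSqlParser is a made-up toy, not Catalyst's SqlParser, and it only recognizes a tiny SELECT ... FROM ... WHERE shape without building any plan:

```scala
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// Toy parser: recognizes "SELECT col[, col...] FROM table [WHERE col op number]".
// It only shows the combinator style; Catalyst's SqlParser builds LogicalPlan nodes instead.
class MiniSqlParser extends StandardTokenParsers {
  lexical.reserved ++= Seq("SELECT", "FROM", "WHERE")
  lexical.delimiters ++= Seq(",", ">=", "<=", "=")

  def select: Parser[Any] =
    "SELECT" ~> repsep(ident, ",") ~ ("FROM" ~> ident) ~ opt(whereClause)

  def whereClause: Parser[Any] =
    "WHERE" ~> ident ~ (">=" | "<=" | "=") ~ numericLit

  def parse(sql: String) = phrase(select)(new lexical.Scanner(sql))
}

// new MiniSqlParser().parse("SELECT name FROM people WHERE age >= 13") parses successfully.
```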
Now for the most interesting part. Inside sql(), the key expression is result.queryExecution: queryExecution is what actually hands the logical plan over to SQLContext for processing,
```scala
/**
 * A lazily computed query execution workflow.  All other RDD operations are passed
 * through to the RDD that is produced by this workflow.
 *
 * We want this to be lazy because invoking the whole query optimization pipeline can be
 * expensive.
 */
@transient
protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
```

and that processing takes the form of creating a QueryExecution:
```scala
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
  new this.QueryExecution { val logical = plan }
```

QueryExecution declares several lazy vals; only when toRdd is called does all the analysis, optimization, and other processing take effect. Here is the QueryExecution code:
```scala
/**
 * The primary workflow for executing relational queries using Spark.  Designed to allow easy
 * access to the intermediate phases of query execution for developers.
 */
protected abstract class QueryExecution {
  def logical: LogicalPlan

  lazy val analyzed = analyzer(logical)
  lazy val optimizedPlan = optimizer(analyzed)
  // TODO: Don't just pick the first one...
  lazy val sparkPlan = planner(optimizedPlan).next()
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[Row] = executedPlan.execute()
```
There are four main steps: analyzer -> optimizer -> planner -> prepareForExecution.
Before walking through these four steps, a quick word on LogicalPlan: until the plan is turned into a physical one, most of the classes and interfaces discussed in this post operate on logical plans, i.e. the LogicalPlan class. LogicalPlan itself is abstract, with roughly the implementations shown below. It is one of the important classes in the Catalyst code, and it is essentially a syntax tree:
```scala
/**
 * A logical plan node with no children.
 */
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  self: Product =>

  // Leaf nodes by definition cannot reference any input attributes.
  def references = Set.empty
}

/**
 * A logical node that represents a non-query command to be executed by the system.  For example,
 * commands can be used by parsers to represent DDL operations.
 */
abstract class Command extends LeafNode {
  self: Product =>
  def output = Seq.empty
}

/**
 * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
 * commands that are passed directly to another system.
 */
case class NativeCommand(cmd: String) extends Command

/**
 * Returned by a parser when the users only wants to see what query plan would be executed, without
 * actually performing the execution.
 */
case class ExplainCommand(plan: LogicalPlan) extends Command

/**
 * A logical plan node with single child.
 */
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  self: Product =>
}

/**
 * A logical plan node with a left and right child.
 */
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  self: Product =>
}
```
The inheritance structure of Catalyst's QueryPlan is shown in the figure below. SparkPlan is an implementation in the SQL package; it is a SparkPlan that was passed in earlier when constructing SparkLogicalPlan.
Step 1: analyzer
SQLContext holds an analyzer, which comes from the Catalyst package; the catalog inside it is the same object that stores the table information:
```scala
@transient
protected[sql] lazy val analyzer: Analyzer =
  new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)
```

Looking at its implementation, the analyzer's job is to take the unresolved attributes and relations and pin them all down, using the catalog and a set of adapter-like rule methods:
```scala
/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
 * a [[FunctionRegistry]].
 */
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )
```

The analyzer's batches register many strategies — processing rules, you might say — each of which is implemented inside the class, for example:
```scala
/**
 * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
 */
object ResolveRelations extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(databaseName, name, alias) =>
      catalog.lookupRelation(databaseName, name, alias)
  }
}
```

Since Analyzer extends RuleExecutor, each of these processing objects extends Rule. In RuleExecutor, apply() recursively walks the logical plan and applies the rules registered in batches:
```scala
/**
 * Executes the batches of rules defined by the subclass. The batches are executed serially
 * using the defined execution strategy. Within each batch, rules are also executed serially.
 */
def apply(plan: TreeType): TreeType = {
  var curPlan = plan

  batches.foreach { batch =>
    var iteration = 1
    var lastPlan = curPlan
    curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) }

    // Run until fix point (or the max number of iterations as specified in the strategy.
    while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
      lastPlan = curPlan
      curPlan = batch.rules.foldLeft(curPlan) {
        case (curPlan, rule) =>
          val result = rule(curPlan)
          if (!result.fastEquals(curPlan)) {
            logger.debug(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(curPlan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          result
      }
      iteration += 1
    }
  }

  curPlan
}
```
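To see the idea behind Rule and transform in isolation, here is a small hedged sketch of bottom-up tree rewriting with pattern matching, which is the spirit of what the rules above do to LogicalPlan trees. Expr, Lit, Add, and transform are made-up toy names, not Catalyst classes:

```scala
// Toy expression tree and a bottom-up transform, mimicking the idea of TreeNode.transform.
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

def transform(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  // First rewrite the children, then give the rule a chance to rewrite this node.
  val withNewChildren = e match {
    case Add(l, r) => Add(transform(l)(rule), transform(r)(rule))
    case leaf      => leaf
  }
  rule.applyOrElse(withNewChildren, identity[Expr])
}

// A "constant folding" rule written the way a Catalyst rule body reads: a partial function.
val constantFold: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
}

transform(Add(Add(Lit(1), Lit(2)), Lit(3)))(constantFold)  // => Lit(6)
```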
Step 2: optimizer
The optimizer is implemented and works much like the analyzer; they just sit at different stages and have different responsibilities. The optimizer also extends RuleExecutor and defines a batch of rules which, as in the analyzer, are applied recursively to the input plan:
```scala
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once,
      EliminateSubqueries) ::
    Batch("ConstantFolding", Once,
      ConstantFolding,
      BooleanSimplification,
      SimplifyCasts) ::
    Batch("Filter Pushdown", Once,
      EliminateSubqueries,
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughInnerJoin) :: Nil
}
```
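As a rough illustration of what one of these rules means, here is a hedged toy sketch of a CombineFilters-style rewrite: two stacked filters collapse into a single conjunctive filter. Plan, Relation, FilterOp, and combineFilters are invented names, not Catalyst code:

```scala
// Toy logical operators and a CombineFilters-like rewrite expressed as a pattern match.
sealed trait Plan
case class Relation(name: String) extends Plan
case class FilterOp(condition: String, child: Plan) extends Plan

val combineFilters: PartialFunction[Plan, Plan] = {
  // Filter(outer, Filter(inner, child)) becomes Filter(inner AND outer, child)
  case FilterOp(outer, FilterOp(inner, child)) => FilterOp(s"($inner) AND ($outer)", child)
}

combineFilters(FilterOp("age <= 19", FilterOp("age >= 13", Relation("people"))))
// => FilterOp("(age >= 13) AND (age <= 19)", Relation("people"))
```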
Step 3: planner
The planner inside SQLContext is its own SparkPlanner implementation:
```scala
protected[sql] class SparkPlanner extends SparkStrategies {
  val sparkContext = self.sparkContext

  val strategies: Seq[Strategy] =
    TopK ::
    PartialAggregation ::
    SparkEquiInnerJoin ::
    BasicOperators ::
    CartesianProduct ::
    BroadcastNestedLoopJoin :: Nil
}
```
It contains a sequence of strategies, and SparkStrategies is a subclass of QueryPlanner from the Catalyst package. QueryPlanner's apply() is implemented as follows:
```scala
def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...
  val iter = strategies.view.flatMap(_(plan)).toIterator
  assert(iter.hasNext, s"No plan for $plan")
  iter
}
```

For SparkPlanner, this means running strategies such as TopK and BasicOperators over the optimized plan in one pass to produce candidate physical plans.
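Here is a hedged toy sketch of that pattern (all names below are invented, not Spark classes): each strategy returns zero or more candidate physical operators for a logical operator, and the planner takes the first candidate any strategy produces:

```scala
// Toy logical/physical operators and a planner that tries strategies in order.
sealed trait LogicalOp
case class Scan(table: String) extends LogicalOp
case class FilterL(cond: String, child: LogicalOp) extends LogicalOp

sealed trait PhysicalOp
case class PhysicalScan(table: String) extends PhysicalOp
case class PhysicalFilter(cond: String, child: PhysicalOp) extends PhysicalOp

type Strategy = LogicalOp => Seq[PhysicalOp]

val basicOperators: Strategy = {
  case Scan(t)             => Seq(PhysicalScan(t))
  case FilterL(c, Scan(t)) => Seq(PhysicalFilter(c, PhysicalScan(t)))
  case _                   => Nil
}

def plan(strategies: Seq[Strategy])(logical: LogicalOp): PhysicalOp =
  strategies.view.flatMap(s => s(logical)).headOption
    .getOrElse(sys.error(s"No plan for $logical"))

plan(Seq(basicOperators))(FilterL("age >= 13", Scan("people")))
// => PhysicalFilter("age >= 13", PhysicalScan("people"))
```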
Step 4: prepareForExecution
prepareForExecution is, once again, a Catalyst RuleExecutor implementation:
```scala
/**
 * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
 * inserting shuffle operations as needed.
 */
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
  val batches =
    Batch("Add exchange", Once, AddExchange) ::
    Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
```

Two batches of processing are registered here.
All of the steps above are really implementations of Catalyst's RuleExecutor; they just divide the work differently and do different things. Only when we finally call
```scala
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = executedPlan.execute()
```

are all the preceding lazy plans actually triggered and executed. Let's take one more look at the stages in the QueryExecution code:
```scala
/**
 * The primary workflow for executing relational queries using Spark.  Designed to allow easy
 * access to the intermediate phases of query execution for developers.
 */
protected abstract class QueryExecution {
  def logical: LogicalPlan

  lazy val analyzed = analyzer(logical)
  lazy val optimizedPlan = optimizer(analyzed)
  // TODO: Don't just pick the first one...
  lazy val sparkPlan = planner(optimizedPlan).next()
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[Row] = executedPlan.execute()
```
Finally, sql() returns an RDD, so afterwards you can keep working with it using ordinary RDD transformations and actions.
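For instance, continuing the earlier example (a sketch based on the official snippet; the uppercasing and filter steps are only illustrative):

```scala
// teenagers is a SchemaRDD, i.e. an RDD[Row], so normal RDD operations compose with it.
val teenagers = sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")
val shouted = teenagers
  .map(row => row(0).toString.toUpperCase)  // pull the name column out of each Row
  .filter(_.nonEmpty)
shouted.collect().foreach(println)
```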
My treatment of the analyzer and optimizer here is admittedly thin — they are mostly a matter of defining rules, and I haven't gone through them carefully yet; I just hope the overall flow comes across clearly.
Summary
To summarize the Spark SQL flow:
Through the new SQLContext, we can register an RDD as a table. That RDD is a SchemaRDD backed by a case class, which effectively serves as the SQL table's schema; the column information is obtained from the case class by reflection. Once the RDD is registered as a table, its information is held in the Catalog, with a lifetime limited to this context. Later, when writing sql() queries, we can make use of that SchemaRDD. Before execution, the query goes through several stages: a simple SQL parser turns the SQL into a logical plan, and between the logical plan and the physical plan, the analyzer, optimizer, and planner each do further processing. These stages are essentially all implementations of Catalyst's RuleExecutor; each defines and registers its own sequence of rules, which are applied recursively over the logical plan. Everything up to that point is lazy; only when toRdd is triggered does execution actually happen and return an RDD. That RDD is an ordinary Spark RDD and can be processed further, which closes the loop from RDD to table, through SQL, and back to RDD. The whole execution and optimization process rests entirely on Catalyst, the new query optimization framework.
This post has focused on the sql part of the Spark SQL component; I haven't analyzed the Hive and Parquet support. I plan to read the catalyst package next and see whether there is anything there worth borrowing. New tooling components keep appearing around Spark core one after another, and when using them we still have to be selective — it isn't clear which direction Databricks will mainly push, and perhaps they themselves are probing on all fronts. Catalyst, for instance, apparently will also make its way into Shark to improve it, and Spark SQL should be one of the highlights of Spark 1.0, though the APIs will probably still change a lot. Either way, I'll keep watching and learning, borrowing what can be borrowed and studying what can be studied.
The end :)