閱讀32 返回首頁    go 阿裏雲 go 技術社區[雲棲]


顛覆大數據分析之Spark為Shark所提供的擴展

在Spark的RDD上執行SQL查詢遵循的是傳統並行數據庫的三步流程:

  1. 查詢解析
  2. 邏輯計劃的生成
  3. 將邏輯計劃映射為物理的執行計劃

Shark使用Hive查詢編譯器來進行查詢語句的解析。它會生成一棵抽象語法樹,然後再將它轉化成一個邏輯計劃。Shark中邏輯計劃的生成方式也類似於Hive中的。但兩者的物理計劃的生成方式則不盡相同。Hive中的物理計劃是一係列的MR作業,而Shark中的則是分階段RDD轉換的一個有向無環圖。由於Shark的高工作負荷的這個性質(通常在Hive中機器學習及用戶定義函數(UDF)都很常見),因此在編譯期很難獲取到物理查詢計劃。對於新數據而言的確是這樣的(之前還未被加載到Shark中)。值得注意的是, Hive和Shark都經常用來查詢這類數據。因此,Shark引入了一個叫部分有向無環圖執行(Partial DAG Execution,PDE)的概念。

部分DAG執行

這項技術會基於運行時收集的數據來生成查詢語句的執行計劃,而不是在編譯期就去生成查詢的物理執行計劃。收集的數據包括分區大小,傾斜檢測的記錄條數,哪些記錄是頻繁出現的,以及RDD分區中數據分布的粗略的直方圖。Spark在洗牌階段之前會將map輸出存儲到內存中——之後reduce任務會通過MOT組件來使用這些數據。Shark的第一個改動是收集了指定分區以及全局環境的數據。另一個修改則是使得DAG可以在運行時根據所收集的數據來進行改變。必須注意的是,Shark是基於單個節點上的查詢優化方法來構建的,它使用了PDE的概念來結合本地優化器進行查詢的全局優化。

數據收集以及後續的DAG的修改對Shark實現分布式的連接操作至關重要。它提供了兩種類型的連接操作:洗牌連接(shuffle join)以及映射/廣播連接。廣播連接是通過將小表發送到所有的節點來實現的,在這裏它會和大表中不相交的分區進行本地合並。而在洗牌連接中,兩張表都會根據連接的主鍵來進行哈希分區。廣播連接隻有在表比較小的時候才會比較高效——從這讀者可以看到,為什麼在Shark的動態查詢優化中這些數據統計是如此重要了。Shark中數據統計用於優化查詢的另一種方式就是通過檢查分區的大小來合並較小分區以決定歸約器的數量或者說並行度。

列內存存儲

Spark中默認會將RDD存儲為JVM內存中的反序列化的Java對象。這樣的好處就是對JVM而言它們天生就是可用的,這加快了訪問的速度。這樣做的缺點就是無法在JVM內存中創建大量對象。讀者應當時刻牢記,隨著Java堆中對象數量的上升,垃圾回收器(GC)收集的時候就會越長。因此Shark(Muthukumar和Janakiram 2006)實現了列存儲,所有基礎類型的列都會創建一個對象來存儲,而對於複雜類型,它會使用字節數組。這極大地減少了內存中的對象數量也提高了GC及Shark的性能。同時和Spark原生的實現相比,它還提高了空間的使用率。[1]

分布式數據加載

Shark使用Spark執行器來加載數據,但會對它進行定製化。每一張表都會進行分區,每一個分區都會由一個單獨的Spark任務來進行加載。這個任務會獨立決定是否進行壓縮(這列是否需要壓縮,如果需要的話用何種技術進行壓縮——它是字典編碼還是遊程長度編碼[Abadi 等2006])。每個分區還會單獨保存壓縮後的結果元數據。然而必須注意的是,世係圖並不需要存儲壓縮元數據,這個會在重構RDD的時候進行計算。結果表明,和Hadoop相比,Shark加載數據到內存中的速度要更快,同時將數據加載到HDFS中的吞吐量也和Hadoop的一樣。

完全分區智能連接(Full Partition-Wise Join)

正如在傳統數據庫中所了解到的那樣,完全分區智能連接可以通過根據連接列來將兩張表分區的方式來實現。盡管Hadoop並不支持這樣的協同分區,Shark通過在數據定義中使用”distribute by”子句實現了這點。當連接兩張一致分區的表時,Shark會創建Spark的map任務以避免使用昂貴的洗牌操作,從而獲得了更高的運行效率。

分區修剪

正如在傳統數據庫中那樣,分區修剪指的是優化器在構建分區訪問列表時通過分析SQL中的where和from子句,刪除掉不必要的分區。Shark還通過存儲在分區元數據中的範圍值(range value)和非重複值(distinct value, 對應枚舉類型)增強了數據加載過程中的數據統計,以便在運行時指導分區修剪決策——這個過程又被Shark團隊稱之為映射修剪。

機器學習的支持

Shark的一個關鍵的獨特賣點(Unique Selling Points ,USP)在於它能夠支持機器學習算法。之所以能夠實現這個是因為Shark允許在返回查詢結果的同時還順便返回代表執行計劃的RDD對象。這說明用戶可以初始化這個RDD上的操作——這點非常關鍵,因為它使得Spark RDD的能力可以為Shark查詢所用。需要注意的是機器學習算法能夠在Spark RDD上實現,Kraska等人所開發的MLbase庫或本書的後續章節都會介紹到這點。

[1]   分代GC常用於現代的JVM。一類回收被稱為minor collection,它是用來將分區中存活的對象拷貝到存活區(suvivor space)以及持久代中。剩下的對象會被回收掉。另一種是stop-the-world的major collection,它會對老生代進行壓縮。

最後更新:2017-05-22 20:03:12

  上一篇:go  Memory Access Patterns Are Important
  下一篇:go  顛覆大數據分析之類似Spark的係統