閱讀400 返回首頁    go 技術社區[雲棲]


顛覆大數據分析之Spark彈性分布式數據集

Spark中迭代式機器學習算法的數據流可以通過圖2.3來進行理解。將它和圖2.1中Hadoop MR的迭代式機器學習的數據流比較一下。你會發現在Hadoop MR中每次迭代都會涉及HDFS的讀寫,而在Spark中則要簡單得多。它僅需從HDFS到Spark中的分布式共享對象空間的一次讀入——從HDFS文件中創建RDD。RDD可以重用,在機器學習的各個迭代中它都會駐留在內存裏,這樣能顯著地提升性能。當檢查結束條件發現迭代結束的時候,會將RDD持久化,把數據寫回到HDFS中。後續章節會對Spark的內部結構進行詳細介紹——包括它的設計,RDD,以及世係等等。

圖2.3  Spark中進行迭代式計算的數據共享

Spark的彈性分布式數據集

RDD這個概念跟我們討論到的Spark的動機有關——就是能讓用戶操作分布式係統上的Scala集合。Spark中的這個重要的集合就是RDD。RDD可以通過在其它RDD或者穩態存儲中的數據(比如說,HDFS中的文件)上執行確定性操作來進行創建。創建RDD的另一種方式就是將Scala集合並行化。RDD的創建也就是Spark中的轉換操作。RDD上除了轉換操作,還有其它的一些操作,比如說動作(action)。像map, filter以及join這些都是常見的轉換操作。RDD有意思的一點在於它可以將自己的世係或者說創建它所需的轉換序列,以及它上麵的動作給存儲起來。這意味著Spark程序隻能擁有一個RDD引用——它知道自己的世係,包括它是如何創建的,上麵執行過哪些操作。世係為RDD提供了容錯性——即使它丟失了,隻要世係本身被持久化或者複製了,就仍能重建整個RDD。RDD的持久化以及分塊可以由程序員來指定。比如說,你可以基於記錄的主鍵來進行分塊。

在RDD上可以執行許多操作。包括count,collect以及save,它們分別可以用來統計元素總數,返回記錄,以及保存到磁盤或者HDFS中。世係圖中存儲了RDD的轉換以及動作。表2.1中列舉了一係列的轉換及動作。

表2.1

轉換 描述
Map(function f1) 把RDD中的每個元素並行地傳遞給f1,並返回結果的RDD
Filter(function f2) 選取出那些傳遞給函數f2並返回true的RDD元素
flatMap(function f3) 和map類似,但f3返回的是一個序列,它能將單個輸入映射成多個輸出。
Union(RDD r1) 返回RDD r1和自身的並集
Sample(flag, p, seed) 返回RDD的百分之p的隨機采樣(使用種子seed)
動作 描述
groupByKey(noTasks) 隻能在鍵值對數據上進行調用——返回的數據按值進行分組。並行任務的數量通過一個參數來指定(默認是8)
reduceByKey(function f4,noTasks) 對相同key元素上應用函數f4的結果進行聚合。第二個參數是並行的任務數
Join(RDD r2, noTasks) 將RDD r2和對象自身進行連接——計算出指定key的所有可能的組合
groupWith(RDD r3, noTasks) 將RDD r3與對象自身進行連接,並按key進行分組
sortByKey(flag) 根據標記值將RDD自身按升序或降序來進行排序
動作 描述
Reduce(function f5) 使用函數f5來對RDD的所有元素進行聚合
Collect() 將RDD的所有元素作為一個數組來返回
Count() 計算RDD的元素總數
take(n) 獲取RDD的第n個元素
First() 等價於take(1)
saveAsTextFile(path) 將RDD持久化成HDFS或者其它Hadoop支持的文件係統中路徑為path的一個文件
saveAsSequenceFile(path) 將RDD持久化為Hadoop的一個序列文件。隻能在實現了Hadoop寫接口或類似接口的鍵值對類型的RDD上進行調用。
動作 描述
foreach(function f6) 並行地在RDD的元素上運行函數f6

下麵將通過一個例子來介紹下如何在Spark環境中進行RDD的編程。這裏是一個唿叫數據記錄(CDR)——基於影響力分析的應用程序——通過CDR來構建用戶的關係圖,並識別出影響力最大的K個用戶。CDR結構包括id,調用方,接收方,計劃類型,唿叫類型,持續時長,時間,日期。具體做法是從HDFS中獲取CDR文件,接著創建出RDD對象並過濾記錄,然後再在上麵執行一些操作,比如說通過查詢提取出特定的字段,或者執行諸如count的聚合操作。最終寫出的Spark代碼如下:

val spark = new SparkContext();

Call_record_lines = spark.textFile(“HDFS://….”);

Plan_a_users = call_record_lines.filter(_.

CONTAINS(“plana”)); // RDD上的過濾操作.

Plan_a_users.cache(); // 告訴Spark運行時,如果仍有空間,就將這個RDD緩存到內存裏Plan_a_users.count();

%% 唿叫數據集處理中.

 

RDD可以表示成一張圖,這樣跟蹤RDD在不同轉換/動作間的世係變化會簡單一些。RDD接口由五部分信息組成,詳見表2.2。

表2.2  RDD接口

信息 HadoopRDD FilteredRDD JoinedRDD
分區類型 每個HDFS塊一個分區 和父RDD一致 每個reduce任務一個
依賴類型 無依賴 和父RDD是一對一的依賴 在每一個父RDD上進行shuffle
基於父RDD來計算數據集的函數 讀取對應塊的數據 計算父RDD並進行過濾 讀取洗牌後的數據並進行連接
位置元數據(preferredLocations) 從命名節點中讀取HDFS塊的位置信息 無(從父RDD中獲取)
分區元數據(partitioningScheme) HashPartitioner

Spark的實現

Spark是由大概20000行Scala代碼寫就的,核心部分大概是14000行。Spark可以運行在Mesos, Nimbus或者YARN等集群管理器之上。它使用的是未經修改的Scala解釋器。當觸發RDD上的一個動作時,一個被稱為有向無環圖(DAG)調度器的Spark組件就會去檢查RDD的世係圖,同時會創建各階段的DAG。每個階段內都隻會出現窄依賴,寬依賴所需的洗牌操作就是階段的邊界。調度器在DAG的不同階段啟動任務來計算出缺失的分區,以便重構整個RDD對象。它將各階段的任務對象提交給任務調度器(Task Scheduler, TS)。任務對象是一個獨立的實體,它由代碼和轉換以及所需的元數據組成。調度器還負責重新提交那些輸出丟失了的階段。任務調度器使用一個被稱為延遲調度(Zaharia等 2010)的調度算法來將任務分配給各個節點。如果RDD中有指定了優先區域的話,任務會被傳送給這些節點,否則會被分配到那些有分區在請求內存任務的節點上。對於寬依賴而言,中間記錄會在那些包含父分區的節點上生成。這樣會使得錯誤恢複變得簡單,Hadoop MR中map輸出的物化也是類似的。

Spark中的Worker組件會負責接收任務對象並在一個線程池中調用它們的run方法。它將異常或者錯誤報告給TaskSetManager(TSM)。TSM是任務調度器管理的一個實體——每個任務集都會對應一個TSM,用於跟蹤任務的執行過程。TS是按先進先出的順序來輪詢TSM集的。通過插入不同的策略或者算法,這裏仍有一定的優化空間。執行器會與其它的組件進行交互,比如說塊管理器(BM),通信管理器(CM),Map輸出跟蹤器(MOT)。塊管理器是節點用於緩存RDD並接收洗牌數據的組件。它也可以看作是每個worker中隻寫一次的K-V存儲。塊管理器和通信管理器進行通信以便獲取到遠端的塊數據。通信管理器是一個異步網絡庫。MOT這個組件會負責跟蹤每個map任務都在哪運行並把這些信息返回給歸約器——Worker會緩存這個信息。當映射器的輸出丟失了的話,會使用一個“分代ID”來將這個緩存置為無效。Spark中各組件的交互如圖2.4中所示。

圖2.4  Spark集群中的組件

RDD的存儲可以通過下麵這三種方式來完成:

  1. 作為Java虛擬機中反序列化的Java對象:由於對象就在JVM內存中,這樣做的性能會更佳。
  2. 作為內存中序列化的Java對象:這麼表示內存的使用率會更高,但卻犧牲了訪問速度。
  3. 存儲在磁盤上:這樣做性能最差,但是如果RDD太大以至於無法存放到內存中的話就隻能這麼做了。

一旦內存滿了,Spark的內存管理會通過最近最少使用(LRU)策略來回收RDD。然而,屬於同一個RDD的分區是無法剔除的——因為通常來說,一個程序可能會在一個大的RDD上進行計算,如果將同一個RDD中的分區剔除的話則會出現係統顛簸。

世係圖擁有足夠的信息來重建RDD的丟失分區。然而,考慮到效率的因素(重建整個RDD可能會需要很大的計算量),檢查點仍是必需的——用戶可以自主控製哪個RDD作為檢查點。使用了寬依賴的RDD可以使用檢查點,因為在這種情況下,計算丟失的分區會需要顯著的通信及計算量。而對於隻擁有窄依賴的RDD而言,檢查點則不太適合。

最後更新:2017-05-22 20:03:38

  上一篇:go  解析Disruptor的依賴關係
  下一篇:go  Java並發性和多線程介紹目錄