《Spark大數據分析:核心概念、技術及實踐》Spark Core
Spark Core
Spark是大數據領域最活躍的開源項目,甚至比Hadoop還要熱門。如第1章所述,它被認為是Hadoop的繼任者。Spark的使用率大幅增長。很多組織正在用Spark取代Hadoop。
從概念上看,Spark類似於Hadoop,它們都用於處理大數據。它們都能用商用硬件以很低的成本處理大數據。然而,相比於Hadoop,Spark有很多的優勢,這些將在本章進行介紹。
本章主要介紹Spark Core,這也是Spark生態係統的基礎。我們首先概述Spark Core,然後介紹Spark的總體架構和應用程序運行時的情況。Spark Core的編程接口也會一並介紹。
3.1 概述
Spark是一個基於內存的用於處理、分析大數據的集群計算框架。它提供了一套簡單的編程接口,從而使得應用程序開發者方便使用集群節點的CPU、內存、存儲資源來處理大數據。
3.1.1 主要特點
Spark的主要特點如下:
使用方便
快速
通用
可擴展
容錯
使用方便
Spark提供了比MapReduce更簡單的編程模型。使用Spark開發分布式的數據處理應用程序比用MapReduce簡單多了。
Spark針對開發大數據應用程序提供了豐富的API。它提供了80多個用於處理數據的操作符。而且,Spark提供了比Hadoop MapReduce更易讀的API。相比之下,Hadoop MapReduce隻有兩個操作符,map和reduce。Hadoop要求任何問題都必須能夠分解為一係列的map作業和reduce作業。然而,有些算法卻難以隻用map和reduce來描述。相比於Hadoop MapReduce,使用Spark提供的操作符來處理複雜的數據顯得更加簡單。
而且,使用Spark可以寫出比用Hadoop MapReduce更簡潔的代碼。用Hadoop Map-Reduce需要寫大量的模塊代碼。同樣的數據處理算法,用Hadoop MapReduce實現需要50行,而用Spark隻需要10不到。有了豐富易讀的API,消除了模塊代碼,開發者的生產力大幅提升。相對於使用Hadoop,使用Spark開發者的生產力會有5~10倍的提升。
快速
Spark要比Hadoop快上若幹個數量級。如果數據都加載在內存中,它能快上數百倍,哪怕數據無法完全載入內存,Spark也能快上數十倍。
尤其是在處理大數據集的時候,速度顯得至關重要。如果一個處理數據的作業要花費數天或小時,那麼它將拖慢決策的速度,從而降低數據的價值。反之,如果同樣的處理能提速十倍乃至百倍,它將會創造更多的機會。它甚至可能開創出前所未有的新數據驅動應用程序。
Spark比Hadoop快的原因有兩方麵。一方麵,它可以使用基於內存的集群計算。另一方麵,它實現了更先進的執行引擎。
得益於基於內存的集群計算,Spark的性能有了數量級的提升。相比於從硬盤讀取數據,采用從內存讀取數據的方式,獲得的順序讀取吞吐量要大100倍。換句話說,從內存讀取數據要比從硬盤快100倍。當應用程序隻讀取和處理少量數據時,內存和硬盤之間讀取速度的差距並不太明顯。然而,一旦數據量達到太字節級別,I/O延遲(數據從硬盤載入內存所花費的時間)就會顯著影響作業執行時間。
Spark允許應用程序利用內存緩存數據。這能減少磁盤I/O。一個基於MapReduce的數據處理流水線可能包含多個作業。每個作業都需要從硬盤載入數據,處理它,而後再寫入硬盤中。而且,一個使用MapReduce實現的複雜數據處理應用程序可能需要反複從硬盤讀取數據,寫入數據。由於Spark允許利用內存緩存數據,因此使用Spark實現的同樣的應用程序隻需要從硬盤讀取數據一次即可。一旦數據緩存在內存中,接下來的每一個操作都可以直接操作緩存的數據。就像前麵說的一樣,Spark可以減少I/O延遲,這樣就能顯著減少作業總的執行時間。
需要注意的是,Spark不會自動將輸入數據緩存在內存中。一個普遍的誤解是,一旦無法把輸入數據完全載入內存,那麼Spark將無法使用。這並不正確。Spark可以在集群上處理太字節級的數據,哪怕集群的總內存隻有僅僅100GB。在數據處理流水線上何時緩存和緩存哪部分數據完全由應用程序決定。實際上,如果數據處理應用程序隻使用一次數據,那麼它完全不需要緩存數據。
Spark比Hadoop MapReduce快的第二個原因是它擁有更先進的作業執行引擎。Spark和Hadoop一樣都將一個作業轉化為由若幹個階段構成的有向無環圖(DAG)。如果你不熟悉圖論,這裏簡單介紹下。圖是一個由頂點構成的集合,這些頂點由邊相連。有向圖指的是那些邊有方向的圖。無環圖指的是不存在環路的圖。DAG指的就是不存在環路的有向圖。換句話說,在DAG中不存在一條起點和終點都是同一個頂點的通路。第11章將對圖進行更詳細的介紹。
Hadoop MapReduce對任意一個作業都會創建由map和Reduce兩個階段構成的有向無環圖。如果一個複雜的數據處理算法用MapReduce實現,可能需要劃分成多個作業,而後按順序執行。這種設計導致Hadoop MapReduce無法做任何的優化。
與之相反,Spark並沒有迫使開發者在實現數據處理算法的時候將其劃分成多個作業。Spark中的DAG可以包含任意個階段。一個簡單的作業可能隻有一個階段,而一個複雜的作業可能會有多個階段。這使得Spark可以做些Hadoop無法實現的優化。Spark可以一次執行一個包含多階段的複雜作業。因為它擁有所有階段的信息,所以可以進行優化。舉例來說,它可以減少磁盤I/O和數據shuffle操作的時間。數據的shuffle操作通常會涉及網絡間的數據傳輸,並且會增加應用程序的執行時間。
通用
Spark為各種類型的數據處理作業提供一個統一的集成平台。它可以用於批處理、交互分析、流處理、機器學習和圖計算。相比之比,Hadoop MapReduce隻適合批處理。因此一個使用Hadoop MapReduce的開發者為了能做流處理和圖計算隻能使用其他的框架。
對於不同類型的數據處理作業使用不同的框架,帶來了很多問題。首先,開發者不得不學習各種框架,每種框架的接口都不相同。這降低了開發者的生產力。其次,每種框架都相對獨立。因此,數據也必須複製多份,存放在不同的地方。類似地,代碼也必須重複多份,存放在多個地方。比如,你想使用Hadoop MapReduce處理曆史數據,同時使用Storm(一個流處理框架)處理流式數據,二者采用同樣的算法,那麼你不得不維護兩份相同的代碼,一份是Hadoop MapReduce的,一份是Storm的。最後,同時使用多個框架帶來了運維上的麻煩。你得為每一個框架創建並維護一個單獨的集群。要知道維護多個集群可比維護一個困難多了。
Spark自帶了一係列的庫,用於批處理、交互分析、流處理、機器學習和圖計算。使用Spark,可以使用單一框架來創建一個包含多個不同類型任務的數據處理流水線。從而,再也沒有必要為了多個不同類型的數據處理任務而學習不同框架或者部署單獨的集群了。使用Spark有助於降低運維的困難度,減少代碼和數據的重複。
有意思的是,越來越多流行的應用和庫開始集成到Spark中或添加了對Spark的支持,而它們一開始是使用Hadoop作為其執行引擎的。比如Apache Mahout(一個構建於Hadoop之上的機器學習庫)正在集成到Spark中。到了2014年4月,Mahout的開發者已經放棄了Hadoop並且不再添加新的基於MapReduce的機器學習算法了。
同樣地,Hive(見第1章)的開發者也正在開發一個運行在Spark上的版本。Pig(一個可以用腳本語言來創建數據處理流水線的數據分析平台)同樣支持Spark作為它的執行引擎。Cascading(一個用於開發Hadoop數據應用程序的應用開發平台)也添加了對Spark的支持。
可拓展
Spark是可擴展的。Spark集群的數據處理能力可以通過增加更多集群節點的方式得以提升。你可以從一個小集群開始,隨著數據量的增加,逐漸增加更多的計算能力。這相當經濟。
而且,Spark的這個特性對於應用程序來說是透明的。當你往Spark集群增加節點的時候無須改動任何代碼。
容錯
Spark是可容錯的。一個由數百個節點構成的集群中,每個節點在任何一天故障的可能性都很高。硬盤損壞或其他硬件問題都有可能導致節點不可用。Spark能自動處理集群中的節點故障。一個節點故障可能會導致性能下降但不會導致應用無法運行。
既然Spark能自動處理節點故障,應用程序的開發者就不必在應用中處理這樣的異常情況了,這簡化了應用程序的代碼。
3.1.2 理想的應用程序
就像前麵討論的那樣,Spark是一個通用框架,它用於各種大數據應用中。然而,對於一個理想的大數據應用程序而言,速度是相當重要的。使用迭代數據處理算法的應用和交互分析都是這樣的典型應用。
迭代算法
迭代算法是指那些在同樣數據上迭代多次的數據處理算法。使用這類算法的應用包括機器學習和圖處理應用。這些應用都在同樣的數據上迭代數十次乃至數百次算法。對於這類應用,Spark是理想的選擇。
Spark內存計算的特性使得在Spark上麵執行這些迭代算法比較快。由於Spark允許應用在內存中緩存數據,因此一個迭代算法哪怕需要迭代100次,也隻需要在第一次迭代的時候從硬盤讀取數據,接下來的迭代都從內存中讀取。而從內存中讀取數據比從硬盤要快100倍,所以在Spark上運行這些應用能快上一個數量級。
交互分析
交互式數據分析涉及交互式地探索數據。舉例來說,對於一個巨型數據集,在觸發一個可能需要花費數小時的長時間運行的批處理作業之前,先進行匯總分析是很有用的。類似地,一個商業分析師可能想要使用BI或數據可視化工具來進行交互分析。在這種場景下,用戶會在同一個數據集上執行多個查詢。Spark就提供了這樣一個用於大數據交互分析的理想平台。
Spark適用於交互分析的理由還是它的內存計算特性。應用程序可以緩存數據,從而使得數據能夠在內存中進行交互分析。第一個查詢請求從硬盤讀取數據,但是接下來的一連串請求都從內存中讀取緩存數據。查詢內存中的數據要比硬盤中的數據快上一個數量級。當數據緩存在內存中的時候,一個查詢請求可能隻需要花費數秒,而在硬盤中則需要不止一個小時。
3.2 總體架構
一個Spark應用包括5個重要部分:驅動程序、集群管理員、worker、執行者、任務(見圖3-1)。
圖3-1 高層Spark架構
3.2.1 worker
worker為Spark應用提供CPU、內存和存儲資源。worker把Spark應用當成分布式進程在集群節點上執行。
3.2.2 集群管理員
Spark使用集群管理員來獲得執行作業所需要的集群資源。顧名思義,集群管理員管理集群中worker節點的計算資源。它能跨應用從底層調度集群資源。它可以讓多個應用分享集群資源並且運行在同一個worker節點上。
Spark目前支持三種集群管理員:單獨模式、Mesos模式、YARN模式。Mesos模式和YARN模式都允許在同一個worker節點上同時運行Spark應用和Hadoop應用。第10章將詳細介紹集群管理員。
3.2.3 驅動程序
驅動程序是一個把Spark當成庫使用的應用。它提供數據處理的代碼,Spark將在worker節點上執行這些代碼。一個驅動程序可以在Spark集群上啟動一個或多個作業。
3.2.4 執行者
執行者是一個JVM進程,對於一個應用由Spark在每一個worker上創建。它可以多線程的方式並發執行應用代碼。它也可以把數據緩存在內存或硬盤中。
執行者的生命周期和創建它的應用一樣。一旦Spark應用結束,那麼為它創建的執行者也將壽終正寢。
3.2.5 任務
任務是Spark發送給執行者的最小工作單元。它運行在worker節點上執行者的一個線程中。每一個任務都執行一些計算,然後將結果返回給驅動程序,或者分區以用於shuffle操作。
Spark為每一個數據分區創建一個任務。一個執行者可以並發執行一個或多個任務。任務數量由分區的數量決定。更多的分區意味著將有更多的任務並行處理數據。
3.3 應用運行
本節主要描述數據處理代碼是怎麼在Spark集群中執行的。
3.3.1 術語
先來看看幾個術語的定義。
shuffle操作。shuffle操作是指在集群節點上對數據進行重新分配。這是一個耗時操作,因為它涉及在網絡間傳輸數據。需要注意的是,shuffle操作不是對數據進行隨機重新分配,它按照某些標準將數據分成不同的集合。每一個集合就是一個新的分區。
作業。作業是一係列計算的集合,Spark執行這些計算並將結果返回給驅動程序。作業本質上就是在Spark集群上運行數據處理算法。一個應用程序可以發起多個作業。本章稍後將會介紹作業是怎麼執行的。
階段。一個階段由若幹個任務構成。Spark將一個作業分解為一個由若幹個階段構成的DAG,每一個階段依賴於其他階段。舉個例子,把一個作業分解為階段0和階段1兩個階段。隻有當階段0完成之後,才可以開始階段1。Spark利用shuffle邊界將任務分成不同的階段。不要求shuffle操作的任務屬於同一階段。隻有在開始一個新階段時,任務才需要輸入數據是經過shuffle操作的。
3.3.2 應用運行過程
有了上麵的這些定義,我們就可以描述一個Spark應用在集群節點上並行處理數據的過程。當一個Spark應用開始運行的時候,Spark會連接集群管理員,獲取在worker節點上的執行者資源。就像前麵所說的,Spark應用把一個數據處理算法當成一個作業提交。Spark將這個作業分解成由若幹個階段構成的DAG。然後,Spark在執行者上調度這些階段的運行,調度操作由集群管理員提供的底層調度器實現。執行者並行地運行Spark提交的任務。
每一個Spark應用都有一組其自己的位於worker節點上的執行者。這樣的設計有諸多好處。首先,不同應用中的任務由於運行在不同JVM之上,使得它們之間互相隔離。一個應用程序中的錯誤任務並不會讓其他應用崩潰。其次,調度任務變得輕而易舉。Spark一次隻需要調度歸屬於同一應用的任務。它不用處理這樣一種複雜情況,其中調度的多個任務屬於多個並發執行的不同應用。
然而,這種設計也有不足之處。由於不同應用在不同的JVM進程中運行,因此它們之間就不太方便共享數據。即使它們可能在同一個worker節點上運行,它們也隻能通過讀寫磁盤的方式共享數據。就像前麵所說的,讀寫磁盤是耗時的操作。因此,應用間通過磁盤共享數據,將會遇到性能問題。
3.4 數據源
Spark本質上是一個使用集群節點進行大數據集處理的計算框架。與數據庫不同,它並沒有存儲係統,但是它可以搭配外部存儲係統使用。Spark一般都配合能存儲大量數據的分布式存儲係統使用。
Spark支持多種數據源。Spark應用程序可以使用的數據來源包括HDFS、HBase、Cassandra、Amazon S3,或者其他支持Hadoop的數據源。任何Hadoop支持的數據源都可以被Spark Core使用。Spark上的庫Spark SQL還支持更多數據源。第7章將會介紹Spark-SQL。
兼容支持Hadoop的數據源是相當重要的。許多組織都已經在Hadoop上麵投入了大量的精力。在HDFS或其他支持Hadoop的數據存儲係統上都存儲著大量的數據。使用Spark並不需要將這些數據遷移到其他存儲係統。而且,將Hadoop MapReduce替換成Spark並不需要另起爐灶,這是比較輕鬆的。如果現有的Hadoop集群正在執行MapReduce作業,也可以同時在上麵運行Spark應用。可以把現有的MapReduce作業轉化成Spark作業。或者,也可以保留現有的MapReduce應用程序,不做更改,使用Spark運行新的應用程序。
由於Spark Core原生支持Hadoop兼容的存儲係統,因此額外的數據源都能很方便地添加進來。比如,人們已經為Spark編寫好了各種數據源的連接器,包括Cassandra、MongoDB、CouchDB和其他流行的數據源。
Spark也支持本地文件係統。Spark應用程序可以讀寫本地文件係統上的數據。如果數據可以從本地文件讀取並在單機上處理,那麼沒必要使用Spark。盡管如此,Spark的這個特性使得它便於開發應用和調試,並且易學。
3.5 API
應用可以通過使用Spark提供的庫獲得Spark集群計算的能力。這些庫都是用Scala編寫的。但是Spark提供了各種語言的API。在本書編寫之際,Spark API提供了如下語言的支持:Scala、Java、Python和R。可以使用上麵的任何語言來開發Spark應用。也有其他語言(比如Clojure)的非官方支持。
Spark API主要由兩個抽象部件SparkContext和彈性分布式數據集(RDD)構成。應用程序通過這兩個部件和Spark進行交互。應用程序可以連接到Spark集群並使用相關資源。接下來會介紹這兩個抽象部件,然後詳細介紹RDD。
3.5.1 SparkContext
SparkContext是一個在Spark庫中定義的類。它是Spark庫的入口點。它表示與Spark集群的一個連接。使用Spark API創建的其他一些重要對象都依賴於它。
每個Spark應用程序都必須創建一個SparkContext類實例。目前,每個Spark應用程序隻能擁有一個激活的SparkContext類實例。如果要創建一個新的實例,那麼在此之前必須讓當前激活的類實例失活。
SparkContext有多個構造函數。最簡單的一個不需要任何參數。一個SparkContext類實例可以用如下代碼創建。
在這種情況下,SparkContext的配置信息都從係統屬性中獲取,比如Spark master的地址、應用名稱等。也可以創建一個SparkConf類實例,然後把它作為SparkContext的參數從而設定配置信息。SparkConf 是Spark庫中定義的一個類。通過這種方式可以像下麵這樣設置各種Spark配置信息。
SparkConf為設置諸如Spark master這樣的常用配置信息都提供了對應的顯式方法。此外,它還提供了一個通用的方法用於設置配置信息,它使用鍵-值對進行設置。SparkContext和SparkConf可以使用的參數將在第4章進行詳細介紹。
在本章接下來的例子中會繼續使用上麵創建的變量sc。
3.5.2 RDD
彈性分布式數據集(RDD)表示一個關於分區數據元素的集合,可以在其上進行並行操作。它是Spark的主要數據抽象概念。它是Spark庫中定義的一個抽象類。
從概念上看,除了可以用於表示分布式數據集和支持惰性操作的特性外,RDD類似於Spark的集合。惰性操作將在本章稍後部分詳細介紹。
下麵分別簡要描述RDD的特點。
不可變性
RDD是一種不可變的數據結構。一旦創建,它將不可以在原地修改。基本上,一個修改RDD的操作都會返回一個新的RDD。
分片
RDD表示的是一組數據的分區。這些分區分布在多個集群節點上。然而,當Spark在單個節點運行時,所有的分區數據都會在當前節點上。
Spark存儲RDD的分區和數據集物理分區之間關係的映射關係。RDD是各個分布式數據源之中數據的一個抽象,它通常表示分布在多個集群節點上的分區數據。比如HDFS將數據分片或分塊分散存儲在集群中。默認情況下,一個RDD分區對應一個HDFS文件分片。其他的分布式數據源(比如Cassandra)同樣也將數據分片分散存儲在集群多個節點上。然而,一個RDD對應多個Cassandra分片。
容錯性
RDD為可容錯的。RDD代表了分散在集群中多個節點的數據,但是任何一個節點都有可能出故障。誠如之前所說的,一個節點出故障的可能性和集群節點數量成正比。集群越大,在任何一個節點它出故障的可能性就越高。
RDD會自動處理節點出故障的情況。當一個節點出故障時,該節點上存儲的數據將無法被訪問。此時,Spark會在其他節點上重建丟失的RDD分區數據。Spark存儲每一個RDD的血統信息。通過這些血統信息,Spark可以恢複RDD的部分信息,當節點出故障的時候,它甚至可以恢複整個RDD。
接口
需要著重指出的是,RDD是一個處理數據的接口。在Spark庫中它定義為一個抽象類。RDD為多種數據源提供了一個處理數據的統一接口,包括HDFS、HBase、Cassandra等。這個接口同樣可以用於處理存儲於多個節點內存中的數據。
Spark為不同數據源提供了各自具體的實現類,比如HadoopRDD、ParallelCollection-RDD、JdbcRDD和CassandraRDD。它們都支持基礎的RDD接口。
強類型
RDD類有一個參數用於表示類型,這使得RDD可以表示不同類型的數據。RDD可以表示同一類型數據的分布式集合,包括Integer、Long、Float、String或者應用開發者自己定義的類型。而且,一個應用總會使用某種類型的RDD,包括Integer、Long、Float、Double、String或自定義類型。
駐留在內存中
之前已經提及了Spark的內存集群計算特性。RDD類提供一套支持內存計算的API。Spark允許RDD在內存中緩存或長期駐留。就像之前所說的,對一個緩存在內存中的RDD進行操作比操作沒緩存的RDD要快很多。
3.5.3 創建RDD
由於RDD是一個抽象類,因此無法直接創建一個RDD的類實例。SparkContext類提供了一個工廠方法用來創建RDD實現類的類實例。RDD也可以通過由其他RDD執行轉換操作得到。就像之前所說的,RDD是不可變的。任何一個對RDD的修改操作都將返回一個代表修改後數據的新RDD。
本節總結了幾種創建RDD的常見方法。在下麵的示例代碼中,sc是一個SparkContext的類實例。之前的章節已經介紹了怎麼創建它。
parallelize
這個方法用於從本地Scala集合創建RDD實例。它會對Scala集合中的數據重新分區、重新分布,然後返回一個代表這些數據的RDD。這個方法很少用在生產上,但是使用它有助於學習Spark。
textFile
textFile方法用於從文本文件創建RDD實例。它可以從多種來源讀取數據,包括單個文件、本地同一目錄下的多個文件、HDFS、Amazon S3,或其他Hadoop支持的存儲係統。這個方法返回一個RDD,這個RDD代表的數據集每個元素都是一個字符串,每一個字符串代表輸入文件中的一行。
上麵的代碼表示從存儲於HDFS上的一個文件或者目錄創建RDD實例。
textFile方法也可以讀取壓縮文件中的數據。而且,它的參數中可以存在通配符,用於從一個目錄中讀取多個文件。下麵是一個例子。
textFile的第二個參數是一個可選參數,它用於指定分區的個數。默認情況下,Spark為每一個文件分塊創建一個分區。可以設置成一個更大的數字從而提高並行化程度,但是設置成一個小於文件分塊數的數字是不可以的。
wholeTextFiles
這個方法讀取目錄下的所有文本文件,然後返回一個由鍵值型RDD。返回RDD中的每一個鍵值對對應一個文件。鍵為文件路徑,對應的值為該文件的內容。這個方法可以從多種來源讀取數據,包括本地文件係統、HDFS、Amazon S3,或者其他Hadoop支持的存儲係統。
sequenceFile
sequenceFile方法從SequenceFile文件中獲取鍵值對數據,這些SequenceFile文件可以存儲於本地文件係統、HDFS或者其他Hadoop支持的存儲係統。這個方法返回一個鍵值對型RDD實例。當使用這個方法的時候,不僅需要提供文件名,還需要提供文件中數據鍵和值各自的類型。
3.5.4 RDD操作
Spark應用使用RDD類或其繼承類中定義的方法來處理數據。這些方法也稱為操作。既然Scala中可以把一個方法當成操作符使用,那麼RDD中的方法有時也稱為操作符。
Spark的美好之處就在於同樣一個RDD方法既可以處理幾字節的數據也可以處理PB級的數據。而且Spark應用可以使用同樣的方法去處理數據,無論它是存儲於本地還是存儲於一個分布式存儲係統。這樣的靈活性使得開發者可以在單機上開發、調試、測試Spark應用,然後不用改動任何代碼就可以將它部署到一個大集群上。
RDD操作可以歸為兩類:轉換和行動。轉換將會創建一個新的RDD實例。行動則會將結果返回給驅動程序。
轉換
轉換指的是在原RDD實例上進行計算,而後創建一個新的RDD實例。本節將介紹一些常見的轉換操作。
從概念上看,RDD轉換操作的類似於Scala集合上的方法。主要的區別在於Scala集合方法操作的數據是在單機內存中的,而RDD的轉換操作可以處理分布在集群各個節點上的數據。另外一個重要的區別是,RDD轉換操作是惰性的,而Scala集合方法不是。本章餘下部分會詳細介紹這些內容。
map
map方法是一個高階方法,它把一個函數作為它的參數,並把這個函數作用在原RDD的每個元素上,從而創建一個新RDD實例。這個作為參數的函數擁有一個參數並返回一個值。
filter
filter方法是一個高階方法,它把一個布爾函數作為它的參數,並把這個函數作用在原RDD的每個元素上,從而創建一個新RDD實例。一個布爾函數隻有一個參數作為輸入,返回true或false。filter方法返回一個新的RDD實例,這個RDD實例代表的數據集由布爾函數返回true的元素構成。因此,新RDD實例代表的數據集是原RDD的子集。
flatMap
flatMap方法是一個高階方法,它把一個函數作為它的參數,這個函數處理原RDD中每個元素返回一個序列。扁平化這個序列的集合得到一個數據集,flatMap方法返回的RDD就代表這個數據集。
mapPartitions
mapPartitions是一個高階方法,它使你可以以分區的粒度來處理數據。相比於一次處理一個元素,mapPartitions一次處理處理一個分區,每個分區被當成一個迭代器。mapPartitions方法的函數參數把迭代器作為輸入,返回另外一個迭代器作為輸出。map-Partitions將自定義函數參數作用於每一個分區上,從而返回一個新RDD實例。
union
union方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例的數據集是原RDD和輸入RDD的合集。
intersection
intersection方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例代表的數據集是原RDD和輸入RDD的交集。
這是另外一個例子。
subtract
subtract方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例代表的數據集由那些存在於原RDD實例中但不在輸入RDD實例中的元素構成。
這是另外一個例子。
distinct
RDD實例上的distinct方法返回一個新RDD實例,這個新RDD實例的數據集由原RDD的數據集去重後得到。
cartesian
cartesian方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例的數據集由原RDD和輸入RDD的所有元素的笛卡兒積構成。返回的RDD實例的每一個元素都是一個有序二元組,每一個有序二元組的第一個元素來自原RDD,第二個元素來自輸入RDD。元素的個數等於原RDD的元素個數乘以輸入RDD的元素個數。
這個方法類似於SQL中的join操作。
zip
zip方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例的每一個元素是一個二元組,二元組的第一個元素來自原RDD,第二個元素來自輸入RDD。和cartesian方法不同的是,zip方法返回的RDD的元素個數於原RDD的元素個數。原RDD的元素個數和輸入RDD的相同。進一步地說,原RDD和輸入RDD不僅有相同的分區數,每個分區還有相同的元素個數。
zipWithIndex
zipWithIndex方法返回一個新RDD實例,這個新RDD實例的每個元素都是由原RDD元素及其下標構成的二元組。
groupBy
groupBy是一個高階方法,它將原RDD中的元素按照用戶定義的標準分組從而組成一個RDD。它把一個函數作為它的參數,這個函數為原RDD中的每一個元素生成一個鍵。groupBy把這個函數作用在原RDD的每一個元素上,然後返回一個由二元組構成的新RDD實例,每個二元組的第一個元素是函數生成的鍵,第二個元素是對應這個鍵的所有原RDD元素的集合。其中,鍵和原RDD元素的對應關係由那個作為參數的函數決定。
需要注意的是,groupBy是一個費時操作,因為它可能需要對數據做shuffle操作。
假設有一個CSV文件,文件的內容為公司客戶的姓名、年齡、性別和郵編。下麵的示例代碼演示了按照郵編將客戶分組。
keyBy
keyBy方法與groupBy方法相類似。它是一個高階方法,把一個函數作為參數,這個函數為原RDD中的每一個元素生成一個鍵。keyBy方法把這個函數作用在原RDD的每一個元素上,然後返回一個由二元組構成的新RDD實例,每個二元組的第一個元素是函數生成的鍵,第二個元素是對應這個鍵的原RDD元素。其中,鍵和原RDD元素的對應關係由那個作為參數的函數決定。返回的RDD實例的元素個數和原RDD的相同。
groupBy和KeyBy的區別在於返回RDD實例的元素上。雖然都是二元組,但是 groupBy返回的二元組中的第二個元素是一個集合,而keyBy的是單個值。
sortBy
sortBy是一個高階方法,它將原RDD中的元素進行排序後組成一個新的RDD實例返回。它擁有兩個參數。第一個參數是一個函數,這個函數將為原RDD的每一個元素生成一個鍵。第二個參數用來指明是升序還是降序排列。
下麵是另一個示例。
pipe
pipe方法可以讓你創建子進程來運行一段外部程序,然後捕獲它的輸出作為字符串,用這些字符串構成RDD實例返回。
randomSplit
randomSplit方法將原RDD分解成一個RDD數組。它的參數是分解的權重。
coalesce
coalesce方法用於減少RDD的分區數量。它把分區數作為參數,返回分區數等於這個參數的RDD實例。
使用coalesce方法時需要小心,因為減少了RDD的分區數也就意味著降低了Spark的並行能力。它通常用於合並小分區。舉例來說,在執行filter操作之後,RDD可能會有很多小分區。在這種情況下,減少分區數能提升性能。
repartition
repartition方法把一個整數作為參數,返回分區數等於這個參數的RDD實例。它有助於提高Spark的並行能力。它會重新分布數據,因此它是一個耗時操作。
coalesce和repartition方法看起來一樣,但是前者用於減少RDD中的分區,後者用於增加RDD中的分區。
sample
sample方法返回原RDD數據集的一個抽樣子集。它擁有三個參數。第一個參數指定是有放回抽樣還是無放回抽樣。第二個參數指定抽樣比例。第三個參數是可選的,指定抽樣的隨機數種子。
鍵值對型RDD的轉換
除了上麵介紹的RDD轉換之外,針對鍵值對型RDD還支持其他的一些轉換。下麵將介紹隻能作用於鍵值對型RDD的常用轉換操作。
keys
keys方法返回隻由原RDD中的鍵構成的RDD。
values
values方法返回隻由原RDD中的值構成的RDD。
mapValues
mapValues是一個高階方法,它把一個函數作為它的參數,並把這個函數作用在原RDD的每個值上。它返回一個由鍵值對構成的RDD。它和map方法類似,不同點在於它把作為參數的函數作用在原RDD的值上,所以原RDD的鍵都沒有變。返回的RDD和原RDD擁有相同的鍵。
join
join方法把一個鍵值對型RDD作為參數輸入,而後在原RDD和輸入RDD上做內連接操作。它返回一個由二元組構成的RDD。二元組的第一個元素是原RDD和輸入RDD都有的鍵,第二個元素是一個元組,這個元組由原RDD和輸入RDD中鍵對應的值構成。
leftOuterJoin
leftOuterJoin方法把一個鍵值對型RDD作為參數輸入,而後在原RDD和輸入RDD之間做左連接操作。它返回一個由鍵值對構成的RDD。鍵值對的第一個元素是原RDD中的鍵,第二個元素是一個元組,這個元組由原RDD中鍵對應的值和輸入RDD中的可選值構成。可選值用Option類型表示。
rightOuterJoin
rightOuterJoin方法把一個鍵值對型RDD作為參數輸入,而後在原RDD和輸入RDD之間做右連接操作。它返回一個由鍵值對構成的RDD。鍵值對的第一個元素是輸入RDD中的鍵,第二個元素是一個元組,這個元組由原RDD中的可選值和輸入RDD中鍵對應的值構成。可選值用Option類型表示。
fullOuterJoin
fullOuterJoin方法把一個鍵值對型RDD作為參數輸入,而後在原RDD和輸入RDD之間做全連接操作。它返回一個由鍵值對構成的RDD。
sampleByKey
sampleByKey通過在鍵上抽樣返回原RDD的一個子集。它把對每個鍵的抽樣比例作為輸入參數,返回原RDD的一個抽樣。
subtractByKey
subtractByKey方法把一個鍵值對型RDD作為輸入參數,返回一個鍵值對RDD,這個鍵值對RDD的鍵都是隻存在原RDD中但是不存在於輸入RDD中。
groupByKey
groupByKey方法返回一個由二元組構成的RDD,二元組的第一個元素是原RDD的鍵,第二個元素是一個集合,集合由該鍵對應的所有值構成。它類似於上麵介紹過的group-By方法。二者的區別在於groupBy是一個高階方法,它的參數是一個函數,這個函數為原RDD的每一個元素生成一個鍵。groupByKey方法作用於RDD的每一個鍵值對上,故不需要一個生成鍵的函數作為輸入參數。
應當盡量避免使用groupByKey。它是一個耗時操作,因為它可能會對數據進行shuffle操作。在大多數情況下,都有不使用groupByKey的更好的替代方案。
reduceByKey
reduceByKey是一個高階方法,它把一個滿足結合律的二元操作符當作輸入參數。它把這個操作符作用於有相同鍵的值上。
一個二元操作符把兩個值當作輸入參數,返回一個值。一個滿足結合律的二元操作符返回同樣的結果,但是它不關心操作數的分組情況。
reduceByKey方法可以用於對同一鍵對應的值進行匯總操作。比如它可以用於對同一鍵對應的值進行求和,求乘積,求最小值,求最大值。
對於基於鍵的匯總操作、合並操作,reduceByKey比groupByKey更合適。
操作
操作指的是那些返回值給驅動程序的RDD方法。本節介紹一些RDD中常用的操作。
collect
collect方法返回一個數組,這個數組由原RDD中的元素構成。在使用這個方法的時候需要小心,因為它把在worker節點的數據移給了驅動程序。如果操作一個有大數據集的RDD,它有可能會導致驅動程序崩潰。
count
count方法返回原RDD中元素的個數。
countByValue
countByValue方法返回原RDD中每個元素的個數。它返回是一個map類實例,其中,鍵為元素的值,值為該元素的個數。
first
first方法返回原RDD中的第一個元素。
max
max方法返回RDD中最大的元素。
min
min方法返回RDD中最小的元素。
take
take方法的輸入參數為一個整數N,它返回一個由原RDD中前N個元素構成的RDD。
takeOrdered
takeOrdered方法的輸入參數為一個整數N,它返回一個由原RDD中前N小的元素構成的RDD。
top
top方法的輸入參數為一個整數N,它返回一個由原RDD中前N大的元素構成的RDD。
fold
fold是一個高階方法,用於對原RDD的元素做匯總操作,匯總的時候使用一個自定義的初值和一個滿足結合律的二元操作符。它首先在每一個RDD的分區中進行匯總,然後再匯總這些結果。
初值的取值取決於RDD中的元素類型和匯總操作的目的。比如,給定一個元素為整數的RDD,為了計算這個RDD中所有元素的和,初值取為0。相反,給定一個元素為整數的RDD,為了計算這個RDD中所有元素的乘積,初值則應取為1。
reduce
reduce是一個高階方法,用於對原RDD的元素做匯總操作,匯總的時候使用一個滿足結合律和交換律的二元操作符。它類似於fold方法,然而,它並不需要初值。
鍵值對型RDD上的操作
鍵值對RDD上有一些額外的操作,我們在下麵進行介紹。
countByKey
countByKey方法用於統計原RDD每個鍵的個數。它返回一個map類實例,其中,鍵為原RDD中的鍵,值為個數。
lookup
lookup方法的輸入參數為一個鍵,返回一個序列,這個序列的元素為原RDD中這個鍵對應的值。
數值型RDD上的操作
如果RDD的元素類型為Integer、Long、Float或Double,則這樣的RDD為數值型RDD。這類RDD還有一些對於統計分析十分有用的額外操作,下麵將介紹一些常用的行動。
mean
mean方法返回原RDD中元素的平均值。
stdev
stdev方法返回原RDD中元素的標準差。
sum
sum方法返回原RDD中所有元素的和。
variance
variance方法返回原RDD中元素的方差。
3.5.5 保存RDD
一般來說,數據處理完畢後,結果會保存在硬盤上。Spark允許開發者將RDD保存在任何Hadoop支持的存儲係統中。保存在硬盤上的RDD可以被其他Spark應用或Hadoop應用使用。
本節介紹將RDD保存成文件的常用方法。
saveAsTextFile
saveAsTextFile方法將原RDD中的元素保存在指定目錄中,這個目錄位於任何Hadoop支持的存儲係統中。每一個RDD中的元素都用字符串表示並另存為文本中的一行。
saveAsObjectFile
saveAsObjectFile方法將原RDD中的元素序列化成Java對象,存儲在指定目錄中。
saveAsSequenceFile
saveAsSequenceFile方法將鍵值對型RDD以SequenceFile的格式保存。鍵值對型RDD也可以以文本的格式保存,隻須使用saveAsTextFile方法即可。
需要注意的是,上麵的方法都把一個目錄的名字作為輸入參數,然後在這個目錄為每個RDD分區創建一個文件。這種設計不僅高效而且可容錯。因為每一個分區被存成一個文件,所以Spark在保存RDD的時候可以啟動多個任務,並行執行,將數據寫入文件係統中。這樣也保證了寫入數據的過程是可容錯的。一旦有一個將分區寫入文件的任務失敗了,Spark可以再啟動一個任務,重寫剛才失敗任務創建的文件。
3.6 惰性操作
RDD的創建和轉換方法都是惰性操作。當應用調用一個返回RDD的方法的時候,Spark並不會立即執行運算。比如,當你使用SparkContext的textFile方法從HDFS中讀取文件時,Spark並不會馬上從硬盤中讀取文件。類似地,RDD轉換操作(它會返回新RDD)也是惰性的。Spark會記錄作用於RDD上的轉換操作。
讓我們考慮如下示例代碼。
上麵三行代碼看起來很快就會執行完,哪怕textFile方法讀取的是一個包含了10TB數據的文件。這其中的原因是當你調用textFile方法時,它並沒有真正讀取文件。類似地,filter方法也沒有立即遍曆原RDD中的每一個元素。
Spark僅僅記錄了這個RDD是怎麼創建的,在它上麵做轉換操作會創建怎樣的子RDD等信息。Spark為每一個RDD維護其各自的血統信息。在需要的時候,Spark利用這些信息創建RDD或重建RDD。
如果RDD的創建和轉換都是惰性操作,那麼Spark什麼時候才真正讀取數據和做轉換操作的計算呢?下麵將會解答這個問題。
觸發計算的操作
當Spark應用調用操作方法或者保存RDD至存儲係統的時候,RDD的轉換計算才真正執行。保存RDD至存儲係統也被視為一種操作,盡管它並沒有向驅動程序返回值。
當Spark應用調用RDD的操作方法或者保存RDD的時候,它觸發了Spark中的連鎖反應。當調用操作方法的時候,Spark會嚐試創建作為調用者的RDD。如果這個RDD是從文件中創建的,那麼Spark會在worker節點上讀取文件至內存中。如果這個RDD是通過其他RDD的轉換得到的子RDD,Spark會嚐試創建其父RDD。這個過程會一直持續下去,直到Spark找到根RDD。然後Spark就會真正執行這些生成RDD所必需的轉換計算,從而生成作為調用者的RDD。最後,執行操作方法所需的計算,將生成的結果返回給驅動程序。
惰性轉換使得Spark可以高效地執行RDD計算。直到Spark應用需要操作結果時才進行計算,Spark可以利用這一點優化RDD的操作。這使得操作流水線化,而且還避免了在網絡間不必要的數據傳輸。
3.7 緩存
除了將數據駐留在內存中以外,緩存在RDD中也扮演了另外一個重要的角色。就像之前所說的,創建RDD有兩種方式,從存儲係統中讀取數據或者應用其他現存RDD的轉換操作。默認情況下,當一個RDD的操作方法被調用時,Spark會根據它的父RDD來創建這個RDD,這有可能導致父RDD的創建。如此往複,這個過程一直持續到Spark找到根RDD,而後Spark通過從過存儲係統讀取數據的方式創建根RDD。操作方法被調用一次,上麵說的過程就會執行一遍。每次調用操作方法,Spark都會遍曆這個調用者RDD的血統樹,執行所有的轉換操作來創建它。
考慮下麵的例子。
盡管上麵的代碼隻調用了一次textFile方法,但是日誌文件會被從硬盤中讀取兩次。這是因為調用了兩次操作方法count。在調用errorLogs.count時,日誌文件第一次被讀取,調用warningLogs.count時,日誌文件被再次讀取。這隻是個簡單的例子,現實世界中的應用會有更多的各種轉換和操作。
如果一個RDD緩存了,Spark會執行到目前為止的所有轉換操作並為這個RDD創建一個檢查點。具體來說,這隻會在第一次在一個緩存的RDD上調用某操作的時候發生。類似於轉換方法,緩存方法也是惰性的。
如果一個應用緩存了RDD,Spark並不是立即執行計算並把它存儲在內存中。Spark隻有在第一次在緩存的RDD上調用某操作的時候才會將RDD物化在內存中。而且這第一次操作並不會從中受益,後續的操作才會從緩存中受益。因為它們不需要再執行從存儲係統中讀取數據開始的一係列操作。它們通常都運行得快多了。還有,那些隻使用一次數據的應用使用緩存也不會有任何好處。隻有那些需要對同樣數據做多次迭代的應用才能從緩存中受益。
如果一個應用把RDD緩存在內存中,Spark實際上是把它存儲在每個worker節點上執行者的內存中了。每個執行者把它所計算的RDD分區緩存在內存中。
3.7.1 RDD的緩存方法
RDD類提供了兩種緩存方法:cache和persist。
cache
cache方法把RDD存儲在集群中執行者的內存中。它實際上是將RDD物化在內存中。
下麵的例子展示了怎麼利用緩存優化上麵的例子。
persist
persist是一個通用版的cache方法。它把RDD存儲在內存中或者硬盤上或者二者皆有。它的輸入參數是存儲等級,這是一個可選參數。如果調用persist方法而沒有提供參數,那麼它的行為類似於cache方法。
persist方法支持下列常見的存儲選項。
MEMORY_ONLY:當一個應用把 MEMORY_ONLY作為參數調用persist方法時,Spark會將RDD分區采用反序列化Java對象的方式存儲在worker節點的內存中。如果一個RDD分區無法完全載入worker節點的內存中,那麼它將在需要時才計算。
DISK_ONLY:如果把DISK_ONLY作為參數調用persist方法,Spark會物化RDD分區,把它們存儲在每一個worker節點的本地文件係統中。這個參數可以用於緩存中間的RDD,這樣接下來的一係列操作就沒必要從根RDD開始計算了。
MEMORY_AND_DISK:這種情況下,Spark會盡可能地把RDD分區存儲在內存中,如果有剩餘,就把剩餘的分區存儲在硬盤上。
MEMORY_ONLY_SER:這種情況下,Spark會采用序列化Java對象的方式將RDD分區存儲在內存中。一個序列化的Java對象會消耗更少的內存,但是讀取是CPU密集型的操作。這個參數是在內存消耗和CPU使用之間做的一個妥協。
MEMORY_AND_DISK_SER:Spark會盡可能地以序列化Java對象的方式將RDD分區存儲在內存中。如果有剩餘,則剩餘的分區會存儲在硬盤上。
3.7.2 RDD緩存是可容錯的
在分布式環境中可容錯性是相當重要的。之前我們就已經知道了當節點出故障的時候Spark是怎麼自動把計算作業轉移到其他節點的。Spark的RDD機製同樣也是可容錯的。
即使一個緩存RDD的節點出故障了,Spark應用也不會崩潰。Spark會在另外節點上自動重新創建、緩存出故障的節點中存儲的分區。Spark利用RDD的血統信息來重新計算丟失的緩存分區。
3.7.3 緩存內存管理
Spark采用LRU算法來自動管理緩存占用的內存。隻有在必要時,Spark才會從緩存占用的內存中移除老的RDD分區。而且,RDD還提供了名為unpersist的方法。應用可以調用這個方法來從緩存占用的內存中手動移除RDD分區。
3.8 Spark作業
RDD上的轉換、操作和緩存方法構成了Spark應用的基礎。從本質上說,RDD描述了Spark編程模型。既然我們介紹過了編程模型,那麼接下來我們介紹在Spark應用中這些是怎麼結合在一起的。
作業指的是Spark將要執行的一些計算,它們將操作的結果返回給驅動程序。一個應用可以發起一個或多個作業。通過調用RDD的操作方法可以發起一個作業。也就是說,一個操作方法會觸發一個作業。如果一個操作是從未緩存的RDD或未緩存RDD的後代RDD發起的,Spark將會從存儲係統中讀取數據,從此開始作業。如果一個操作是從緩存過的RDD或者緩存過的RDD的後代RDD發起的,那麼Spark就會從那個緩存過的RDD開始作業。接下來,Spark會按照操作方法的要求執行必要的轉換操作來創建RDD。最後,執行操作所需的計算,一旦結果出來後,便將它返回給驅動程序。
當一個應用調用RDD的操作方法時,Spark會創建由若幹個階段構成的DAG。Spark根據shuffle邊界來將不同任務劃分成不同的階段。不需要shuffle操作的任務被劃分到同一個階段。那些輸入數據是已經做過shuffle操作的任務將開始一個新的階段。
一個階段可以由一個或者多個任務構成。Spark把任務提交給執行者,執行者將並行執行任務。在節點間調度任務的依據是數據分布情況。如果一個節點在處理任務時失效了,Spark會把這個任務提交給其他節點。
3.9 共享變量
Spark使用的架構是無共享的。數據分布在集群的各個節點上,每個節點都有自己的CPU、內存和存儲資源。沒有全局的內存空間用於任務間共享。驅動程序和任務之間通過消息共享數據。
舉例來說,如果一個RDD操作的函數參數是驅動程序中變量的引用,Spark會將這個變量的副本以及任務一起發送給執行者。每個任務都有一份變量的副本並把它當成隻讀變量使用。任何對這個變量的更新都隻存在任務的內部,改動並不會回傳給驅動程序。而且Spark會把這個變量在每一個階段的開始發送給worker節點。
對於一些應用而言,這種默認行為是低效的。在一個實際的使用場景中,驅動程序在作業的任務間共享了一個巨大的查找表。而這個作業由多個階段構成。默認情況下,Spark會自動將這個變量及其相關任務發送給每個執行者。然而,Spark會在每個階段做這件事。如果這個查找表存儲了100MB的數據,並且這個作業涉及10個階段,那麼Spark就會給每個worker節點發送10次100MB的相同數據。
另外一個使用場景是在每個運行在不同節點上的任務中需要更新全局變量。默認情況下,任務中對變量的更新是不會回傳給驅動程序的。
Spark通過共享變量的概念來滿足這些使用場景的需求。
3.9.1 廣播變量
廣播變量的使用使得Spark應用可以有效地在驅動程序和執行作業的任務之間共享數據。Spark隻會給worker節點發送一次廣播變量,並且將它反序列化成隻讀變量存儲在執行者的內存中。而且,Spark采用一種更高效的算法來發布廣播變量。
注意,如果一個作業由多個階段構成,且階段中的任務使用同一個驅動程序的變量,那麼使用廣播變量是十分有用的。如果你不想在開始執行每個任務之前反序列化變量,使用廣播變量也是有益的。默認情況下,Spark會將傳輸過來的變量以序列化的形式緩存在執行者的內存中,在開始執行任務之前再反序列化它。
SparkContext 類提供了一個叫作broadcast的方法用於創建廣播變量。它把一個待廣播的變量作為參數,返回一個Broadcast類實例。一個任務必須使用Broadcast對象的value方法才可以獲取廣播變量的值。
考慮這樣一個應用,它根據電商交易信息生成交易詳情。在現實世界的應用中會有一張顧客表、一張商品表和一張交易表。為了簡化起見,我們直接用一些簡單的數據結構來代替這些表作為輸入數據。
使用廣播變量使得我們可以高效地實現顧客數據、商品數據和交易數據之間的連接。我們可以通過使用RDD API來實現連接操作,但是這會在網絡間對顧客數據、商品數據和交易數據做shuffle操作。使用廣播變量,我們使得Spark隻將顧客數據和商品數據發送給每個節點一次,並且用簡單的map操作來代替耗時的join操作。
3.9.2 累加器
累加器是隻增變量,它可以被運行在不同節點上的任務更改並且被驅動程序讀取。它可以用於計數器和聚合操作。Spark提供了數值類型的累加器,也支持創建自定義類型的累加器。
SparkContext類提供了一個叫作accumulator的方法用於創建累加器變量。它有兩個參數。第一個參數是累加器的初值,第二個是在Spark UI中顯示的名字,這是一個可選參數。它返回一個Accumulator類實例。這個類實例為操作累加器變量提供操作符。任務隻能采用add方法或者+=操作符來增加累加器變量的值。隻有驅動程序可以通過value方法來獲取累加器的值。
考慮這樣一個應用,它需要從顧客表中過濾出不合法的顧客並計數。在現實世界的應用中,我們會從硬盤中讀取數據並將過濾後的數據寫入到硬盤中的另外一個文件。為簡化起見,我們跳過讀寫硬盤的部分。
在使用累加器的時候需要注意,轉換操作期間對累加器的更新無法保證恰好隻有一次。如果一個任務或一個階段重複執行,每一個任務的更新操作就會多次執行。
而且,對累加器的更新操作並不是在RDD的操作方法被調用時才執行的。RDD的轉換操作是惰性的,轉換操作中對累加器的更新並不會立即執行。因此,如果驅動程序在操作方法被調用之前就使用累加器的值,那麼它將得到一個錯誤的值。
3.10 總結
Spark是一個快速、可擴展、可容錯且基於內存的集群計算框架。一個Spark應用可以比Hadoop應用快上100倍。
Spark不但快速而且它能很方便地使用mapReduce。通過不同語言(包括Java、Python、Scala和R)的易讀的API,它可以方便地開發分布式大數據應用。使用Spark開發者的生產力可以有5~10倍的提升。
而且Spark為各種數據處理任務提供了統一的平台。它是一個通用的框架,可以被各種大數據應用使用。對於迭代式數據分析或者使用迭代算法的應用而言,它是一個理想的平台。
Spark的編程模型基於一個叫作RDD的抽象概念。從概念上看,RDD類似於Scala中的集合。它表示的數據就是一組分區的集合,這些分區分布在集群的節點上。它還為處理數據提供一些函數式的方法。
最後更新:2017-05-19 16:38:10