374
技術社區[雲棲]
《Spark 官方文檔》Spark編程指南
Spark編程指南
概述
總體上來說,每個Spark應用都包含一個驅動器(driver)程序,驅動器運行用戶的main函數,並在集群上執行各種並行操作。
Spark最重要的一個抽象概念就是彈性分布式數據集(resilient distributed dataset – RDD),RDD是一個可分區的元素集合,其包含的元素可以分布在集群各個節點上,並且可以執行一些分布式並行操作。RDD通常是通過,HDFS(或者其他Hadoop支持的文件係統)上的文件,或者驅動器中的Scala集合對象,來創建或轉換得到;其次,用戶也可以請求Spark將RDD持久化到內存裏,以便在不同的並行操作裏複用之;最後,RDD具備容錯性,可以從節點失敗中自動恢複數據。
Spark第二個重要抽象概念是共享變量,共享變量是一種可以在並行操作之間共享使用的變量。默認情況下,當Spark把一係列任務調度到不同節點上運行時,Spark會同時把每個變量的副本和任務代碼一起發送給各個節點。但有時候,我們需要在任務之間,或者任務和驅動器之間共享一些變量。Spark提供了兩種類型的共享變量:廣播變量和累加器,廣播變量可以用來在各個節點上緩存數據,而累加器則是用來執行跨節點的“累加”操作,例如:計數和求和。
本文將會使用Spark所支持的所有語言來展示Spark的特性。如果你能啟動Spark的交互式shell動手實驗一下,效果會更好(對scala請使用bin/spark-shell,而對於python,請使用bin/pyspark)。
鏈接Spark
Spark 1.6.0 使用了Scala 2.10。用Scala寫應用的話,你需要使用一個兼容的Scala版本(如:2.10.X)
同時,如果你需要在maven中依賴Spark,可以用如下maven工件標識:
groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0
另外,如果你需要訪問特定版本的HDFS,那麼你可能需要增加相應版本的hadoop-client依賴項,其maven工件標識如下:
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後,你需要如下,在你的代碼裏導入一些Spark class:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在Spark 1.3.0之前,你需要顯示的 import org.apache.spark.SparkContext._
來啟用這些重要的隱式轉換)
初始化Spark
Spark應用程序需要做的第一件事就是創建一個 SparkContext 對象,SparkContext對象決定了Spark如何訪問集群。而要新建一個SparkContext對象,你還得需要構造一個 SparkConf 對象,SparkConf對象包含了你的應用程序的配置信息。
每個JVM進程中,隻能有一個活躍(active)的SparkContext對象。如果你非要再新建一個,那首先必須將之前那個活躍的SparkContext 對象stop()掉。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
appName參數值是你的應用展示在集群UI上的應用名稱。master參數值是Spark, Mesos or YARN cluster URL 或者特殊的“local”(本地模式)。實際上,一般不應該將master參數值硬編碼到代碼中,而是應該用spark-submit腳本的參數來設置。然而,如果是本地測試或單元測試中,你可以直接在代碼裏給master參數寫死一個”local”值。
使用shell
在Spark shell中,默認已經為你新建了一個SparkContext對象,變量名為sc。所以spark-shell裏不能自建SparkContext對象。你可以通過–master參數設置要連接到哪個集群,而且可以給–jars參數傳一個逗號分隔的jar包列表,以便將這些jar包加到classpath中。你還可以通過–packages設置逗號分隔的maven工件列表,以便增加額外的依賴項。同樣,還可以通過–repositories參數增加maven repository地址。下麵是一個示例,在本地4個CPU core上運行的實例:
$ ./bin/spark-shell –master local[4]
或者,將code.jar添加到classpath下:
$ ./bin/spark-shell --master local[4] --jars code.jar
通過maven標識添加依賴:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
spark-shell –help可以查看完整的選項列表。實際上,spark-shell是在後台調用spark-submit來實現其功能的(spark-submit
script.)
彈性分布式數據集(RDD)
Spark的核心概念是彈性分布式數據集(RDD),RDD是一個可容錯、可並行操作的分布式元素集合。總體上有兩種方法可以創建RDD對象:由驅動程序中的集合對象通過並行化操作創建,或者從外部存儲係統中數據集加載(如:共享文件係統、HDFS、HBase或者其他Hadoop支持的數據源)。
並行化集合
並行化集合是以一個已有的集合對象(例如:Scala Seq)為參數,調用 SparkContext.parallelize() 方法創建得到的RDD。集合對象中所有的元素都將被複製到一個可並行操作的分布式數據集中。例如,以下代碼將一個1到5組成的數組並行化成一個RDD:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦創建成功,該分布式數據集(上例中的distData)就可以執行一些並行操作。如,distData.reduce((a, b) => a + b),這段代碼會將集合中所有元素加和。後麵我們還會繼續討論分布式數據集上的各種操作。
並行化集合的一個重要參數是分區(partition),即這個分布式數據集可以分割為多少片。Spark中每個任務(task)都是基於分區的,每個分區一個對應的任務(task)。典型場景下,一般每個CPU對應2~4個分區。並且一般而言,Spark會基於集群的情況,自動設置這個分區數。當然,你還是可以手動控製這個分區數,隻需給parallelize方法再傳一個參數即可(如:sc.parallelize(data, 10) )。注意:Spark代碼裏有些地方仍然使用分片(slice)這個術語,這隻不過是分區的一個別名,主要為了保持向後兼容。
外部數據集
Spark 可以通過Hadoop所支持的任何數據源來創建分布式數據集,包括:本地文件係統、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其他 Hadoop 支持的輸入格式(InputFormat)。
文本文件創建RDD可以用 SparkContext.textFile 方法。這個方法輸入參數是一個文件的URI(本地路徑,或者 hdfs://,s3n:// 等),其輸出RDD是一個文本行集合。以下是一個簡單示例:
scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08
創建後,distFile 就可以執行數據集的一些操作。比如,我們可以把所有文本行的長度加和:distFile.map(s => s.length).reduce((a, b) => a + b)
以下是一些Spark讀取文件的要點:
- 如果是本地文件係統,那麼這個文件必須在所有的worker節點上能夠以相同的路徑訪問到。所以要麼把文件複製到所有worker節點上同一路徑下,要麼掛載一個共享文件係統。
- 所有Spark基於文件輸入的方法(包括textFile)都支持輸入參數為:目錄,壓縮文件,以及通配符。例如:textFile(“/my/directory”), textFile(“/my/directory/*.txt”), 以及 textFile(“/my/directory/*.gz”)
- textFile方法同時還支持一個可選參數,用以控製數據的分區個數。默認地,Spark會為文件的每一個block創建一個分區(HDFS上默認block大小為64MB),你可以通過調整這個參數來控製數據的分區數。注意,分區數不能少於block個數。
除了文本文件之外,Spark的Scala API還支持其他幾種數據格式:
- SparkContext.wholeTextFiles 可以讀取一個包含很多小文本文件的目錄,並且以 (filename, content) 鍵值對的形式返回結果。這與textFile 不同,textFile隻返回文件的內容,每行作為一個元素。
- 對於SequenceFiles,可以調用 SparkContext.sequenceFile[K, V],其中 K 和 V 分別是文件中key和value的類型。這些類型都應該是 Writable 接口的子類, 如:IntWritable and Text 等。另外,Spark 允許你為一些常用Writable指定原生類型,例如:sequenceFile[Int, String] 將自動讀取 IntWritable 和 Text。
- 對於其他的Hadoop InputFormat,你可以用 SparkContext.hadoopRDD 方法,並傳入任意的JobConf 對象和 InputFormat,以及key class、value class。這和設置Hadoop job的輸入源是同樣的方法。你還可以使用 SparkContext.newAPIHadoopRDD,該方法接收一個基於新版Hadoop MapReduce API (org.apache.hadoop.mapreduce)的InputFormat作為參數。
- RDD.saveAsObjectFile 和 SparkContext.objectFile 支持將RDD中元素以Java對象序列化的格式保存成文件。雖然這種序列化方式不如Avro效率高,卻為保存RDD提供了一種簡便方式。
RDD算子
RDD支持兩種類型的算子(operation):transformation算子 和 action算子;transformation算子可以將已有RDD轉換得到一個新的RDD,而action算子則是基於數據集計算,並將結果返回給驅動器(driver)。例如,map是一個transformation算子,它將數據集中每個元素傳給一個指定的函數,並將該函數返回結果構建為一個新的RDD;而 reduce是一個action算子,它可以將RDD中所有元素傳給指定的聚合函數,並將最終的聚合結果返回給驅動器(還有一個reduceByKey算子,其返回的聚合結果是一個數據集)。
Spark中所有transformation算子都是懶惰的,也就是說,這些算子並不立即計算結果,而是記錄下對基礎數據集(如:一個數據文件)的轉換操作。隻有等到某個action算子需要計算一個結果返回給驅動器的時候,transformation算子所記錄的操作才會被計算。這種設計使Spark可以運行得更加高效 – 例如,map算子創建了一個數據集,同時該數據集下一步會調用reduce算子,那麼Spark將隻會返回reduce的最終聚合結果(單獨的一個數據)給驅動器,而不是將map所產生的數據集整個返回給驅動器。
默認情況下,每次調用action算子的時候,每個由transformation轉換得到的RDD都會被重新計算。然而,你也可以通過調用persist(或者cache)操作來持久化一個RDD,這意味著Spark將會把RDD的元素都保存在集群中,因此下一次訪問這些元素的速度將大大提高。同時,Spark還支持將RDD元素持久化到內存或者磁盤上,甚至可以支持跨節點多副本。
基礎
以下簡要說明一下RDD的基本操作,參考如下代碼:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
其中,第一行是從外部文件加載數據,並創建一個基礎RDD。這時候,數據集並沒有加載進內存除非有其他操作施加於lines,這時候的lines RDD其實可以說隻是一個指向 data.txt 文件的指針。第二行,用lines通過map轉換得到一個lineLengths RDD,同樣,lineLengths也是懶惰計算的。最後,我們使用 reduce算子計算長度之和,reduce是一個action算子。此時,Spark將會把計算分割為一些小的任務,分別在不同的機器上運行,每台機器上都運行相關的一部分map任務,並在本地進行reduce,並將這些reduce結果都返回給驅動器。
如果我們後續需要重複用到 lineLengths RDD,我們可以增加一行:
lineLengths.persist()
這一行加在調用 reduce 之前,則 lineLengths RDD 首次計算後,Spark會將其數據保存到內存中。
將函數傳給Spark
Spark的API 很多都依賴於在驅動程序中向集群傳遞操作函數。以下是兩種建議的實現方式:
- 匿名函數(Anonymous function syntax),這種方式代碼量比較少。
- 全局單件中的靜態方法。例如,你可以按如下方式定義一個 object MyFunctions 並傳遞其靜態成員函數 MyFunctions.func1:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意,技術上來說,你也可以傳遞一個類對象實例上的方法(不是單件對象),不過這回導致傳遞函數的同時,需要把相應的對象也發送到集群中各節點上。例如,我們定義一個MyClass如下:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
如果我們 new MyClass 創建一個實例,並調用其 doStuff 方法,同時doStuff中的 map算子引用了該MyClass實例上的 func1 方法,那麼接下來,這個MyClass對象將被發送到集群中所有節點上。rdd.map(x => this.func1(x)) 也會有類似的效果。
類似地,如果應用外部對象的成員變量,也會導致對整個對象實例的引用:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
上麵的代碼對field的引用等價於 rdd.map(x => this.field + x),這將導致應用整個this對象。為了避免類似問題,最簡單的方式就是,將field固執到一個本地臨時變量中,而不是從外部直接訪問之,如下:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
理解閉包
Spark裏一個比較難的事情就是,理解在整個集群上跨節點執行的變量和方法的作用域以及生命周期。Spark裏一個頻繁出現的問題就是RDD算子在變量作用域之外修改了其值。下麵的例子,我們將會以foreach() 算子為例,來遞增一個計數器counter,不過類似的問題在其他算子上也會出現。
示例
考慮如下例子,我們將會計算RDD中原生元素的總和,如果不是在同一個JVM中執行,其表現將有很大不同。例如,這段代碼如果使用Spark本地模式(–master=local[n])運行,和在集群上運行(例如,用spark-submit提交到YARN上)結果完全不同。
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
本地模式 v.s. 集群模式
上麵這段代碼其行為是不確定的。在本地模式下運行,所有代碼都在運行於單個JVM中,所以RDD的元素都能夠被累加並保存到counter變量中,這是因為本地模式下,counter變量和驅動器節點在同一個內存空間中。
然而,在集群模式下,情況會更複雜,以上代碼的運行結果就不是所預期的結果了。為了執行這個作業,Spark會將RDD算子的計算過程分割成多個獨立的任務(task)- 每個任務分發給不同的執行器(executor)去執行。而執行之前,Spark需要計算閉包。閉包是由執行器執行RDD算子(本例中的foreach())時所需要的變量和方法組成的。閉包將會被序列化,並發送給每個執行器。由於本地模式下,隻有一個執行器,所有任務都共享同樣的閉包。而在其他模式下,情況則有所不同,每個執行器都運行於不同的worker節點,並且都擁有獨立的閉包副本。
在上麵的例子中,閉包中的變量會跟隨不同的閉包副本,發送到不同的執行器上,所以等到foreach真正在執行器上運行時,其引用的counter已經不再是驅動器上所定義的那個counter副本了,驅動器內存中仍然會有一個counter變量副本,但是這個副本對執行器是不可見的!執行器隻能看到其所收到的序列化閉包中包含的counter副本。因此,最終驅動器上得到的counter將會是0。
為了確保類似這樣的場景下,代碼能有確定的行為,這裏應該使用累加器(Accumulator)。累加器是Spark中專門用於集群跨節點分布式執行計算中,安全地更新同一變量的機製。本指南中專門有一節詳細說明累加器。
通常來說,閉包(由循環或本地方法組成),不應該改寫全局狀態。Spark中改寫閉包之外對象的行為是未定義的。這種代碼,有可能在本地模式下能正常工作,但這隻是偶然情況,同樣的代碼在分布式模式下其行為很可能不是你想要的。所以,如果需要全局聚合,請記得使用累加器(Accumulator)。
打印RDD中的元素
另一種常見習慣是,試圖用 rdd.foreach(println) 或者 rdd.map(println) 來打印RDD中所有的元素。如果是在單機上,這種寫法能夠如預期一樣,打印出RDD所有元素。然後,在集群模式下,這些輸出將會被打印到執行器的標準輸出(stdout)上,因此驅動器的標準輸出(stdout)上神馬也看不到!如果真要在驅動器上把所有RDD元素都打印出來,你可以先調用collect算子,把RDD元素先拉倒驅動器上來,代碼可能是這樣:rdd.collect().foreach(println)。不過如果RDD很大的話,有可能導致驅動器內存溢出,因為collect會把整個RDD都弄到驅動器所在單機上來;如果你隻是需要打印一部分元素,那麼take是不更安全的選擇:rdd.take(100).foreach(println)
使用鍵值對
大部分Spark算子都能在包含任意類型對象的RDD上工作,但也有一部分特殊的算子要求RDD包含的元素必須是鍵值對(key-value pair)。這種算子常見於做分布式混洗(shuffle)操作,如:以key分組或聚合。
在Scala中,這種操作在包含 Tuple2 (內建與scala語言,可以這樣創建:(a, b) )類型對象的RDD上自動可用。鍵值對操作是在 PairRDDFunctions 類上可用,這個類型也會自動包裝到包含tuples的RDD上。
例如,以下代碼將使用reduceByKey算子來計算文件中每行文本出現的次數:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
同樣,我們還可以用 counts.sortByKey() 來對這些鍵值對按字母排序,最後再用 counts.collect() 將數據以對象數據組的形式拉到驅動器內存中。
注意:如果使用自定義類型對象做鍵值對中的key的話,你需要確保自定義類型實現了 equals() 方法(通常需要同時也實現hashCode()方法)。完整的細節可以參考:Object.hashCode() documentation
轉換算子 – transformation
以下是Spark支持的一些常用transformation算子。詳細請參考 RDD API doc (Scala, Java, Python, R) 以及 鍵值對 RDD 函數 (Scala, Java) 。
transformation算子 | 作用 |
---|---|
map(func) | 返回一個新的分布式數據集,其中每個元素都是由源RDD中一個元素經func轉換得到的。 |
filter(func) | 返回一個新的數據集,其中包含的元素來自源RDD中元素經func過濾後(func返回true時才選中)的結果 |
flatMap(func) | 類似於map,但每個輸入元素可以映射到0到n個輸出元素(所以要求func必須返回一個Seq而不是單個元素) |
mapPartitions(func) | 類似於map,但基於每個RDD分區(或者數據block)獨立運行,所以如果RDD包含元素類型為T,則 func 必須是 Iterator<T> => Iterator<U> 的映射函數。 |
mapPartitionsWithIndex(func) | 類似於 mapPartitions,隻是func 多了一個整型的分區索引值,因此如果RDD包含元素類型為T,則 func 必須是 Iterator<T> => Iterator<U> 的映射函數。 |
sample(withReplacement, fraction, seed) | 采樣部分(比例取決於 fraction )數據,同時可以指定是否使用回置采樣(withReplacement),以及隨機數種子(seed) |
union(otherDataset) | 返回源數據集和參數數據集(otherDataset)的並集 |
intersection(otherDataset) | 返回源數據集和參數數據集(otherDataset)的交集 |
distinct([numTasks])) | 返回對源數據集做元素去重後的新數據集 |
groupByKey([numTasks]) | 隻對包含鍵值對的RDD有效,如源RDD包含 (K, V) 對,則該算子返回一個新的數據集包含 (K, Iterable<V>) 對。 注意:如果你需要按key分組聚合的話(如sum或average),推薦使用 reduceByKey或者 aggregateByKey 以獲得更好的性能。 注意:默認情況下,輸出計算的並行度取決於源RDD的分區個數。當然,你也可以通過設置可選參數 numTasks 來指定並行任務的個數。 |
reduceByKey(func, [numTasks]) | 如果源RDD包含元素類型 (K, V) 對,則該算子也返回包含(K, V) 對的RDD,隻不過每個key對應的value是經過func聚合後的結果,而func本身是一個 (V, V) => V 的映射函數。 另外,和 groupByKey 類似,可以通過可選參數 numTasks 指定reduce任務的個數。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 如果源RDD包含 (K, V) 對,則返回新RDD包含 (K, U) 對,其中每個key對應的value都是由 combOp 函數 和 一個“0”值zeroValue 聚合得到。允許聚合後value類型和輸入value類型不同,避免了不必要的開銷。和 groupByKey 類似,可以通過可選參數 numTasks 指定reduce任務的個數。 |
sortByKey([ascending], [numTasks]) | 如果源RDD包含元素類型 (K, V) 對,其中K可排序,則返回新的RDD包含 (K, V) 對,並按照 K 排序(升序還是降序取決於 ascending 參數) |
join(otherDataset, [numTasks]) | 如果源RDD包含元素類型 (K, V) 且參數RDD(otherDataset)包含元素類型(K, W),則返回的新RDD中將包含內關聯後key對應的 (K, (V, W)) 對。外關聯(Outer joins)操作請參考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。 |
cogroup(otherDataset, [numTasks]) | 如果源RDD包含元素類型 (K, V) 且參數RDD(otherDataset)包含元素類型(K, W),則返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。該算子還有個別名:groupWith |
cartesian(otherDataset) | 如果源RDD包含元素類型 T 且參數RDD(otherDataset)包含元素類型 U,則返回的新RDD包含前二者的笛卡爾積,其元素類型為 (T, U) 對。 |
pipe(command, [envVars]) | 以shell命令行管道處理RDD的每個分區,如:Perl 或者 bash 腳本。 RDD中每個元素都將依次寫入進程的標準輸入(stdin),然後按行輸出到標準輸出(stdout),每一行輸出字符串即成為一個新的RDD元素。 |
coalesce(numPartitions) | 將RDD的分區數減少到numPartitions。當以後大數據集被過濾成小數據集後,減少分區數,可以提升效率。 |
repartition(numPartitions) | 將RDD數據重新混洗(reshuffle)並隨機分布到新的分區中,使數據分布更均衡,新的分區個數取決於numPartitions。該算子總是需要通過網絡混洗所有數據。 |
repartitionAndSortWithinPartitions(partitioner) | 根據partitioner(spark自帶有HashPartitioner和RangePartitioner等)重新分區RDD,並且在每個結果分區中按key做排序。這是一個組合算子,功能上等價於先 repartition 再在每個分區內排序,但這個算子內部做了優化(將排序過程下推到混洗同時進行),因此性能更好。 |
動作算子 – action
以下是Spark支持的一些常用action算子。詳細請參考 RDD API doc (Scala, Java, Python, R) 以及 鍵值對 RDD 函數 (Scala, Java) 。
Action算子 | 作用 |
---|---|
reduce(func) | 將RDD中元素按func進行聚合(func是一個 (T,T) => T 的映射函數,其中T為源RDD元素類型,並且func需要滿足 交換律 和 結合律 以便支持並行計算) |
collect() | 將數據集中所有元素以數組形式返回驅動器(driver)程序。通常用於,在RDD進行了filter或其他過濾操作後,將一個足夠小的數據子集返回到驅動器內存中。 |
count() | 返回數據集中元素個數 |
first() | 返回數據集中首個元素(類似於 take(1) ) |
take(n) | 返回數據集中前 n 個元素 |
takeSample(withReplacement,num, [seed]) | 返回數據集的隨機采樣子集,最多包含 num 個元素,withReplacement 表示是否使用回置采樣,最後一個參數為可選參數seed,隨機數生成器的種子。 |
takeOrdered(n, [ordering]) | 按元素排序(可以通過 ordering 自定義排序規則)後,返回前 n 個元素 |
saveAsTextFile(path) | 將數據集中元素保存到指定目錄下的文本文件中(或者多個文本文件),支持本地文件係統、HDFS 或者其他任何Hadoop支持的文件係統。 保存過程中,Spark會調用每個元素的toString方法,並將結果保存成文件中的一行。 |
saveAsSequenceFile(path) (Java and Scala) |
將數據集中元素保存到指定目錄下的Hadoop Sequence文件中,支持本地文件係統、HDFS 或者其他任何Hadoop支持的文件係統。適用於實現了Writable接口的鍵值對RDD。在Scala中,同樣也適用於能夠被隱式轉換為Writable的類型(Spark實現了所有基本類型的隱式轉換,如:Int,Double,String 等) |
saveAsObjectFile(path) (Java and Scala) |
將RDD元素以Java序列化的格式保存成文件,保存結果文件可以使用 SparkContext.objectFile 來讀取。 |
countByKey() | 隻適用於包含鍵值對(K, V)的RDD,並返回一個哈希表,包含 (K, Int) 對,表示每個key的個數。 |
foreach(func) | 在RDD的每個元素上運行 func 函數。通常被用於累加操作,如:更新一個累加器(Accumulator ) 或者 和外部存儲係統互操作。 注意:用 foreach 操作出累加器之外的變量可能導致未定義的行為。更詳細請參考前麵的“理解閉包”(Understanding closures )這一小節。 |
混洗操作
有一些Spark算子會觸發眾所周知的混洗(Shuffle)事件。Spark中的混洗機製是用於將數據重新分布,其結果是所有數據將在各個分區間重新分組。一般情況下,混洗需要跨執行器(executor)或跨機器複製數據,這也是混洗操作一般都比較複雜而且開銷大的原因。
背景
為了理解混洗階段都發生了哪些事,我首先以 reduceByKey 算子為例來看一下。reduceByKey算子會生成一個新的RDD,將源RDD中一個key對應的多個value組合進一個tuple - 然後將這些values輸入給reduce函數,得到的result再和key關聯放入新的RDD中。這個算子的難點在於對於某一個key來說,並非其對應的所有values都在同一個分區(partition)中,甚至有可能都不在同一台機器上,但是這些values又必須放到一起計算reduce結果。
在Spark中,通常是由於為了進行某種計算操作,而將數據分布到所需要的各個分區當中。而在計算階段,單個任務(task)隻會操作單個分區中的數據 – 因此,為了組織好每個reduceByKey中reduce任務執行時所需的數據,Spark需要執行一個多對多操作。即,Spark需要讀取RDD的所有分區,並找到所有key對應的所有values,然後跨分區傳輸這些values,並將每個key對應的所有values放到同一分區,以便後續計算各個key對應values的reduce結果 – 這個過程就叫做混洗(Shuffle)。
雖然混洗好後,各個分區中的元素和分區自身的順序都是確定的,但是分區中元素的順序並非確定的。如果需要混洗後分區內的元素有序,可以參考使用以下混洗操作:
mapPartitions 使用 .sorted 對每個分區排序
-
repartitionAndSortWithinPartitions
重分區的同時,對分區進行排序,比自行組合repartition和sort更高效 - sortBy 創建一個全局有序的RDD
會導致混洗的算子有:重分區(repartition)類算子,如: repartition
和 coalesce;
ByKey 類算子(除了計數類的,如 countByKey) 如:groupByKey
和 reduceByKey;以及Join類算子,如:
cogroup
和 join
.
性能影響
混洗(Shuffle)之所以開銷大,是因為混洗操作需要引入磁盤I/O,數據序列化以及網絡I/O等操作。為了組織好混洗數據,Spark需要生成對應的任務集 – 一係列map任務用於組織數據,再用一係列reduce任務來聚合數據。注意這裏的map、reduce是來自MapReduce的術語,和Spark的map、reduce算子並沒有直接關係。
在Spark內部,單個map任務的輸出會盡量保存在內存中,直至放不下為止。然後,這些輸出會基於目標分區重新排序,並寫到一個文件裏。在reduce端,reduce任務隻讀取與之相關的並已經排序好的blocks。
某些混洗算子會導致非常明顯的內存開銷增長,因為這些算子需要在數據傳輸前後,在內存中維護組織數據記錄的各種數據結構。特別地,reduceByKey和aggregateByKey都會在map端創建這些數據結構,而ByKey係列算子都會在reduce端創建這些數據結構。如果數據在內存中存不下,Spark會把數據吐到磁盤上,當然這回導致額外的磁盤I/O以及垃圾回收的開銷。
混洗還會再磁盤上生成很多臨時文件。以Spark-1.3來說,這些臨時文件會一直保留到其對應的RDD被垃圾回收才刪除。之所以這樣做,是因為如果血統信息需要重新計算的時候,這些混洗文件可以不必重新生成。如果程序持續引用這些RDD或者垃圾回收啟動頻率較低,那麼這些垃圾回收可能需要等較長的一段時間。這就意味著,長時間運行的Spark作業可能會消耗大量的磁盤。Spark的臨時存儲目錄,是由spark.local.dir 配置參數指定的。
混洗行為可以由一係列配置參數來調優。參考Spark配置指南(Spark Configuration Guide)中“混洗行為”這一小節。
RDD持久化
Spark的一項關鍵能力就是它可以持久化(或者緩存)數據集在內存中,從而跨操作複用這些數據集。如果你持久化了一個RDD,那麼每個節點上都會存儲該RDD的一些分區,這些分區是由對應的節點計算出來並保持在內存中,後續可以在其他施加在該RDD上的action算子中複用(或者從這些數據集派生新的RDD)。這使得後續動作的速度提高很多(通常高於10倍)。因此,緩存對於迭代算法和快速交互式分析是一個很關鍵的工具。
你可以用persist() 或者 cache() 來標記一下需要持久化的RDD。等到該RDD首次被施加action算子的時候,其對應的數據分區就會被保留在內存裏。同時,Spark的緩存具備一定的容錯性 – 如果RDD的任何一個分區丟失了,Spark將自動根據其原來的血統信息重新計算這個分區。
另外,每個持久化的RDD可以使用不同的存儲級別,比如,你可以把RDD保存在磁盤上,或者以java序列化對象保存到內存裏(為了省空間),或者跨節點多副本,或者使用 Tachyon 存到虛擬機以外的內存裏。這些存儲級別都可以由persist()的參數StorageLevel對象來控製。cache() 方法本身就是一個使用默認存儲級別做持久化的快捷方式,默認存儲級別是 StorageLevel.MEMORY_ONLY(以java序列化方式存到內存裏)。完整的存儲級別列表如下:
存儲級別 | 含義 |
---|---|
MEMORY_ONLY | 以未序列化的Java對象形式將RDD存儲在JVM內存中。如果RDD不能全部裝進內存,那麼將一部分分區緩存,而另一部分分區將每次用到時重新計算。這個是Spark的RDD的默認存儲級別。 |
MEMORY_AND_DISK | 以未序列化的Java對象形式存儲RDD在JVM中。如果RDD不能全部裝進內存,則將不能裝進內存的分區放到磁盤上,然後每次用到的時候從磁盤上讀取。 |
MEMORY_ONLY_SER | 以序列化形式存儲RDD(每個分區一個字節數組)。通常這種方式比未序列化存儲方式要更省空間,尤其是如果你選用了一個比較好的序列化協議(fast serializer),但是這種方式也相應的會消耗更多的CPU來讀取數據。 |
MEMORY_AND_DISK_SER | 和MEMORY_ONLY_SER類似,隻是當內存裝不下的時候,會將分區的數據吐到磁盤上,而不是每次用到都重新計算。 |
DISK_ONLY | RDD數據隻存儲於磁盤上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 和上麵沒有”_2″的級別相對應,隻不過每個分區數據會在兩個節點上保存兩份副本。 |
OFF_HEAP (實驗性的) | 將RDD以序列化格式保存到Tachyon。與MEMORY_ONLY_SER相比,OFF_HEAP減少了垃圾回收開銷,並且使執行器(executor)進程更小且可以共用同一個內存池,這一特性在需要大量消耗內存和多Spark應用並發的場景下比較吸引人。而且,因為RDD存儲於Tachyon中,所以一個執行器掛了並不會導致數據緩存的丟失。這種模式下Tachyon 的內存是可丟棄的。因此,Tachyon並不會重建一個它逐出內存的block。如果你打算用Tachyon做為堆外存儲,Spark和Tachyon具有開箱即用的兼容性。請參考這裏,有建議使用的Spark和Tachyon的匹配版本對:page。 |
注意:在Python中存儲的對象總是會使用 Pickle 做序列化,所以這時是否選擇一個序列化級別已經無關緊要了。
Spark會自動持久化一些混洗操作(如:reduceByKey)的中間數據,即便用戶根本沒有調用persist。這麼做是為了避免一旦有一個節點在混洗過程中失敗,就要重算整個輸入數據。當然,我們還是建議對需要重複使用的RDD調用其persist算子。
如何選擇存儲級別?
Spark的存儲級別主要可於在內存使用和CPU占用之間做一些權衡。建議根據以下步驟來選擇一個合適的存儲級別:
- 如果RDD能使用默認存儲級別(MEMORY_ONLY),那就盡量使用默認級別。這是CPU效率最高的方式,所有RDD算子都能以最快的速度運行。
- 如果步驟1的答案是否(不適用默認級別),那麼可以嚐試MEMORY_ONLY_SER級別,並選擇一個高效的序列化協議(selecting a fast serialization library),這回大大節省數據對象的存儲空間,同時速度也還不錯。
- 盡量不要把數據吐到磁盤上,除非:1.你的數據集重新計算的代價很大;2.你的數據集是從一個很大的數據源中過濾得到的結果。否則的話,重算一個分區的速度很可能和從磁盤上讀取差不多。
- 如果需要支持容錯,可以考慮使用帶副本的存儲級別(例如:用Spark來服務web請求)。所有的存儲級別都能夠以重算丟失數據的方式來提供容錯性,但是帶副本的存儲級別可以讓你的應用持續的運行,而不必等待重算丟失的分區。
- 在一些需要大量內存或者並行多個應用的場景下,實驗性的OFF_HEAP會有以下幾個優勢:
- 這個級別下,可以允許多個執行器共享同一個Tachyon中內存池。
- 可以有效地減少垃圾回收的開銷。
- 即使單個執行器掛了,緩存數據也不會丟失。
刪除數據
Spark能夠自動監控各個節點上緩存使用率,並且以LRU(最近經常使用)的方式將老數據逐出內存。如果你更喜歡手動控製的話,可以用RDD.unpersist() 方法來刪除無用的緩存。
共享變量
一般而言,當我們給Spark算子(如 map 或 reduce)傳遞一個函數時,這些函數將會在遠程的集群節點上運行,並且這些函數所引用的變量都是各個節點上的獨立副本。這些變量都會以副本的形式複製到各個機器節點上,如果更新這些變量副本的話,這些更新並不會傳回到驅動器(driver)程序。通常來說,支持跨任務的可讀寫共享變量是比較低效的。不過,Spark還是提供了兩種比較通用的共享變量:廣播變量和累加器。
廣播變量
廣播變量提供了一種隻讀的共享變量,它是把在每個機器節點上保存一個緩存,而不是每個任務保存一份副本。通常可以用來在每個節點上保存一個較大的輸入數據集,這要比常規的變量副本更高效(一般的變量是每個任務一個副本,一個節點上可能有多個任務)。Spark還會嚐試使用高效的廣播算法來分發廣播變量,以減少通信開銷。
Spark的操作有時會有多個階段(stage),不同階段之間的分割線就是混洗操作。Spark會自動廣播各個階段用到的公共數據。這些方式廣播的數據都是序列化過的,並且在運行各個任務前需要反序列化。這也意味著,顯示地創建廣播變量,隻有在跨多個階段(stage)的任務需要同樣的數據 或者 緩存數據的序列化和反序列化格式很重要的情況下 才是必須的。
廣播變量可以通過一個變量v來創建,隻需調用 SparkContext.broadcast(v)即可。這個廣播變量是對變量v的一個包裝,要訪問其值,可以調用廣播變量的 value 方法。代碼示例如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
廣播變量創建之後,集群中任何函數都不應該再使用原始變量v,這樣才能保證v不會被多次複製到同一個節點上。另外,對象v在廣播後不應該再被更新,這樣才能保證所有節點上拿到同樣的值(例如,更新後,廣播變量又被同步到另一新節點,新節點有可能得到的值和其他節點不一樣)。
累加器
累加器是一種隻支持滿足結合律的“累加”操作的變量,因此它可以很高效地支持並行計算。利用累加器可以實現計數(類似MapReduce中的計數器)或者求和。Spark原生支持了數字類型的累加器,開發者也可以自定義新的累加器。如果創建累加器的時候給了一個名字,那麼這個名字會展示在Spark UI上,這對於了解程序運行處於哪個階段非常有幫助(注意:Python尚不支持該功能)。
創捷累加器時需要賦一個初始值v,調用 SparkContext.accumulator(v) 可以創建一個累加器。後續集群中運行的任務可以使用 add 方法 或者 += 操作符 (僅Scala和Python支持)來進行累加操作。不過,任務本身並不能讀取累加器的值,隻有驅動器程序可以用 value 方法訪問累加器的值。
以下代碼展示了如何使用累加器對一個元素數組求和:
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
以上代碼使用了Spark內建支持的Int型累加器,開發者也可以通過子類化 AccumulatorParam 來自定義累加器。累加器接口(AccumulatorParam )主要有兩個方法:1. zero:這個方法為累加器提供一個“零值”,2.addInPlace 將收到的兩個參數值進行累加。例如,假設我們需要為Vector提供一個累加機製,那麼可能的實現方式如下:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
如果使用Scala,Spark還支持幾種更通用的接口:1.Accumulable,這個接口可以支持所累加的數據類型與結果類型不同(如:構建一個收集元素的list);2.SparkContext.accumulableCollection 方法可以支持常用的Scala集合類型。
對於在action算子中更新的累加器,Spark保證每個任務對累加器的更新隻會被應用一次,例如,某些任務如果重啟過,則不會再次更新累加器。而如果在transformation算子中更新累加器,那麼用戶需要注意,一旦某個任務因為失敗被重新執行,那麼其對累加器的更新可能會實施多次。
累加器並不會改變Spark懶惰求值的運算模型。如果在RDD算子中更新累加器,那麼其值隻會在RDD做action算子計算的時候被更新一次。因此,在transformation算子(如:map)中更新累加器,其值並不能保證一定被更新。以下代碼片段說明了這一特性:
val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// 這裏,accum任然是0,因為沒有action算子,所以map
也不會進行實際的計算
部署到集群
應用提交指南(application submission guide)中描述了如何向集群提交應用。換句話說,就是你需要把你的應用打包成 JAR文件(Java/Scala)或者一係列 .py 或 .zip 文件(Python),然後再用 bin/spark-submit 腳本將其提交給Spark所支持的集群管理器。
從Java/Scala中啟動Spark作業
org.apache.spark.launcher 包提供了簡明的Java API,可以將Spark作業作為子進程啟動。
單元測試
Spark對所有常見的單元測試框架提供友好的支持。你隻需要在測試中創建一個SparkContext對象,然後吧master URL設為local,運行測試操作,最後調用 SparkContext.stop() 來停止測試。注意,一定要在 finally 代碼塊或者單元測試框架的 tearDown方法裏調用SparkContext.stop(),因為Spark不支持同一程序中有多個SparkContext對象同時運行。
從1.0之前版本遷移過來
Spark 1.0 凍結了Spark Core 1.x 係列的核心API,隻要是沒有標記為 “experimental” 或者 “developer API”的API,在未來的版本中會一直支持。對於Scala用戶來說,唯一的變化就是分組相關的算子,如:groupByKey, cogroup, join,這些算子的返回類型由 (Key, Seq[Value]) 變為 (Key, Iterable[Value])。
更詳細遷移向導請參考這裏:Spark Streaming, MLlib 以及 GraphX.
下一步
你可以去Spark的官網上看看示例程序(example Spark programs)。另外,Spark代碼目錄下也自帶了不少例子,見 examples 目錄(最後更新:2017-05-19 16:38:07