Spark Streaming 不同Batch任務可以並行計算麼?
關於Spark Streaming中的任務有如下幾個概念:- Batch
- Job
- Stage
- Task
Job的並行度複雜些,由兩個配置決定:
- spark.scheduler.mode(FIFO/FAIR)
- spark.streaming.concurrentJobs
concurrentJobs 其實決定了向Spark Core提交Job的並行度。提交一個Job,必須等這個執行完了,才會提交第二個。假設我們把它設置為2,則會並發的把Job提交給Spark Core,Spark 有自己的機製決定如何運行這兩個Job,這個機製其實就是FIFO或者FAIR(決定了資源的分配規則)。默認是FIFO,也就是先進先出,你把concurrentJobs設置為2,但是如果底層是FIFO,那麼會優先執行先提交的Job,雖然如此,如果資源夠兩個job運行,還是會並行運行兩個Job。
我們搞個例子來論證下上麵的結論:
object JobTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local[2]")
conf.set("spark.streaming.concurrentJobs", "2")
val sc = new StreamingContext(conf, Seconds(10))
val input = new TestInputStream[String](sc, Seq(Seq("1", "2", "3"), Seq("1", "2", "3"), Seq("1", "2", "3")), 2)
val input2 = new TestInputStream[String](sc, Seq(Seq("1", "2", "3"), Seq("1", "2", "3"), Seq("1", "2", "3")), 2)
input.map{f=>
Thread.sleep(5000)
f
}.foreachRDD(f=>f.count())
input2.map{f=>
Thread.sleep(5000)
f
}.foreachRDD(f=>f.count())
sc.start()
sc.awaitTermination()
}
}
源碼github地址上麵的TestInputStream的簽名如下:
class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
extends InputDStream[T](_ssc) {
所以TestInputStream其實就是我Mock的一個數據源,最後numPartitions表示的是分區數。這裏,我們把concurrentJobs設置為2,意味著TaskScheduler接受到了兩個Job,然後setMaster[local(2)]表示隻可以並發執行兩個Task。因為input,input1每個batch至少都有3個元素,每個元素需要運行5秒,所以有一個task需要運行兩個元素,那麼第一次input1需要運行10秒。input1在運行五秒後,空出了一個線程,這個時候input的job開始運行,到第十秒的時候,input1完成,input開始運行也已經完成一個元素的計算,這個時候啟動另外兩個元素運行。所以input1花了10秒,input花了15秒,但是因為input被延時了五秒才得以運行,所以input1其實相當於花了20秒。
這裏你會好奇,為啥我先聲明的input,接著再申明的input1,但是input1卻先運行呢?因為這兩個數據源對應的job是被並發提交的,有一定的隨機性。如果你多啟動幾次,你會發現input對應job id有可能是0,也有可能是1。
還有兩點值的注意的是:
- job id的產生是在job提交的時候才產生,而不是job在產生的時候生成的。
- job被提交後會直接進入Scheduler的pool,在scheduler給你分配資源的時候,雖然說FIFO是先按job id 小的優先處理,但是job id大的先進來,在分配資源的時候,小的還沒進來呢,所以job id 大的可能被優先執行了。
上麵的流程解說解釋的是下麵這張圖:
接著呢,input2在剩下兩條記錄處理的10秒過程中,其實第二個周期已經開始了,input的任務又得以開始運行,這個時候因為隻有一個線程可以用,所以運行了兩個元素,input1處理完成,空出線程,第二個周期的input1繼續調度,input的剩下的一個元素也繼續運行,最後input,input1都花了15秒。
有點繞,如果大家迷惑,可以把代碼貼在自己的IDE上運行一下,然後觀察他們的交錯時間。
如果我們再做個調整:
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3")
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你會發現,不同batch的job其實也可以並行運行的,這裏需要有幾個條件:- 有延時發生了,batch無法在本batch完成
- concurrentJobs > 1
- 如果scheduler mode 是FIFO則需要某個Job無法一直消耗掉所有資源
Mode是FAIR則盡力保證你的Job是並行運行的,毫無疑問是可以並行的。
回到我們的標題,不同Batch的job有可能會同時在運行麼,隻要滿足我前麵提到的三個條件,就有可能。
最後更新:2017-04-01 17:13:51