閱讀334 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Spark Streaming 不同Batch任務可以並行計算麼?

關於Spark Streaming中的任務有如下幾個概念:
  • Batch
  • Job
  • Stage
  • Task
其實Job,Stage,Task都是Spark Core裏就有的概念,Batch則是Streaming特有的概念。同一Stage裏的Task一般都是並行的。同一Job裏的Stage可以並行,但是一般如果有依賴則是串行,可以參考我這篇文章Spark 多個Stage執行是串行執行的麼?

Job的並行度複雜些,由兩個配置決定:
  1. spark.scheduler.mode(FIFO/FAIR)
  2. spark.streaming.concurrentJobs
我們知道一個Batch可能會有多個Action執行,比如你注冊了多個Kafka數據流,每個Action都會產生一個Job,所以一個Batch有可能是一批Job,也就是JobSet的概念,這些Job由jobExecutor依次提交執行,而JobExecutor是一個默認池子大小為1的線程池,所以隻能執行完一個Job再執行另外一個Job。這裏說的池子,他的大小就是由spark.streaming.concurrentJobs 控製的。

concurrentJobs 其實決定了向Spark Core提交Job的並行度。提交一個Job,必須等這個執行完了,才會提交第二個。假設我們把它設置為2,則會並發的把Job提交給Spark Core,Spark 有自己的機製決定如何運行這兩個Job,這個機製其實就是FIFO或者FAIR(決定了資源的分配規則)。默認是FIFO,也就是先進先出,你把concurrentJobs設置為2,但是如果底層是FIFO,那麼會優先執行先提交的Job,雖然如此,如果資源夠兩個job運行,還是會並行運行兩個Job。

我們搞個例子來論證下上麵的結論:

object JobTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local[2]")
    conf.set("spark.streaming.concurrentJobs", "2")
    val sc = new StreamingContext(conf, Seconds(10))

    val input = new TestInputStream[String](sc, Seq(Seq("1", "2", "3"), Seq("1", "2", "3"), Seq("1", "2", "3")), 2)
    val input2 = new TestInputStream[String](sc, Seq(Seq("1", "2", "3"), Seq("1", "2", "3"), Seq("1", "2", "3")), 2)

    input.map{f=>
      Thread.sleep(5000)
      f
    }.foreachRDD(f=>f.count())

    input2.map{f=>
      Thread.sleep(5000)
      f
    }.foreachRDD(f=>f.count())

    sc.start()
    sc.awaitTermination()

  }
}
源碼github地址

上麵的TestInputStream的簽名如下:
class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends InputDStream[T](_ssc) {
所以TestInputStream其實就是我Mock的一個數據源,最後numPartitions表示的是分區數。這裏,我們把concurrentJobs設置為2,意味著TaskScheduler接受到了兩個Job,然後setMaster[local(2)]表示隻可以並發執行兩個Task。

因為input,input1每個batch至少都有3個元素,每個元素需要運行5秒,所以有一個task需要運行兩個元素,那麼第一次input1需要運行10秒。input1在運行五秒後,空出了一個線程,這個時候input的job開始運行,到第十秒的時候,input1完成,input開始運行也已經完成一個元素的計算,這個時候啟動另外兩個元素運行。所以input1花了10秒,input花了15秒,但是因為input被延時了五秒才得以運行,所以input1其實相當於花了20秒。

這裏你會好奇,為啥我先聲明的input,接著再申明的input1,但是input1卻先運行呢?因為這兩個數據源對應的job是被並發提交的,有一定的隨機性。如果你多啟動幾次,你會發現input對應job id有可能是0,也有可能是1。

還有兩點值的注意的是:
  1. job id的產生是在job提交的時候才產生,而不是job在產生的時候生成的。
  2. job被提交後會直接進入Scheduler的pool,在scheduler給你分配資源的時候,雖然說FIFO是先按job id 小的優先處理,但是job id大的先進來,在分配資源的時候,小的還沒進來呢,所以job id 大的可能被優先執行了。

上麵的流程解說解釋的是下麵這張圖:
e4e4e81e2c7639286fecb4bf99d4aa137a6f7912

接著呢,input2在剩下兩條記錄處理的10秒過程中,其實第二個周期已經開始了,input的任務又得以開始運行,這個時候因為隻有一個線程可以用,所以運行了兩個元素,input1處理完成,空出線程,第二個周期的input1繼續調度,input的剩下的一個元素也繼續運行,最後input,input1都花了15秒。

f6b5d18db968677665a4e6de429a184c157936a9

有點繞,如果大家迷惑,可以把代碼貼在自己的IDE上運行一下,然後觀察他們的交錯時間。
如果我們再做個調整:

conf.setMaster("local[4]")
    conf.set("spark.streaming.concurrentJobs", "3")
    conf.set("spark.scheduler.mode", "FIFO")
    val sc = new StreamingContext(conf, Seconds(5))
你會發現,不同batch的job其實也可以並行運行的,這裏需要有幾個條件:
  1. 有延時發生了,batch無法在本batch完成
  2. concurrentJobs > 1
  3. 如果scheduler mode 是FIFO則需要某個Job無法一直消耗掉所有資源

Mode是FAIR則盡力保證你的Job是並行運行的,毫無疑問是可以並行的。

回到我們的標題,不同Batch的job有可能會同時在運行麼,隻要滿足我前麵提到的三個條件,就有可能。

最後更新:2017-04-01 17:13:51

  上一篇:go 使用StreamingPro 快速構建Spark SQL on CarbonData
  下一篇:go 3月31日雲棲精選夜讀:數據科學谘詢:想要轉型毫無頭緒?看了本文你不慌