閱讀658 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《Spark官方文檔》Spark Streaming編程指南(二)

累加器和廣播變量

首先需要注意的是,累加器(Accumulators)和廣播變量(Broadcast variables)是無法從Spark Streaming的檢查點中恢複回來的。所以如果你開啟了檢查點功能,並同時在使用累加器和廣播變量,那麼你最好是使用懶惰實例化的單例模式,因為這樣累加器和廣播變量才能在驅動器(driver)故障恢複後重新實例化。代碼示例如下:

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // 獲取現有或注冊新的blacklist廣播變量
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // 獲取現有或注冊新的 droppedWordsCounter 累加器
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // 基於blacklist來過濾詞,並將過濾掉的詞的個數累加到 droppedWordsCounter 中
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts
})

這裏有完整代碼:source code


DataFrame和SQL相關算子

在Streaming應用中可以調用DataFrames and SQL來處理流式數據。開發者可以用通過StreamingContext中的SparkContext對象來創建一個SQLContext,並且,開發者需要確保一旦驅動器(driver)故障恢複後,該SQLContext對象能重新創建出來。同樣,你還是可以使用懶惰創建的單例模式來實例化SQLContext,如下麵的代碼所示,這裏我們將最開始的那個小栗子做了一些修改,使用DataFrame和SQL來統計單詞計數。其實就是,將每個RDD都轉化成一個DataFrame,然後注冊成臨時表,再用SQL查詢這些臨時表。

/** streaming應用中調用DataFrame算子 */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // 獲得SQLContext單例
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // 將RDD[String] 轉為 DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // DataFrame注冊為臨時表
  wordsDataFrame.registerTempTable("words")

  // 再用SQL語句查詢,並打印出來
  val wordCountsDataFrame = 
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

See the full source code.

這裏有完整代碼:source code

你也可以在其他線程裏執行SQL查詢(異步查詢,即:執行SQL查詢的線程和運行StreamingContext的線程不同)。不過這種情況下,你需要確保查詢的時候 StreamingContext 沒有把所需的數據丟棄掉,否則StreamingContext有可能已將老的RDD數據丟棄掉了,那麼異步查詢的SQL語句也可能無法得到查詢結果。舉個栗子,如果你需要查詢上一個批次的數據,但是你的SQL查詢可能要執行5分鍾,那麼你就需要StreamingContext至少保留最近5分鍾的數據:streamingContext.remember(Minutes(5)) (這是Scala為例,其他語言差不多)

更多DataFrame和SQL的文檔見這裏: DataFrames and SQL


MLlib算子

MLlib 提供了很多機器學習算法。首先,你需要關注的是流式計算相關的機器學習算法(如:Streaming Linear RegressionStreaming KMeans),這些流式算法可以在流式數據上一邊學習訓練模型,一邊用最新的模型處理數據。除此以外,對更多的機器學習算法而言,你需要離線訓練這些模型,然後將訓練好的模型用於在線的流式數據。詳見MLlib


緩存/持久化

和RDD類似,DStream也支持將數據持久化到內存中。隻需要調用 DStream的persist() 方法,該方法內部會自動調用DStream中每個RDD的persist方法進而將數據持久化到內存中。這對於可能需要計算很多次的DStream非常有用(例如:對於同一個批數據調用多個算子)。對於基於滑動窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有狀態的算子,如:updateStateByKey,數據持久化就更重要了。因此,滑動窗口算子產生的DStream對象默認會自動持久化到內存中(不需要開發者調用persist)。

對於從網絡接收數據的輸入數據流(如:Kafka、Flume、socket等),默認的持久化級別會將數據持久化到兩個不同的節點上互為備份副本,以便支持容錯。

注意,與RDD不同的是,DStream的默認持久化級別是將數據序列化到內存中。進一步的討論見性能調優這一小節。關於持久化級別(或者存儲級別)的更詳細說明見Spark編程指南(Spark Programming Guide)。


檢查點

一般來說Streaming 應用都需要7*24小時長期運行,所以必須對一些與業務邏輯無關的故障有很好的容錯(如:係統故障、JVM崩潰等)。對於這些可能性,Spark Streaming 必須在檢查點保存足夠的信息到一些可容錯的外部存儲係統中,以便能夠隨時從故障中恢複回來。所以,檢查點需要保存以下兩種數據:

  • 元數據檢查點(Metadata checkpointing) – 保存流式計算邏輯的定義信息到外部可容錯存儲係統(如:HDFS)。主要用途是用於在故障後回複應用程序本身(後續詳談)。元數包括:
    • Configuration – 創建Streaming應用程序的配置信息。
    • DStream operations – 定義流式處理邏輯的DStream操作信息。
    • Incomplete batches – 已經排隊但未處理完的批次信息。
  • 數據檢查點(Data checkpointing) – 將生成的RDD保存到可靠的存儲中。這對一些需要跨批次組合數據或者有狀態的算子來說很有必要。在這種轉換算子中,往往新生成的RDD是依賴於前幾個批次的RDD,因此隨著時間的推移,有可能產生很長的依賴鏈條。為了避免在恢複數據的時候需要恢複整個依賴鏈條上所有的數據,檢查點需要周期性地保存一些中間RDD狀態信息,以斬斷無限製增長的依賴鏈條和恢複時間。

總之,元數據檢查點主要是為了恢複驅動器節點上的故障,而數據或RDD檢查點是為了支持對有狀態轉換操作的恢複。

何時啟用檢查點

如果有以下情況出現,你就必須啟用檢查點了:

  • 使用了有狀態的轉換算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 還是用了 reduceByKeyAndWindow(有”反歸約”函數的那個版本),你都必須配置檢查點目錄來周期性地保存RDD檢查點。
  • 支持驅動器故障中恢複(Recovering from failures of the driver running the application) – 這時候需要元數據檢查點以便恢複流式處理的進度信息。

注意,一些簡單的流式應用,如果沒有用到前麵所說的有狀態轉換算子,則完全可以不開啟檢查點。不過這樣的話,驅動器(driver)故障恢複後,有可能會丟失部分數據(有些已經接收但還未處理的數據可能會丟失)。不過通常這點丟失時可接受的,很多Spark Streaming應用也是這樣運行的。對非Hadoop環境的支持未來還會繼續改進。

如何配置檢查點

檢查點的啟用,隻需要設置好保存檢查點信息的檢查點目錄即可,一般會會將這個目錄設為一些可容錯的、可靠性較高的文件係統(如:HDFS、S3等)。開發者隻需要調用 streamingContext.checkpoint(checkpointDirectory)。設置好檢查點,你就可以使用前麵提到的有狀態轉換算子了。另外,如果你需要你的應用能夠支持從驅動器故障中恢複,你可能需要重寫部分代碼,實現以下行為:

  • 如果程序是首次啟動,就需要new一個新的StreamingContext,並定義好所有的數據流處理,然後調用StreamingContext.start()。
  • 如果程序是故障後重啟,就需要從檢查點目錄中的數據中重新構建StreamingContext對象。
 

不過這個行為可以用StreamingContext.getOrCreate來實現,示例如下:

// 首次創建StreamingContext並定義好數據流處理邏輯
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // 新建一個StreamingContext對象
    val lines = ssc.socketTextStream(...) // 創建DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // 設置好檢查點目錄
    ssc
}

// 創建新的StreamingContext對象,或者從檢查點構造一個
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// 無論是否是首次啟動都需要設置的工作在這裏
context. ...

// 啟動StreamingContext對象
context.start()
context.awaitTermination()

如果 checkpointDirectory 目錄存在,則context對象會從檢查點數據重新構建出來。如果該目錄不存在(如:首次運行),則 functionToCreateContext 函數會被調用,創建一個新的StreamingContext對象並定義好DStream數據流。完整的示例請參見RecoverableNetworkWordCount,這個例子會將網絡數據中的單詞計數統計結果添加到一個文件中。

除了使用getOrCreate之外,開發者還需要確保驅動器進程能在故障後重啟。這一點隻能由應用的部署環境基礎設施來保證。進一步的討論見部署(Deployment)這一節。

另外需要注意的是,RDD檢查點會增加額外的保存數據的開銷。這可能會導致數據流的處理時間變長。因此,你必須仔細的調整檢查點間隔時間。如果批次間隔太小(比如:1秒),那麼對每個批次保存檢查點數據將大大減小吞吐量。另一方麵,檢查點保存過於頻繁又會導致血統信息和任務個數的增加,這同樣會影響係統性能。對於需要RDD檢查點的有狀態轉換算子,默認的間隔是批次間隔的整數倍,且最小10秒。開發人員可以這樣來自定義這個間隔:dstream.checkpoint(checkpointInterval)。一般推薦設為批次間隔時間的5~10倍。


部署應用

本節中將主要討論一下如何部署Spark Streaming應用。

前提條件

要運行一個Spark Streaming 應用,你首先需要具備以下條件:

  • 集群以及集群管理器 – 這是一般Spark應用的基本要求,詳見 deployment guide
  • 給Spark應用打個JAR包 – 你需要將你的應用打成一個JAR包。如果使用spark-submit 提交應用,那麼你不需要提供Spark和Spark Streaming的相關JAR包。但是,如果你使用了高級數據源(advanced sources – 如:Kafka、Flume、Twitter等),那麼你需要將這些高級數據源相關的JAR包及其依賴一起打包並部署。例如,如果你使用了TwitterUtils,那麼就必須將spark-streaming-twitter_2.10及其相關依賴都打到應用的JAR包中。
  • 為執行器(executor)預留足夠的內存 – 執行器必須配置預留好足夠的內存,因為接受到的數據都得存在內存裏。注意,如果某些窗口長度達到10分鍾,那也就是說你的係統必須知道保留10分鍾的數據在內存裏。可見,到底預留多少內存是取決於你的應用處理邏輯的。
  • 配置檢查點 – 如果你的流式應用需要檢查點,那麼你需要配置一個Hadoop API兼容的可容錯存儲目錄作為檢查點目錄,流式應用的信息會寫入這個目錄,故障恢複時會用到這個目錄下的數據。詳見前麵的檢查點小節。
  • 配置驅動程序自動重啟 – 流式應用自動恢複的前提就是,部署基礎設施能夠監控驅動器進程,並且能夠在其故障時,自動重啟之。不同的集群管理器有不同的工具來實現這一功能:
    • Spark獨立部署 – Spark獨立部署集群可以支持將Spark應用的驅動器提交到集群的某個worker節點上運行。同時,Spark的集群管理器可以對該驅動器進程進行監控,一旦驅動器退出且返回非0值,或者因worker節點原始失敗,Spark集群管理器將自動重啟這個驅動器。詳見Spark獨立部署指南(Spark Standalone guide)。
    • YARN – YARN支持和獨立部署類似的重啟機製。詳細請參考YARN的文檔。
    • Mesos – Mesos上需要用Marathon來實現這一功能。
  • 配置WAL(write ahead log)- 從Spark 1.2起,我們引入了write ahead log來提高容錯性。如果啟用這個功能,則所有接收到的數據都會以write ahead log形式寫入配置好的檢查點目錄中。這樣就能確保數據零丟失(容錯語義有詳細的討論)。用戶隻需將 spark.streaming.receiver.writeAheadLog 設為true。不過,這同樣可能會導致接收器的吞吐量下降。不過你可以啟動多個接收器並行接收數據,從而提升整體的吞吐量(more receivers in parallel)。另外,建議在啟用WAL後禁用掉接收數據多副本功能,因為WAL其實已經是存儲在一個多副本存儲係統中了。你隻需要把存儲級別設為 StorageLevel.MEMORY_AND_DISK_SER。如果是使用S3(或者其他不支持flushing的文件係統)存儲WAL,一定要記得啟用這兩個標識:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。更詳細請參考: Spark Streaming Configuration
  • 設置好最大接收速率 – 如果集群可用資源不足以跟上接收數據的速度,那麼可以在接收器設置一下最大接收速率,即:每秒接收記錄的條數。相關的主要配置有:spark.streaming.receiver.maxRate,如果使用Kafka Direct API 還需要設置 spark.streaming.kafka.maxRatePerPartition。從Spark 1.5起,我們引入了backpressure的概念來動態地根據集群處理速度,評估並調整該接收速率。用戶隻需將 spark.streaming.backpressure.enabled設為true即可啟用該功能。

升級應用代碼

升級Spark Streaming應用程序代碼,可以使用以下兩種方式:

  • 新的Streaming程序和老的並行跑一段時間,新程序完成初始化以後,再關閉老的。注意,這種方式適用於能同時發送數據到多個目標的數據源(即:數據源同時將數據發給新老兩個Streaming應用程序)。
  • 老程序能夠優雅地退出(參考  StreamingContext.stop(...) or JavaStreamingContext.stop(...) ),即:確保所收到的數據都已經處理完畢後再退出。然後再啟動新的Streaming程序,而新程序將接著在老程序退出點上繼續拉取數據。注意,這種方式需要數據源支持數據緩存(或者叫數據堆積,如:Kafka、Flume),因為在新舊程序交接的這個空檔時間,數據需要在數據源處緩存。目前還不能支持從檢查點重啟,因為檢查點存儲的信息包含老程序中的序列化對象信息,在新程序中將其反序列化可能會出錯。這種情況下,隻能要麼指定一個新的檢查點目錄,要麼刪除老的檢查點目錄。

應用監控

除了Spark自身的監控能力(monitoring capabilities)之外,對Spark Streaming還有一些額外的監控功能可用。如果實例化了StreamingContext,那麼你可以在Spark web UI上看到多出了一個Streaming tab頁,上麵顯示了正在運行的接收器(是否活躍,接收記錄的條數,失敗信息等)和處理完的批次信息(批次處理時間,查詢延時等)。這些信息都可以用來監控streaming應用。

web UI上有兩個度量特別重要:

  • 批次處理耗時(Processing Time) – 處理單個批次耗時
  • 批次調度延時(Scheduling Delay) -各批次在隊列中等待時間(等待上一個批次處理完)

如果批次處理耗時一直比批次間隔時間大,或者批次調度延時持續上升,就意味著係統處理速度跟不上數據接收速度。這時候你就得考慮一下怎麼把批次處理時間降下來(reducing)。

Spark Streaming程序的處理進度可以用StreamingListener接口來監聽,這個接口可以監聽到接收器的狀態和處理時間。不過需要注意的是,這是一個developer API接口,換句話說這個接口未來很可能會變動(可能會增加更多度量信息)。



性能調優

要獲得Spark Streaming應用的最佳性能需要一點點調優工作。本節將深入解釋一些能夠改進Streaming應用性能的配置和參數。總體上來說,你需要考慮這兩方麵的事情:

  1. 提高集群資源利用率,減少單批次處理耗時。
  2. 設置合適的批次大小,以便使數據處理速度能跟上數據接收速度。

減少批次處理時間

有不少優化手段都可以減少Spark對每個批次的處理時間。細節將在優化指南(Tuning Guide)中詳談。這裏僅列舉一些最重要的。

數據接收並發度

跨網絡接收數據(如:從Kafka、Flume、socket等接收數據)需要在Spark中序列化並存儲數據。

如果接收數據的過程是係統瓶頸,那麼可以考慮增加數據接收的並行度。注意,每個輸入DStream隻包含一個單獨的接收器(receiver,運行約worker節點),每個接收器單獨接收一路數據流。所以,配置多個輸入DStream就能從數據源的不同分區分別接收多個數據流。例如,可以將從Kafka拉取兩個topic的數據流分成兩個Kafka輸入數據流,每個數據流拉取其中一個topic的數據,這樣一來會同時有兩個接收器並行地接收數據,因而增加了總體的吞吐量。同時,另一方麵我們又可以把這些DStream數據流合並成一個,然後可以在合並後的DStream上使用任何可用的transformation算子。示例代碼如下:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

另一個可以考慮優化的參數就是接收器的阻塞間隔,該參數由配置參數(configuration parameter)spark.streaming.blockInterval決定。大多數接收器都會將數據合並成一個個數據塊,然後再保存到spark內存中。對於map類算子來說,每個批次中數據塊的個數將會決定處理這批數據並行任務的個數,每個接收器每批次數據處理任務數約等於 (批次間隔 / 數據塊間隔)。例如,對於2秒的批次間隔,如果數據塊間隔為200ms,則創建的並發任務數為10。如果任務數太少(少於單機cpu core個數),則資源利用不夠充分。如需增加這個任務數,對於給定的批次間隔來說,隻需要減少數據塊間隔即可。不過,我們還是建議數據塊間隔至少要50ms,否則任務的啟動開銷占比就太高了。

另一個切分接收數據流的方法是,顯示地將輸入數據流劃分為多個分區(使用 inputStream.repartition(<number of partitions>))。該操作會在處理前,將數據散開重新分發到集群中多個節點上。

數據處理並發度

在計算各個階段(stage)中,任何一個階段的並發任務數不足都有可能造成集群資源利用率低。例如,對於reduce類的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默認的並發任務數是由 spark.default.parallelism 決定的。你既可以修改這個默認值(spark.default.parallelism),也可以通過參數指定這個並發數量(見PairDStreamFunctions)。

數據序列化

調整數據的序列化格式可以大大減少數據序列化的開銷。在spark Streaming中主要有兩種類型的數據需要序列化:

  • 輸入數據: 默認地,接收器收到的數據是以 StorageLevel.MEMORY_AND_DISK_SER_2 的存儲級別存儲到執行器(executor)內存中的。也就是說,收到的數據會被序列化以減少GC開銷,同時保存兩個副本以容錯。同時,數據會優先保存在內存裏,當內存不足時才吐出到磁盤上。很明顯,這個過程中會有數據序列化的開銷 – 接收器首先將收到的數據反序列化,然後再以spark所配置指定的格式來序列化數據。
  • Streaming算子所生產的持久化的RDDs: Streaming計算所生成的RDD可能會持久化到內存中。例如,基於窗口的算子會將數據持久化到內存,因為窗口數據可能會多次處理。所不同的是,spark core默認用 StorageLevel.MEMORY_ONLY 級別持久化RDD數據,而spark streaming默認使用StorageLevel.MEMORY_ONLY_SER 級別持久化接收到的數據,以便盡量減少GC開銷。

不管是上麵哪一種數據,都可以使用Kryo序列化來減少CPU和內存開銷,詳見Spark Tuning Guide。另,對於Kryo,你可以考慮這些優化:注冊自定義類型,禁用對象引用跟蹤(詳見Configuration Guide)。

在一些特定的場景下,如果數據量不是很大,那麼你可以考慮不用序列化格式,不過你需要注意的是取消序列化是否會導致大量的GC開銷。例如,如果你的批次間隔比較短(幾秒)並且沒有使用基於窗口的算子,這種情況下你可以考慮禁用序列化格式。這樣可以減少序列化的CPU開銷以優化性能,同時GC的增長也不多。

任務啟動開銷

如果每秒啟動的任務數過多(比如每秒50個以上),那麼將任務發送給slave節點的開銷會明顯增加,那麼你也就很難達到亞秒級(sub-second)的延遲。不過以下兩個方法可以減少任務的啟動開銷:

  • 任務序列化(Task Serialization): 使用Kryo來序列化任務,以減少任務本身的大小,從而提高發送任務的速度。任務的序列化格式是由 spark.closure.serializer 屬性決定的。不過,目前還不支持閉包序列化,未來的版本可能會增加對此的支持。
  • 執行模式(Execution mode): Spark獨立部署或者Mesos粗粒度模式下任務的啟動時間比Mesos細粒度模式下的任務啟動時間要短。詳見Running on Mesos guide

這些調整有可能能夠減少100ms的批次處理時間,這也使得亞秒級的批次間隔成為可能。


設置合適的批次間隔

要想streaming應用在集群上穩定運行,那麼係統處理數據的速度必須能跟上其接收數據的速度。換句話說,批次數據的處理速度應該和其生成速度一樣快。對於特定的應用來說,可以從其對應的監控(monitoring)頁麵上觀察驗證,頁麵上顯示的處理耗時應該要小於批次間隔時間。

根據spark streaming計算的性質,在一定的集群資源限製下,批次間隔的值會極大地影響係統的數據處理能力。例如,在WordCountNetwork示例中,對於特定的數據速率,一個係統可能能夠在批次間隔為2秒時跟上數據接收速度,但如果把批次間隔改為500毫秒係統可能就處理不過來了。所以,批次間隔需要謹慎設置,以確保生產係統能夠處理得過來。

要找出適合的批次間隔,你可以從一個比較保守的批次間隔值(如5~10秒)開始測試。要驗證係統是否能跟上當前的數據接收速率,你可能需要檢查一下端到端的批次處理延遲(可以看看Spark驅動器log4j日誌中的Total delay,也可以用StreamingListener接口來檢測)。如果這個延遲能保持和批次間隔差不多,那麼係統基本就是穩定的。否則,如果這個延遲持久在增長,也就是說係統跟不上數據接收速度,那也就意味著係統不穩定。一旦係統文檔下來後,你就可以嚐試提高數據接收速度,或者減少批次間隔值。不過需要注意,瞬間的延遲增長可以隻是暫時的,隻要這個延遲後續會自動降下來就沒有問題(如:降到小於批次間隔值)


內存調優

Spark應用內存占用和GC調優已經在調優指南(Tuning Guide)中有詳細的討論。牆裂建議你讀一讀那篇文檔。本節中,我們隻是討論一下幾個專門用於Spark Streaming的調優參數。

Spark Streaming應用在集群中占用的內存量嚴重依賴於具體所使用的tranformation算子。例如,如果想要用一個窗口算子操縱最近10分鍾的數據,那麼你的集群至少需要在內存裏保留10分鍾的數據;另一個例子是updateStateByKey,如果key很多的話,相對應的保存的key的state也會很多,而這些都需要占用內存。而如果你的應用隻是做一個簡單的 “映射-過濾-存儲”(map-filter-store)操作的話,那需要的內存就很少了。

一般情況下,streaming接收器接收到的數據會以 StorageLevel.MEMORY_AND_DISK_SER_2 這個存儲級別存到spark中,也就是說,如果內存裝不下,數據將被吐到磁盤上。數據吐到磁盤上會大大降低streaming應用的性能,因此還是建議根據你的應用處理的數據量,提供充足的內存。最好就是,一邊小規模地放大內存,再觀察評估,然後再放大,再評估。

另一個內存調優的方向就是垃圾回收。因為streaming應用往往都需要低延遲,所以肯定不希望出現大量的或耗時較長的JVM垃圾回收暫停。

以下是一些能夠幫助你減少內存占用和GC開銷的參數或手段:

  • DStream持久化級別(Persistence Level of DStreams): 前麵數據序列化(Data Serialization)這小節已經提到過,默認streaming的輸入RDD會被持久化成序列化的字節流。相對於非序列化數據,這樣可以減少內存占用和GC開銷。如果啟用Kryo序列化,還能進一步減少序列化數據大小和內存占用量。如果你還需要進一步減少內存占用的話,可以開啟數據壓縮(通過spark.rdd.compress這個配置設定),隻不過數據壓縮會增加CPU消耗。
  • 清除老數據(Clearing old data): 默認情況下,所有的輸入數據以及DStream的transformation算子產生的持久化RDD都是自動清理的。Spark Streaming會根據所使用的transformation算子來清理老數據。例如,你用了一個窗口操作處理最近10分鍾的數據,那麼Spark Streaming會保留至少10分鍾的數據,並且會主動把更早的數據都刪掉。當然,你可以設置 streamingContext.remember 以保留更長時間段的數據(比如:你可能會需要交互式地查詢更老的數據)。
  • CMS垃圾回收器(CMS Garbage Collector): 為了盡量減少GC暫停的時間,我們牆裂建議使用CMS垃圾回收器(concurrent mark-and-sweep GC)。雖然CMS GC會稍微降低係統的總體吞吐量,但我們仍建議使用它,因為CMS GC能使批次處理的時間保持在一個比較恒定的水平上。最後,你需要確保在驅動器(通過spark-submit中的–driver-java-options設置)和執行器(使用spark.executor.extraJavaOptions配置參數)上都設置了CMS GC。
  • 其他提示: 如果還想進一步減少GC開銷,以下是更進一步的可以嚐試的手段:
    • 配合Tachyon使用堆外內存來持久化RDD。詳見Spark編程指南(Spark Programming Guide
    • 使用更多但是更小的執行器進程。這樣GC壓力就會分散到更多的JVM堆中。


容錯語義

本節中,我們將討論Spark Streaming應用在出現失敗時的具體行為。

背景

要理解Spark Streaming所提供的容錯語義,我們首先需要回憶一下Spark RDD所提供的基本容錯語義。

  1. RDD是不可變的,可重算的,分布式數據集。每個RDD都記錄了其創建算子的血統信息,其中每個算子都以可容錯的數據集作為輸入數據。
  2. 如果RDD的某個分區因為節點失效而丟失,則該分區可以根據RDD的血統信息以及相應的原始輸入數據集重新計算出來。
  3. 假定所有RDD transformation算子計算過程都是確定性的,那麼通過這些算子得到的最終RDD總是包含相同的數據,而與Spark集群的是否故障無關。

Spark主要操作一些可容錯文件係統的數據,如:HDFS或S3。因此,所有從這些可容錯數據源產生的RDD也是可容錯的。然而,對於Spark Streaming並非如此,因為多數情況下Streaming需要從網絡遠端接收數據,這回導致Streaming的數據源並不可靠(尤其是對於使用了fileStream的應用)。要實現RDD相同的容錯屬性,數據接收就必須用多個不同worker節點上的Spark執行器來實現(默認副本因子是2)。因此一旦出現故障,係統需要恢複兩種數據:

  1. 接收並保存了副本的數據 – 數據不會因為單個worker節點故障而丟失,因為有副本!
  2. 接收但尚未保存副本數據 – 因為數據並沒有副本,所以一旦故障,隻能從數據源重新獲取。

此外,還有兩種可能的故障類型需要考慮:

  1. Worker節點故障 – 任何運行執行器的worker節點一旦故障,節點上內存中的數據都會丟失。如果這些節點上有接收器在運行,那麼其包含的緩存數據也會丟失。
  2. Driver節點故障 – 如果Spark Streaming的驅動節點故障,那麼很顯然SparkContext對象就沒了,所有執行器及其內存數據也會丟失。

有了以上這些基本知識,下麵我們就進一步了解一下Spark Streaming的容錯語義。

定義

流式係統的可靠度語義可以據此來分類:單條記錄在係統中被處理的次數保證。一個流式係統可能提供保證必定是以下三種之一(不管係統是否出現故障):

  1. 至多一次(At most once): 每條記錄要麼被處理一次,要麼就沒有處理。
  2. 至少一次(At least once): 每條記錄至少被處理過一次(一次或多次)。這種保證能確保沒有數據丟失,比“至多一次”要強。但有可能出現數據重複。
  3. 精確一次(Exactly once): 每條記錄都精確地隻被處理一次 – 也就是說,既沒有數據丟失,也不會出現數據重複。這是三種保證中最強的一種。

基礎語義

任何流式處理係統一般都會包含以下三個數據處理步驟:

  1. 數據接收(Receiving the data): 從數據源拉取數據。
  2. 數據轉換(Transforming the data): 將接收到的數據進行轉換(使用DStream和RDD transformation算子)。
  3. 數據推送(Pushing out the data): 將轉換後最終數據推送到外部文件係統,數據庫或其他展示係統。

如果Streaming應用需要做到端到端的“精確一次”的保證,那麼就必須在以上三個步驟中各自都保證精確一次:即,每條記錄必須,隻接收一次、處理一次、推送一次。下麵讓我們在Spark Streaming的上下文環境中來理解一下這三個步驟的語義:

  1. 數據接收: 不同數據源提供的保證不同,下一節再詳細討論。
  2. 數據轉換: 所有的數據都會被“精確一次”處理,這要歸功於RDD提供的保障。即使出現故障,隻要數據源還能訪問,最終所轉換得到的RDD總是包含相同的內容。
  3. 數據推送: 輸出操作默認保證“至少一次”的語義,是否能“精確一次”還要看所使用的輸出算子(是否冪等)以及下遊係統(是否支持事務)。不過用戶也可以開發自己的事務機製來實現“精確一次”語義。這個後續會有詳細討論。

接收數據語義

不同的輸入源提供不同的數據可靠性級別,從“至少一次”到“精確一次”。

從文件接收數據

如果所有的輸入數據都來源於可容錯的文件係統,如HDFS,那麼Spark Streaming就能在任何故障中恢複並處理所有的數據。這種情況下就能保證精確一次語義,也就是說不管出現什麼故障,所有的數據總是精確地隻處理一次,不多也不少。

基於接收器接收數據

對於基於接收器的輸入源,容錯語義將同時依賴於故障場景和接收器類型。前麵也已經提到過,spark Streaming主要有兩種類型的接收器:

  1. 可靠接收器 – 這類接收器會在數據接收並保存好副本後,向可靠數據源發送確認信息。這類接收器故障時,是不會給緩存的(已接收但尚未保存副本)數據發送確認信息。因此,一旦接收器重啟,沒有收到確認的數據,會重新從數據源再獲取一遍,所以即使有故障也不會丟數據。
  2. 不可靠接收器 – 這類接收器不會發送確認信息,因此一旦worker和driver出現故障,就有可能會丟失數據。

對於不同的接收器,我們可以獲得如下不同的語義。如果一個worker節點故障了,對於可靠接收器來書,不會有數據丟失。而對於不可靠接收器,緩存的(接收但尚未保存副本)數據可能會丟失。如果driver節點故障了,除了接收到的數據之外,其他的已經接收且已經保存了內存副本的數據都會丟失,這將會影響有狀態算子的計算結果。

為了避免丟失已經收到且保存副本的數,從 spark 1.2 開始引入了WAL(write ahead logs),以便將這些數據寫入到可容錯的存儲中。隻要你使用可靠接收器,同時啟用WAL(write ahead logs enabled),那麼久再也不用為數據丟失而擔心了。並且這時候,還能提供“至少一次”的語義保證。

下表總結了故障情況下的各種語義:

部署場景 Worker 故障 Driver 故障
Spark 1.1及以前版本 或者
Spark 1.2及以後版本,且未開啟WAL
若使用不可靠接收器,則可能丟失緩存(已接收但尚未保存副本)數據;
若使用可靠接收器,則沒有數據丟失,且提供至少一次處理語義
若使用不可靠接收器,則緩存數據和已保存數據都可能丟失;
若使用可靠接收器,則沒有緩存數據丟失,但已保存數據可能丟失,且不提供語義保證
Spark 1.2及以後版本,並啟用WAL 若使用可靠接收器,則沒有數據丟失,且提供至少一次語義保證 若使用可靠接收器和文件,則無數據丟失,且提供至少一次語義保證

從Kafka Direct API接收數據

從Spark 1.3開始,我們引入Kafka Direct API,該API能為Kafka數據源提供“精確一次”語義保證。有了這個輸入API,再加上輸出算子的“精確一次”保證,你就能真正實現端到端的“精確一次”語義保證。(改功能截止Spark 1.6.1還是實驗性的)更詳細的說明見:Kafka Integration Guide

輸出算子的語義

輸出算子(如 foreachRDD)提供“至少一次”語義保證,也就是說,如果worker故障,單條輸出數據可能會被多次寫入外部實體中。不過這對於文件係統來說是可以接受的(使用saveAs***Files 多次保存文件會覆蓋之前的),所以我們需要一些額外的工作來實現“精確一次”語義。主要有兩種實現方式:

  • 冪等更新(Idempotent updates): 就是說多次操作,產生的結果相同。例如,多次調用saveAs***Files保存的文件總是包含相同的數據。
  • 事務更新(Transactional updates): 所有的更新都是事務性的,這樣一來就能保證更新的原子性。以下是一種實現方式:
    • 用批次時間(在foreachRDD中可用)和分區索引創建一個唯一標識,該標識代表流式應用中唯一的一個數據塊。
    • 基於這個標識建立更新事務,並使用數據塊數據更新外部係統。也就是說,如果該標識未被提交,則原子地將標識代表的數據更新到外部係統。否則,就認為該標識已經被提交,直接忽略之。
      dstream.foreachRDD { (rdd, time) =>
        rdd.foreachPartition { partitionIterator =>
          val partitionId = TaskContext.get.partitionId()
          val uniqueId = generateUniqueId(time.milliseconds, partitionId)
          // 使用uniqueId作為事務的唯一標識,基於uniqueId實現partitionIterator所指向數據的原子事務提交
        }
      }
      


遷移指南 – 從0.9.1及以下升級到1.x

在Spark 0.9.1和Spark 1.0之間,有一些API接口變更,變更目的是為了保障未來版本API的穩定。本節將詳細說明一下從已有版本遷移升級到1.0所需的工作。

輸入DStream(Input DStreams): 所有創建輸入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值不再是DStream(對Java來說是JavaDStream),而是 InputDStream / ReceiverInputDStream(對Java來說是JavaInputDStream / JavaPairInputDStream /JavaReceiverInputDStream / JavaPairReceiverInputDStream)。這樣才能確保特定輸入流的功能能夠在未來持續增加到這些class中,而不會打破二進製兼容性。注意,已有的Spark Streaming應用應該不需要任何代碼修改(新的返回類型都是DStream的子類),隻不過需要基於Spark 1.0重新編譯一把。

定製網絡接收器(Custom Network Receivers): 自從Spark Streaming發布以來,Scala就能基於NetworkReceiver來定製網絡接收器。但由於錯誤處理和匯報API方便的限製,該類型不能在Java中使用。所以Spark 1.0開始,用 Receiver 來替換掉這個NetworkReceiver,主要的好處如下:

  • 該類型新增了stop和restart方法,便於控製接收器的生命周期。詳見custom receiver guide
  • 定製接收器用Scala和Java都能實現。

為了將已有的基於NetworkReceiver的自定義接收器遷移到Receiver上來,你需要如下工作:

  • 首先你的自定義接收器類型需要從 org.apache.spark.streaming.receiver.Receiver繼承,而不再是org.apache.spark.streaming.dstream.NetworkReceiver。
  • 原先,我們需要在自定義接收器中創建一個BlockGenerator來保存接收到的數據。你必須顯示的實現onStart() 和 onStop() 方法。而在新的Receiver class中,這些都不需要了,你隻需要調用它的store係列的方法就能將數據保存到Spark中。所以你接下來需要做的遷移工作就是,刪除BlockGenerator對象(這個類型在Spark 1.0之後也沒有了~),然後用store(…)方法來保存接收到的數據。

基於Actor的接收器(Actor-based Receivers): 從actor class繼承後,並實org.apache.spark.streaming.receiver.Receiver 後,即可從Akka Actors中獲取數據。獲取數據的類被重命名為  org.apache.spark.streaming.receiver.ActorHelper ,而保存數據的pushBlocks(…)方法也被重命名為 store(…)。其他org.apache.spark.streaming.receivers包中的工具類也被移到  org.apache.spark.streaming.receiver 包下並重命名,新的類名應該比之前更加清晰。



下一步

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 15:03:34

  上一篇:go  阿裏雲大數據利器Maxcompute之-假如你使用過hive
  下一篇:go  《Spark官方文檔》Spark Streaming編程指南(一)