閱讀204 返回首頁    go 阿裏雲 go 技術社區[雲棲]


## Spark作業性能調優總結

Spark作業性能調優總結

前段時間在集群上運行Spark作業,但是發現作業運行到某個stage之後就卡住了,之後也不再有日誌輸出。於是開始著手對作業進行調優,下麵是遇到的問題和解決過程:

運行時錯誤

Out Of Memory: Java heap space / GC overhead limit exceeded

使用yarn logs -applicationId=appliation_xxx_xxx 命令查看Yarn收集的各個Executor的日誌。

可以發現OOM的錯誤,以及一些retry 或waiting timeout的錯誤。這是因為發生Full GC時會造成stop-the-world,應用暫停運行等待垃圾回收結束。

Java heap space是指堆內存空間不足,而GC overhead limit exeeded是Hotspot VM 1.6的一個策略,通過統計GC時間來預測是否要OOM了,提前拋出異常,防止OOM發生。Sun官方給出的定義是“並行/並發回收器在GC回收時間過長會拋出OutOfMemory。過長的定義是,超過98%的時間用來做GC並且回收了不到2%的堆內存,用來避免內存過小造成應用不能正常工作”。這個策略會保存數據或保存現場(Heap Dump)

​ 在代碼設置Spark參數,將executor的堆棧信息打印出來

conf.set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

在運行後的日誌中可以發現如下內容:

33.125: [GC DefNew: 16000K->16000K(16192K), 0.0000574 secs 18973K->2704K(32576K), 0.1015066 secs]
100.667:[Full GC [Tenured: 0K->210K(10240K), 0.0149142 secs] 4603K->210K(19456K), [Perm : 2999K->2999K(21248K)], 0.0150007 secs]

實際上還有total的匯總信息,如下所示:

 PSYoungGen      total 5496832K, used 5357511K [0x00000006aaa80000, 0x0000000800000000, 0x0000000800000000)
  eden space 5402624K, 97% used [0x00000006aaa80000,0x00000007ebe75800,0x00000007f4680000)
  from space 94208K, 99% used [0x00000007f4680000,0x00000007fa27c530,0x00000007fa280000)
  to   space 95744K, 0% used [0x00000007fa280000,0x00000007fa280000,0x0000000800000000)
 ParOldGen       total 11185152K, used 781938K [0x00000003fff80000, 0x00000006aaa80000, 0x00000006aaa80000)
 PSPermGen       total 1048576K, used 58663K [0x00000003bff80000, 0x00000003fff80000, 0x00000003fff80000)
  object space 1048576K, 5% used [0x00000003bff80000,0x00000003c38c9d60,0x00000003fff80000)```
  作業卡住的時候,PSPermGen的使用占比一般在99%左右,因此我在Spark程序中增大了堆外內存



```conf.set("spark.executor.extraJavaOptions", "-XX:PermSize=1024m -XX:MaxPermSize=2048m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps") ```
這裏GC和Full GC代表gc停頓的類型,Full GC代表stop-the-world。箭頭兩邊是gc前後的區間空間大小,分別是young區、tenured區和perm區,括號裏是該區的大小。冒號前麵是GC發生的時間,單位是秒,從jvm啟動開始計算。DefNew代表Serial收集器,為Default New Generation的縮寫,類似的還有PSYoundGen,代表parallel Scavenge收集器。這樣可以通過分析日誌找到導致GC overhead limit execeeded的原因, 通過調節相應的參數解決問題。

文中涉及到的名詞解釋:

Eden Space:堆內存池,大多數對象在這裏分配內存空間

Survivor Space:堆內存池,存儲在Eden Space中存活下來的對象

Tenured Generation:堆內存,存儲Survivor Space中存過幾次GC的對象

Permanent Generation:非對空間,存儲的是class和method對象

Code Cache:非堆空間,JVM用來存儲編譯和存儲native code

### Executor & Task Lost

*executor lost*



WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local):
ExecutorLostFailure (executor lost)
*task lost*



WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217):
java.io.IOException: Connection from /192.168.47.217:55483 closed
各種timeout



java.util.concurrent.TimeoutException: Futures timed out after [120 second]
​
ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 
has been quiet for 120000 ms while there are outstanding requests.
Assuming connection is dead; please adjust spark.network.
timeout if this is wrong
解決:由網絡或者GC引起,worker或者executor沒有接收到executor或task的心跳反饋,提高spark.core.connection.ack.wait.timeout 的值,根據情況改為300s或更高。增大spark.yarn.executor.memoryOverhead 堆外內存的值,根據情況改為4096或更高。

##傾斜

*數據傾斜*




*任務傾斜*

差距不大的幾個task,有的運行速度特別慢

解決:大多數任務都完成了,還有一兩個任務怎們都跑不完或者跑的很慢,分數據傾斜和任務傾斜

數據傾斜:

數據傾斜大多數情況是由於大量的無效數據引起,比如null或者“ ”,也有可能是一些異常數據,,比如統計用戶登錄情況時,出現某用戶登錄過千萬次的情況,無效數據在計算前需要過濾掉。 
數據處理有一個原則,多使用filter,這樣你真正需要分析的數據量就越少,處理速度就越快。

具體可參見解決spark中遇到的數據傾斜問題

任務傾斜

task傾斜原因比較多,網絡io,cpu,mem都有可能造成這個節點上的任務執行緩慢,可以去看該節點的性能監控過來分析原因。以前遇到過同事在spark的一台worker上跑R的任務導致該節點spark task運行緩慢。 
或者可以開啟spark的推測機製,開啟推測機製後如果某一台機器的幾個task特別慢,推測機製會將任務分配到其他機器執行,最後Spark會選取最快的作為最終結果。

```spark.speculation true

spark.speculation.interval 100 - 檢測周期,單位毫秒;

spark.speculation.quantile 0.75 - 完成task的百分比時啟動推測

spark.speculation.multiplier 1.5 - 比其他的慢多少倍時啟動推測。```

####OOM
解決: 內存不夠,數據太多就會拋出OOM的Exception,主要有Driver OOm和Executor OOM兩種

Driver OOM

一般是使用了collect操作將所有executor的數據聚合到dirver端導致,盡量不要使用collect操作即可

Executor OOM

可以按下麵的內存優化的方法增加code使用內存空間

增加executor內存總量,也就是說增加spark.executor.memory 的值

增加任務並行度(大任務就被分割成小任務了),參考下麵優化並行度的方法

#### 一些優化

部分Executor不執行任務

有時候會發現部分executor並沒有執行任務,為什麼呢?

任務partition數量過少

每個partition隻會在一個task執行任務。改變分區數,可以通過repartition方法,即使這樣,在repartition前麵還是要從數據源讀取數據,此時(讀入數據)的並發度根據不同的數據源受到不同限製,常用的大概有以下幾種:



hdfs - block數就是partition數
mysql - 按讀入時的分區規則分partition
es - 分區數即為 es 的 分片數(shard)
數據本地行的副作用

taskManager在分發任務之前會優先計算數據本地行,優先等級是:



`process(同一個executor) -> node_local(同一個節點) -> rack_local(同一個機架) -> any(任何結點)`
Spark會優先執行高優先級的任務,    任務完成的速度很快(小於設置的spark.locality.wait時間),則數據本地性下一級別的任務則一直不會啟動,這就是Spark的延時調度機製。

舉個極端例子:運行一個count任務,如果數據全都堆積在某一台節點上,那將隻會有這台機器在長期計算任務,集群中的其他機器則會處於等待狀態(等待本地性降級)而不執行任務,造成了大量的資源浪費。



判斷的公式為:`curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)`
其中 curTime 為係統當前時間,lastLaunchTime 為在某優先級下最後一次啟動task的時間

如果滿足這個條件則會進入下一個優先級的時間判斷,直到 any,不滿足則分配當前優先級的任務。

數據本地性任務分配的源碼在 taskSetManager.Scala 。

如果存在大量executor處於等待狀態,可以降低以下參數的值(也可以設置為0),默認都是3s。



```spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack```
當你數據本地性很差,可適當提高上述值,當然也可以直接在集群中對數據進行balance。

內存

如果你的Shuffle量特別大,同時rdd緩存比較少可以更改下麵的參數進一步提高任務運行速度

`spark.storage.memoryFraction` - 分配給rdd緩存的比例,默認為0.6(60%),如果緩存的數據比較少可以降低該值

`spark.shuffle.memoryFraction` - 分配給shuffle數據的比例,默認為0.2(20%),剩下的20%內存空間則是分配給代碼生成對象等。

如果運行任務運行緩慢,jvm進行頻繁GC或者內存空間不足,或者可以降低上述的兩個值。

"`spark.rdd.compress","true`" - 默認為false,壓縮序列化的RDD分區,消耗一些cpu減少空間的使用


以上這些方法是我在解決Spark作業性能過程中接觸的,更加詳細的情況以及解決方法可以看這篇文章Spark排錯與優化 ,總結的非常詳細了。

此外,還是應該多看Spark調度的源碼,對寫代碼已經分析問題都非常有幫助。

最後更新:2017-07-18 22:02:38

  上一篇:go  Ubuntu 安裝anaconda3
  下一篇:go  工作中要拿出自己的態度