## Spark作業性能調優總結
Spark作業性能調優總結
前段時間在集群上運行Spark作業,但是發現作業運行到某個stage之後就卡住了,之後也不再有日誌輸出。於是開始著手對作業進行調優,下麵是遇到的問題和解決過程:
運行時錯誤
Out Of Memory: Java heap space / GC overhead limit exceeded
使用yarn logs -applicationId=appliation_xxx_xxx 命令查看Yarn收集的各個Executor的日誌。
可以發現OOM的錯誤,以及一些retry 或waiting timeout的錯誤。這是因為發生Full GC時會造成stop-the-world,應用暫停運行等待垃圾回收結束。
Java heap space是指堆內存空間不足,而GC overhead limit exeeded是Hotspot VM 1.6的一個策略,通過統計GC時間來預測是否要OOM了,提前拋出異常,防止OOM發生。Sun官方給出的定義是“並行/並發回收器在GC回收時間過長會拋出OutOfMemory。過長的定義是,超過98%的時間用來做GC並且回收了不到2%的堆內存,用來避免內存過小造成應用不能正常工作”。這個策略會保存數據或保存現場(Heap Dump)
在代碼設置Spark參數,將executor的堆棧信息打印出來
conf.set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
在運行後的日誌中可以發現如下內容:
33.125: [GC DefNew: 16000K->16000K(16192K), 0.0000574 secs 18973K->2704K(32576K), 0.1015066 secs]
100.667:[Full GC [Tenured: 0K->210K(10240K), 0.0149142 secs] 4603K->210K(19456K), [Perm : 2999K->2999K(21248K)], 0.0150007 secs]
實際上還有total的匯總信息,如下所示:
PSYoungGen total 5496832K, used 5357511K [0x00000006aaa80000, 0x0000000800000000, 0x0000000800000000)
eden space 5402624K, 97% used [0x00000006aaa80000,0x00000007ebe75800,0x00000007f4680000)
from space 94208K, 99% used [0x00000007f4680000,0x00000007fa27c530,0x00000007fa280000)
to space 95744K, 0% used [0x00000007fa280000,0x00000007fa280000,0x0000000800000000)
ParOldGen total 11185152K, used 781938K [0x00000003fff80000, 0x00000006aaa80000, 0x00000006aaa80000)
PSPermGen total 1048576K, used 58663K [0x00000003bff80000, 0x00000003fff80000, 0x00000003fff80000)
object space 1048576K, 5% used [0x00000003bff80000,0x00000003c38c9d60,0x00000003fff80000)```
作業卡住的時候,PSPermGen的使用占比一般在99%左右,因此我在Spark程序中增大了堆外內存
```conf.set("spark.executor.extraJavaOptions", "-XX:PermSize=1024m -XX:MaxPermSize=2048m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps") ```
這裏GC和Full GC代表gc停頓的類型,Full GC代表stop-the-world。箭頭兩邊是gc前後的區間空間大小,分別是young區、tenured區和perm區,括號裏是該區的大小。冒號前麵是GC發生的時間,單位是秒,從jvm啟動開始計算。DefNew代表Serial收集器,為Default New Generation的縮寫,類似的還有PSYoundGen,代表parallel Scavenge收集器。這樣可以通過分析日誌找到導致GC overhead limit execeeded的原因, 通過調節相應的參數解決問題。
文中涉及到的名詞解釋:
Eden Space:堆內存池,大多數對象在這裏分配內存空間
Survivor Space:堆內存池,存儲在Eden Space中存活下來的對象
Tenured Generation:堆內存,存儲Survivor Space中存過幾次GC的對象
Permanent Generation:非對空間,存儲的是class和method對象
Code Cache:非堆空間,JVM用來存儲編譯和存儲native code
### Executor & Task Lost
*executor lost*
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local):
ExecutorLostFailure (executor lost)
*task lost*
WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217):
java.io.IOException: Connection from /192.168.47.217:55483 closed
各種timeout
java.util.concurrent.TimeoutException: Futures timed out after [120 second]
ERROR TransportChannelHandler: Connection to /192.168.47.212:35409
has been quiet for 120000 ms while there are outstanding requests.
Assuming connection is dead; please adjust spark.network.
timeout if this is wrong
解決:由網絡或者GC引起,worker或者executor沒有接收到executor或task的心跳反饋,提高spark.core.connection.ack.wait.timeout 的值,根據情況改為300s或更高。增大spark.yarn.executor.memoryOverhead 堆外內存的值,根據情況改為4096或更高。
##傾斜
*數據傾斜*
*任務傾斜*
差距不大的幾個task,有的運行速度特別慢
解決:大多數任務都完成了,還有一兩個任務怎們都跑不完或者跑的很慢,分數據傾斜和任務傾斜
數據傾斜:
數據傾斜大多數情況是由於大量的無效數據引起,比如null或者“ ”,也有可能是一些異常數據,,比如統計用戶登錄情況時,出現某用戶登錄過千萬次的情況,無效數據在計算前需要過濾掉。
數據處理有一個原則,多使用filter,這樣你真正需要分析的數據量就越少,處理速度就越快。
具體可參見解決spark中遇到的數據傾斜問題
任務傾斜
task傾斜原因比較多,網絡io,cpu,mem都有可能造成這個節點上的任務執行緩慢,可以去看該節點的性能監控過來分析原因。以前遇到過同事在spark的一台worker上跑R的任務導致該節點spark task運行緩慢。
或者可以開啟spark的推測機製,開啟推測機製後如果某一台機器的幾個task特別慢,推測機製會將任務分配到其他機器執行,最後Spark會選取最快的作為最終結果。
```spark.speculation true
spark.speculation.interval 100 - 檢測周期,單位毫秒;
spark.speculation.quantile 0.75 - 完成task的百分比時啟動推測
spark.speculation.multiplier 1.5 - 比其他的慢多少倍時啟動推測。```
####OOM
解決: 內存不夠,數據太多就會拋出OOM的Exception,主要有Driver OOm和Executor OOM兩種
Driver OOM
一般是使用了collect操作將所有executor的數據聚合到dirver端導致,盡量不要使用collect操作即可
Executor OOM
可以按下麵的內存優化的方法增加code使用內存空間
增加executor內存總量,也就是說增加spark.executor.memory 的值
增加任務並行度(大任務就被分割成小任務了),參考下麵優化並行度的方法
#### 一些優化
部分Executor不執行任務
有時候會發現部分executor並沒有執行任務,為什麼呢?
任務partition數量過少
每個partition隻會在一個task執行任務。改變分區數,可以通過repartition方法,即使這樣,在repartition前麵還是要從數據源讀取數據,此時(讀入數據)的並發度根據不同的數據源受到不同限製,常用的大概有以下幾種:
hdfs - block數就是partition數
mysql - 按讀入時的分區規則分partition
es - 分區數即為 es 的 分片數(shard)
數據本地行的副作用
taskManager在分發任務之前會優先計算數據本地行,優先等級是:
`process(同一個executor) -> node_local(同一個節點) -> rack_local(同一個機架) -> any(任何結點)`
Spark會優先執行高優先級的任務, 任務完成的速度很快(小於設置的spark.locality.wait時間),則數據本地性下一級別的任務則一直不會啟動,這就是Spark的延時調度機製。
舉個極端例子:運行一個count任務,如果數據全都堆積在某一台節點上,那將隻會有這台機器在長期計算任務,集群中的其他機器則會處於等待狀態(等待本地性降級)而不執行任務,造成了大量的資源浪費。
判斷的公式為:`curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)`
其中 curTime 為係統當前時間,lastLaunchTime 為在某優先級下最後一次啟動task的時間
如果滿足這個條件則會進入下一個優先級的時間判斷,直到 any,不滿足則分配當前優先級的任務。
數據本地性任務分配的源碼在 taskSetManager.Scala 。
如果存在大量executor處於等待狀態,可以降低以下參數的值(也可以設置為0),默認都是3s。
```spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack```
當你數據本地性很差,可適當提高上述值,當然也可以直接在集群中對數據進行balance。
內存
如果你的Shuffle量特別大,同時rdd緩存比較少可以更改下麵的參數進一步提高任務運行速度
`spark.storage.memoryFraction` - 分配給rdd緩存的比例,默認為0.6(60%),如果緩存的數據比較少可以降低該值
`spark.shuffle.memoryFraction` - 分配給shuffle數據的比例,默認為0.2(20%),剩下的20%內存空間則是分配給代碼生成對象等。
如果運行任務運行緩慢,jvm進行頻繁GC或者內存空間不足,或者可以降低上述的兩個值。
"`spark.rdd.compress","true`" - 默認為false,壓縮序列化的RDD分區,消耗一些cpu減少空間的使用
以上這些方法是我在解決Spark作業性能過程中接觸的,更加詳細的情況以及解決方法可以看這篇文章Spark排錯與優化 ,總結的非常詳細了。
此外,還是應該多看Spark調度的源碼,對寫代碼已經分析問題都非常有幫助。
最後更新:2017-07-18 22:02:38