《Spark 官方文檔》Spark配置(一)
Spark配置
Spark有以下三種方式修改配置:
- Spark properties (Spark屬性)可以控製絕大多數應用程序參數,而且既可以通過 SparkConf 對象來設置,也可以通過Java係統屬性來設置。
- Environment variables (環境變量)可以指定一些各個機器相關的設置,如IP地址,其設置方法是寫在每台機器上的conf/spark-env.sh中。
- Logging (日誌)可以通過log4j.properties配置日誌。
Spark屬性
Spark屬性可以控製大多數的應用程序設置,並且每個應用的設定都是分開的。這些屬性可以用SparkConf 對象直接設定。SparkConf為一些常用的屬性定製了專用方法(如,master URL和application name),其他屬性都可以用鍵值對做參數,調用set()方法來設置。例如,我們可以初始化一個包含2個本地線程的Spark應用,代碼如下:
注意,local[2]代表2個本地線程 – 這是最小的並發方式,可以幫助我們發現一些隻有在分布式上下文才能複現的bug。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
注意,本地模式下,我們可以使用n個線程(n >= 1)。而且在像Spark Streaming這樣的場景下,我們可能需要多個線程來防止類似線程餓死這樣的問題。
配置時間段的屬性應該寫明時間單位,如下格式都是可接受的:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
配置大小的屬性也應該寫明單位,如下格式都是可接受的:
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
動態加載Spark屬性
在某些場景下,你可能需要避免將屬性值寫死在 SparkConf 中。例如,你可能希望在同一個應用上使用不同的master或不同的內存總量。Spark允許你簡單地創建一個空的SparkConf對象:
val sc = new SparkContext(new SparkConf())
然後在運行時設置這些屬性:
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Spark shell和spark-submit
工具支持兩種動態加載配置的方法。第一種,通過命令行選項,如:上麵提到的–master(設置master URL)。spark-submit可以在啟動Spark應用時,通過–conf標誌接受任何屬性配置,同時有一些特殊配置參數同樣可用(如,–master)。運行./bin/spark-submit –help可以展示這些選項的完整列表。
同時,bin/spark-submit 也支持從conf/spark-defaults.conf 中讀取配置選項,在該文件中每行是一個鍵值對,並用空格分隔,如下:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
這些通過參數或者屬性配置文件傳遞的屬性,最終都會在SparkConf 中合並。其優先級是:首先是SparkConf代碼中寫的屬性值,其次是spark-submit或spark-shell的標誌參數,最後是spark-defaults.conf文件中的屬性。
有一些配置項被重命名過,這種情形下,老的名字仍然是可以接受的,隻是優先級比新名字優先級低。
查看Spark屬性
每個SparkContext都有其對應的Spark UI,所以Spark應用程序都能通過Spark UI查看其屬性。默認你可以在這裏看到:https://<driver>:4040,頁麵上的”Environment“ tab頁可以查看Spark屬性。如果你真的想確認一下屬性設置是否正確的話,這個功能就非常有用了。注意,隻有顯式地通過SparkConf對象、在命令行參數、或者spark-defaults.conf設置的參數才會出現在頁麵上。其他屬性,你可以認為都是默認值。
可用的屬性
絕大多數屬性都有合理的默認值。這裏是部分常用的選項:
應用屬性
屬性名稱 | 默認值 | 含義 |
---|---|---|
spark.app.name |
(none) | Spark應用的名字。會在SparkUI和日誌中出現。 |
spark.driver.cores |
1 | 在cluster模式下,用幾個core運行驅動器(driver)進程。 |
spark.driver.maxResultSize |
1g | Spark action算子返回的結果最大多大。至少要1M,可以設為0表示無限製。如果結果超過這一大小,Spark作業(job)會直接中斷退出。但是,設得過高有可能導致驅動器OOM(out-of-memory)(取決於spark.driver.memory設置,以及驅動器JVM的內存限製)。設一個合理的值,以避免驅動器OOM。 |
spark.driver.memory |
1g | 驅動器進程可以用的內存總量(如:1g,2g)。 注意,在客戶端模式下,這個配置不能在SparkConf中直接設置(因為驅動器JVM都啟動完了呀!)。驅動器客戶端模式下,必須要在命令行裏用 –driver-memory 或者在默認屬性配置文件裏設置。 |
spark.executor.memory |
1g | 單個執行器(executor)使用的內存總量(如,2g,8g) |
spark.extraListeners |
(none) | 逗號分隔的SparkListener子類的類名列表;初始化SparkContext時,這些類的實例會被創建出來,並且注冊到Spark的監聽總線上。如果這些類有一個接受SparkConf作為唯一參數的構造函數,那麼這個構造函數會被優先調用;否則,就調用無參數的默認構造函數。如果沒有構造函數,SparkContext創建的時候會拋異常。 |
spark.local.dir |
/tmp | Spark的”草稿“目錄,包括map輸出的臨時文件,或者RDD存在磁盤上的數據。這個目錄最好在本地文件係統中,這樣讀寫速度快。這個配置可以接受一個逗號分隔的列表,通常用這種方式將文件IO分散不同的磁盤上去。 注意:Spark-1.0及以後版本中,這個屬性會被集群管理器所提供的環境變量覆蓋:SPARK_LOCAL_DIRS(獨立部署或Mesos)或者 LOCAL_DIRS(YARN)。 |
spark.logConf |
false | SparkContext啟動時是否把生效的 SparkConf 屬性以INFO日誌打印到日誌裏 |
spark.master |
(none) | 集群管理器URL。參考allowed master URL’s. |
除了這些以外,以下還有很多可用的參數配置,在某些特定情形下,可能會用到:
運行時環境
屬性名 | 默認值 | 含義 |
---|---|---|
spark.driver.extraClassPath |
(none) | 額外的classpath,將插入到到驅動器的classpath開頭。 注意:驅動器如果運行客戶端模式下,這個配置不能通過SparkConf 在程序裏配置,因為這時候程序已經啟動呀!而是應該用命令行參數(–driver-class-path)或者在 conf/spark-defaults.conf 配置。 |
spark.driver.extraJavaOptions |
(none) | 驅動器額外的JVM選項。如:GC設置或其他日誌參數。 注意:驅動器如果運行客戶端模式下,這個配置不能通過SparkConf在程序裏配置,因為這時候程序已經啟動呀!而是應該用命令行參數(–driver-java-options)或者conf/spark-defaults.conf 配置。 |
spark.driver.extraLibraryPath |
(none) | 啟動驅動器JVM時候指定的依賴庫路徑。 注意:驅動器如果運行客戶端模式下,這個配置不能通過SparkConf在程序裏配置,因為這時候程序已經啟動呀!而是應該用命令行參數(–driver-library-path)或者conf/spark-defaults.conf 配置。 |
spark.driver.userClassPathFirst |
false | (試驗性的:即未來不一定會支持該配置) 驅動器是否首選使用用戶指定的jars,而不是spark自身的。這個特性可以用來處理用戶依賴和spark本身依賴項之間的衝突。目前還是試驗性的,並且隻能用在集群模式下。 |
spark.executor.extraClassPath |
(none) | 添加到執行器(executor)classpath開頭的classpath。主要為了向後兼容老的spark版本,不推薦使用。 |
spark.executor.extraJavaOptions |
(none) | 傳給執行器的額外JVM參數。如:GC設置或其他日誌設置等。注意,不能用這個來設置Spark屬性或者堆內存大小。Spark屬性應該用SparkConf對象,或者spark-defaults.conf文件(會在spark-submit腳本中使用)來配置。執行器堆內存大小應該用 spark.executor.memory配置。 |
spark.executor.extraLibraryPath |
(none) | 啟動執行器JVM時使用的額外依賴庫路徑。 |
spark.executor.logs.rolling.maxRetainedFiles |
(none) | Sets the number of latest rolling log files that are going to be retained by the system. Older log files will be deleted. Disabled by default.設置日誌文件最大保留個數。老日誌文件將被幹掉。默認禁用的。 |
spark.executor.logs.rolling.maxSize |
(none) | 設置執行器日誌文件大小上限。默認禁用的。 需要自動刪日誌請參考 spark.executor.logs.rolling.maxRetainedFiles. |
spark.executor.logs.rolling.strategy |
(none) | 執行器日誌滾動策略。默認禁用。 可接受的值有”time”(基於時間滾動) 或者 “size”(基於文件大小滾動)。 time:將使用 spark.executor.logs.rolling.time.interval設置滾動時間間隔 size:將使用 spark.executor.logs.rolling.size.maxBytes設置文件大小上限 |
spark.executor.logs.rolling.time.interval |
daily | 設置執行器日誌滾動時間間隔。日誌滾動默認是禁用的。 可用的值有 “daily”, “hourly”, “minutely”,也可設為數字(則單位為秒)。 關於日誌自動清理,請參考 spark.executor.logs.rolling.maxRetainedFiles |
spark.executor.userClassPathFirst |
false | (試驗性的)與 spark.driver.userClassPathFirst類似,隻不過這個參數將應用於執行器 |
spark.executorEnv.[EnvironmentVariableName] |
(none) | 向執行器進程增加名為EnvironmentVariableName的環境變量。用戶可以指定多個來設置不同的環境變量。 |
spark.python.profile |
false | 對Python worker啟用性能分析,性能分析結果會在sc.show_profile()中,或者在驅動器退出前展示。也可以用sc.dump_profiles(path)輸出到磁盤上。如果部分分析結果被手動展示過,那麼驅動器退出前就不再自動展示了。默認會使用pyspark.profiler.BasicProfiler,也可以自己傳一個profiler 類參數給SparkContext構造函數。 |
spark.python.profile.dump |
(none) | 這個目錄是用來在驅動器退出前,dump性能分析結果。性能分析結果會按RDD分別dump。同時可以使用ptats.Stats()來裝載。如果製定了這個,那麼分析結果就不再自動展示。 |
spark.python.worker.memory |
512m | 聚合時每個python worker使用的內存總量,和JVM的內存字符串格式相同(如,512m,2g)。如果聚合時使用的內存超過這個量,就將數據溢出到磁盤上。 |
spark.python.worker.reuse |
true | 是否複用Python worker。如果是,則每個任務會啟動固定數量的Python worker,並且不需要fork() python進程。如果需要廣播的數據量很大,設為true能大大減少廣播數據量,因為需要廣播的進程數減少了。 |
混洗行為
屬性名 | 默認值 | 含義 |
---|---|---|
spark.reducer.maxSizeInFlight |
48m | map任務輸出同時reduce任務獲取的最大內存占用量。每個輸出需要創建buffer來接收,對於每個reduce任務來說,有一個固定的內存開銷上限,所以最好別設太大,除非你內存非常大。 |
spark.shuffle.compress |
true | 是否壓縮map任務的輸出文件。通常來說,壓縮是個好主意。使用的壓縮算法取決於 spark.io.compression.codec |
spark.shuffle.file.buffer |
32k | 每個混洗輸出流的內存buffer大小。這個buffer能減少混洗文件的創建和磁盤尋址。 |
spark.shuffle.io.maxRetries |
3 | (僅對netty)如果IO相關異常發生,重試次數(如果設為非0的話)。重試能是大量數據的混洗操作更加穩定,因為重試可以有效應對長GC暫停或者網絡閃斷。 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (僅netty)主機之間的連接是複用的,這樣可以減少大集群中重複建立連接的次數。然而,有些集群是機器少,磁盤多,這種集群可以考慮增加這個參數值,以便充分利用所有磁盤並發性能。 |
spark.shuffle.io.preferDirectBufs |
true | (僅netty)堆外緩存可以有效減少垃圾回收和緩存複製。對於堆外內存緊張的用戶來說,可以考慮禁用這個選項,以迫使netty所有內存都分配在堆上。 |
spark.shuffle.io.retryWait |
5s | (僅netty)混洗重試獲取數據的間隔時間。默認最大重試延遲是15秒,設置這個參數後,將變成maxRetries* retryWait。 |
spark.shuffle.manager |
sort | 混洗數據的實現方式。可用的有”sort”和”hash“。基於排序(sort)的混洗內存利用率更高,並且從1.2開始已經是默認值了。 |
spark.shuffle.service.enabled |
false | 啟用外部混洗服務。啟用外部混洗服務後,執行器生成的混洗中間文件就由該服務保留,這樣執行器就可以安全的退出了。如果 spark.dynamicAllocation.enabled啟用了,那麼這個參數也必須啟用,這樣動態分配才能有外部混洗服務可用。 更多請參考:dynamic allocation configuration and setup documentation |
spark.shuffle.service.port |
7337 | 外部混洗服務對應端口 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (高級)在基於排序(sort)的混洗管理器中,如果沒有map端聚合的話,就會最多存在這麼多個reduce分區。 |
spark.shuffle.spill.compress |
true | 是否在混洗階段壓縮溢出到磁盤的數據。壓縮算法取決於spark.io.compression.codec |
Spark UI
屬性名 | 默認值 | 含義 |
---|---|---|
spark.eventLog.compress |
false | 是否壓縮事件日誌(當然spark.eventLog.enabled必須開啟) |
spark.eventLog.dir |
file:///tmp/spark-events | Spark events日誌的基礎目錄(當然spark.eventLog.enabled必須開啟)。在這個目錄中,spark會給每個應用創建一個單獨的子目錄,然後把應用的events log打到子目錄裏。用戶可以設置一個統一的位置(比如一個HDFS目錄),這樣history server就可以從這裏讀取曆史文件。 |
spark.eventLog.enabled |
false | 是否啟用Spark事件日誌。如果Spark應用結束後,仍需要在SparkUI上查看其狀態,必須啟用這個。 |
spark.ui.killEnabled |
true | 允許從SparkUI上殺掉stage以及對應的作業(job) |
spark.ui.port |
4040 | SparkUI端口,展示應用程序運行狀態。 |
spark.ui.retainedJobs |
1000 | SparkUI和status API最多保留多少個spark作業的數據(當然是在垃圾回收之前) |
spark.ui.retainedStages |
1000 | SparkUI和status API最多保留多少個spark步驟(stage)的數據(當然是在垃圾回收之前) |
spark.worker.ui.retainedExecutors |
1000 | SparkUI和status API最多保留多少個已結束的執行器(executor)的數據(當然是在垃圾回收之前) |
spark.worker.ui.retainedDrivers |
1000 | SparkUI和status API最多保留多少個已結束的驅動器(driver)的數據(當然是在垃圾回收之前) |
spark.sql.ui.retainedExecutions |
1000 | SparkUI和status API最多保留多少個已結束的執行計劃(execution)的數據(當然是在垃圾回收之前) |
spark.streaming.ui.retainedBatches |
1000 | SparkUI和status API最多保留多少個已結束的批量(batch)的數據(當然是在垃圾回收之前) |
壓縮和序列化
屬性名 | 默認值 | 含義 |
---|---|---|
spark.broadcast.compress |
true | 是否在廣播變量前使用壓縮。通常是個好主意。 |
spark.closure.serializer |
org.apache.spark.serializer. JavaSerializer |
閉包所使用的序列化類。目前隻支持Java序列化。 |
spark.io.compression.codec |
snappy | 內部數據使用的壓縮算法,如:RDD分區、廣播變量、混洗輸出。Spark提供了3中算法:lz4,lzf,snappy。你也可以使用全名來指定壓縮算法:org.apache.spark.io.LZ4CompressionCodec ,org.apache.spark.io.LZFCompressionCodec ,org.apache.spark.io.SnappyCompressionCodec . |
spark.io.compression.lz4.blockSize |
32k | LZ4算法使用的塊大小。當然你需要先使用LZ4壓縮。減少塊大小可以減少混洗時LZ4算法占用的內存量。 |
spark.io.compression.snappy.blockSize |
32k | Snappy算法使用的塊大小(先得使用Snappy算法)。減少塊大小可以減少混洗時Snappy算法占用的內存量。 |
spark.kryo.classesToRegister |
(none) | 如果你使用Kryo序列化,最好指定這個以提高性能(tuning guide)。 本參數接受一個逗號分隔的類名列表,這些類都會注冊為Kryo可序列化類型。 |
spark.kryo.referenceTracking |
true (false when using Spark SQL Thrift Server) | 是否跟蹤同一對象在Kryo序列化的引用。如果你的對象圖中有循環護著包含統一對象的多份拷貝,那麼最好啟用這個。如果沒有這種情況,那就禁用以提高性能。 |
spark.kryo.registrationRequired |
false | Kryo序列化時,是否必須事先注冊。如果設為true,那麼Kryo遇到沒有注冊過的類型,就會拋異常。如果設為false(默認)Kryo會序列化未注冊類型的對象,但會有比較明顯的性能影響,所以啟用這個選項,可以強製必須在序列化前,注冊可序列化類型。 |
spark.kryo.registrator |
(none) | 如果你使用Kryo序列化,用這個class來注冊你的自定義類型。如果你需要自定義注冊方式,這個參數很有用。否則,使用 spark.kryo.classesRegister更簡單。要設置這個參數,需要用KryoRegistrator的子類。詳見: tuning guide 。 |
spark.kryoserializer.buffer.max |
64m | 最大允許的Kryo序列化buffer。必須必你所需要序列化的對象要大。如果你在Kryo中看到”buffer limit exceeded”這個異常,你就得增加這個值了。 |
spark.kryoserializer.buffer |
64k | Kryo序列化的初始buffer大小。注意,每台worker上對應每個core會有一個buffer。buffer最大增長到 spark.kryoserializer.buffer.max |
spark.rdd.compress |
false | 是否壓縮序列化後RDD的分區(如:StorageLevel.MEMORY_ONLY_SER)。能節省大量空間,但多消耗一些CPU。 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer (org.apache.spark.serializer. KryoSerializer when using Spark SQL Thrift Server) |
用於序列化對象的類,序列化後的數據將通過網絡傳輸,或從緩存中反序列化回來。默認的Java序列化使用java的Serializable接口,但速度較慢,所以我們建議使用usingorg.apache.spark.serializer.KryoSerializer and configuring Kryo serialization如果速度需要保證的話。當然你可以自定義一個序列化器,通過繼承org.apache.spark.Serializer . |
spark.serializer.objectStreamReset |
100 | 如果使用org.apache.spark.serializer.JavaSerializer做序列化器,序列化器緩存這些對象,以避免輸出多餘數據,然而,這個會打斷垃圾回收。通過調用reset來flush序列化器,從而使老對象被回收。要禁用這一周期性reset,需要把這個參數設為-1,。默認情況下,序列化器會每過100個對象,被reset一次。 |
轉載自 並發編程網 - ifeve.com
最後更新:2017-05-19 16:38:20