閱讀252 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《Spark 官方文檔》Spark配置(一)

Spark配置

Spark有以下三種方式修改配置:

  • Spark properties (Spark屬性)可以控製絕大多數應用程序參數,而且既可以通過 SparkConf 對象來設置,也可以通過Java係統屬性來設置。
  • Environment variables (環境變量)可以指定一些各個機器相關的設置,如IP地址,其設置方法是寫在每台機器上的conf/spark-env.sh中。
  • Logging (日誌)可以通過log4j.properties配置日誌。

Spark屬性

Spark屬性可以控製大多數的應用程序設置,並且每個應用的設定都是分開的。這些屬性可以用SparkConf 對象直接設定。SparkConf為一些常用的屬性定製了專用方法(如,master URL和application name),其他屬性都可以用鍵值對做參數,調用set()方法來設置。例如,我們可以初始化一個包含2個本地線程的Spark應用,代碼如下:

注意,local[2]代表2個本地線程 – 這是最小的並發方式,可以幫助我們發現一些隻有在分布式上下文才能複現的bug。

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

注意,本地模式下,我們可以使用n個線程(n >= 1)。而且在像Spark Streaming這樣的場景下,我們可能需要多個線程來防止類似線程餓死這樣的問題。

配置時間段的屬性應該寫明時間單位,如下格式都是可接受的:

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

配置大小的屬性也應該寫明單位,如下格式都是可接受的:

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

動態加載Spark屬性

在某些場景下,你可能需要避免將屬性值寫死在 SparkConf 中。例如,你可能希望在同一個應用上使用不同的master或不同的內存總量。Spark允許你簡單地創建一個空的SparkConf對象:

val sc = new SparkContext(new SparkConf())

然後在運行時設置這些屬性:

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark shell和spark-submit工具支持兩種動態加載配置的方法。第一種,通過命令行選項,如:上麵提到的–master(設置master URL)。spark-submit可以在啟動Spark應用時,通過–conf標誌接受任何屬性配置,同時有一些特殊配置參數同樣可用(如,–master)。運行./bin/spark-submit –help可以展示這些選項的完整列表。

同時,bin/spark-submit 也支持從conf/spark-defaults.conf 中讀取配置選項,在該文件中每行是一個鍵值對,並用空格分隔,如下:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

這些通過參數或者屬性配置文件傳遞的屬性,最終都會在SparkConf 中合並。其優先級是:首先是SparkConf代碼中寫的屬性值,其次是spark-submit或spark-shell的標誌參數,最後是spark-defaults.conf文件中的屬性。

有一些配置項被重命名過,這種情形下,老的名字仍然是可以接受的,隻是優先級比新名字優先級低。

查看Spark屬性

每個SparkContext都有其對應的Spark UI,所以Spark應用程序都能通過Spark UI查看其屬性。默認你可以在這裏看到:https://<driver>:4040,頁麵上的”Environment“ tab頁可以查看Spark屬性。如果你真的想確認一下屬性設置是否正確的話,這個功能就非常有用了。注意,隻有顯式地通過SparkConf對象、在命令行參數、或者spark-defaults.conf設置的參數才會出現在頁麵上。其他屬性,你可以認為都是默認值。

可用的屬性

絕大多數屬性都有合理的默認值。這裏是部分常用的選項:

應用屬性

屬性名稱 默認值 含義
spark.app.name (none) Spark應用的名字。會在SparkUI和日誌中出現。
spark.driver.cores 1 在cluster模式下,用幾個core運行驅動器(driver)進程。
spark.driver.maxResultSize 1g Spark action算子返回的結果最大多大。至少要1M,可以設為0表示無限製。如果結果超過這一大小,Spark作業(job)會直接中斷退出。但是,設得過高有可能導致驅動器OOM(out-of-memory)(取決於spark.driver.memory設置,以及驅動器JVM的內存限製)。設一個合理的值,以避免驅動器OOM。
spark.driver.memory 1g 驅動器進程可以用的內存總量(如:1g,2g)。
注意,在客戶端模式下,這個配置不能在SparkConf中直接設置(因為驅動器JVM都啟動完了呀!)。驅動器客戶端模式下,必須要在命令行裏用 –driver-memory 或者在默認屬性配置文件裏設置。
spark.executor.memory 1g 單個執行器(executor)使用的內存總量(如,2g,8g)
spark.extraListeners (none) 逗號分隔的SparkListener子類的類名列表;初始化SparkContext時,這些類的實例會被創建出來,並且注冊到Spark的監聽總線上。如果這些類有一個接受SparkConf作為唯一參數的構造函數,那麼這個構造函數會被優先調用;否則,就調用無參數的默認構造函數。如果沒有構造函數,SparkContext創建的時候會拋異常。
spark.local.dir /tmp Spark的”草稿“目錄,包括map輸出的臨時文件,或者RDD存在磁盤上的數據。這個目錄最好在本地文件係統中,這樣讀寫速度快。這個配置可以接受一個逗號分隔的列表,通常用這種方式將文件IO分散不同的磁盤上去。
注意:Spark-1.0及以後版本中,這個屬性會被集群管理器所提供的環境變量覆蓋:SPARK_LOCAL_DIRS(獨立部署或Mesos)或者 LOCAL_DIRS(YARN)。
spark.logConf false SparkContext啟動時是否把生效的 SparkConf 屬性以INFO日誌打印到日誌裏
spark.master (none) 集群管理器URL。參考allowed master URL’s.

除了這些以外,以下還有很多可用的參數配置,在某些特定情形下,可能會用到:

運行時環境

屬性名 默認值 含義
spark.driver.extraClassPath (none) 額外的classpath,將插入到到驅動器的classpath開頭。
注意:驅動器如果運行客戶端模式下,這個配置不能通過SparkConf 在程序裏配置,因為這時候程序已經啟動呀!而是應該用命令行參數(–driver-class-path)或者在 conf/spark-defaults.conf 配置。
spark.driver.extraJavaOptions (none) 驅動器額外的JVM選項。如:GC設置或其他日誌參數。
注意:驅動器如果運行客戶端模式下,這個配置不能通過SparkConf在程序裏配置,因為這時候程序已經啟動呀!而是應該用命令行參數(–driver-java-options)或者conf/spark-defaults.conf 配置。
spark.driver.extraLibraryPath (none) 啟動驅動器JVM時候指定的依賴庫路徑。
注意:驅動器如果運行客戶端模式下,這個配置不能通過SparkConf在程序裏配置,因為這時候程序已經啟動呀!而是應該用命令行參數(–driver-library-path)或者conf/spark-defaults.conf 配置。
spark.driver.userClassPathFirst false (試驗性的:即未來不一定會支持該配置) 驅動器是否首選使用用戶指定的jars,而不是spark自身的。這個特性可以用來處理用戶依賴和spark本身依賴項之間的衝突。目前還是試驗性的,並且隻能用在集群模式下。
spark.executor.extraClassPath (none) 添加到執行器(executor)classpath開頭的classpath。主要為了向後兼容老的spark版本,不推薦使用。
spark.executor.extraJavaOptions (none) 傳給執行器的額外JVM參數。如:GC設置或其他日誌設置等。注意,不能用這個來設置Spark屬性或者堆內存大小。Spark屬性應該用SparkConf對象,或者spark-defaults.conf文件(會在spark-submit腳本中使用)來配置。執行器堆內存大小應該用 spark.executor.memory配置。
spark.executor.extraLibraryPath (none) 啟動執行器JVM時使用的額外依賴庫路徑。
spark.executor.logs.rolling.maxRetainedFiles (none) Sets the number of latest rolling log files that are going to be retained by the system. Older log files will be deleted. Disabled by default.設置日誌文件最大保留個數。老日誌文件將被幹掉。默認禁用的。
spark.executor.logs.rolling.maxSize (none) 設置執行器日誌文件大小上限。默認禁用的。
需要自動刪日誌請參考 spark.executor.logs.rolling.maxRetainedFiles.
spark.executor.logs.rolling.strategy (none) 執行器日誌滾動策略。默認禁用。
可接受的值有”time”(基於時間滾動) 或者 “size”(基於文件大小滾動)。
time:將使用 spark.executor.logs.rolling.time.interval設置滾動時間間隔
size:將使用 spark.executor.logs.rolling.size.maxBytes設置文件大小上限
spark.executor.logs.rolling.time.interval daily 設置執行器日誌滾動時間間隔。日誌滾動默認是禁用的。
可用的值有 “daily”, “hourly”, “minutely”,也可設為數字(則單位為秒)。
關於日誌自動清理,請參考 spark.executor.logs.rolling.maxRetainedFiles
spark.executor.userClassPathFirst false (試驗性的)與 spark.driver.userClassPathFirst類似,隻不過這個參數將應用於執行器
spark.executorEnv.[EnvironmentVariableName] (none) 向執行器進程增加名為EnvironmentVariableName的環境變量。用戶可以指定多個來設置不同的環境變量。
spark.python.profile false 對Python worker啟用性能分析,性能分析結果會在sc.show_profile()中,或者在驅動器退出前展示。也可以用sc.dump_profiles(path)輸出到磁盤上。如果部分分析結果被手動展示過,那麼驅動器退出前就不再自動展示了。默認會使用pyspark.profiler.BasicProfiler,也可以自己傳一個profiler 類參數給SparkContext構造函數。
spark.python.profile.dump (none) 這個目錄是用來在驅動器退出前,dump性能分析結果。性能分析結果會按RDD分別dump。同時可以使用ptats.Stats()來裝載。如果製定了這個,那麼分析結果就不再自動展示。
spark.python.worker.memory 512m 聚合時每個python worker使用的內存總量,和JVM的內存字符串格式相同(如,512m,2g)。如果聚合時使用的內存超過這個量,就將數據溢出到磁盤上。
spark.python.worker.reuse true 是否複用Python worker。如果是,則每個任務會啟動固定數量的Python worker,並且不需要fork() python進程。如果需要廣播的數據量很大,設為true能大大減少廣播數據量,因為需要廣播的進程數減少了。

混洗行為

屬性名 默認值 含義
spark.reducer.maxSizeInFlight 48m map任務輸出同時reduce任務獲取的最大內存占用量。每個輸出需要創建buffer來接收,對於每個reduce任務來說,有一個固定的內存開銷上限,所以最好別設太大,除非你內存非常大。
spark.shuffle.compress true 是否壓縮map任務的輸出文件。通常來說,壓縮是個好主意。使用的壓縮算法取決於 spark.io.compression.codec
spark.shuffle.file.buffer 32k 每個混洗輸出流的內存buffer大小。這個buffer能減少混洗文件的創建和磁盤尋址。
spark.shuffle.io.maxRetries 3 (僅對netty)如果IO相關異常發生,重試次數(如果設為非0的話)。重試能是大量數據的混洗操作更加穩定,因為重試可以有效應對長GC暫停或者網絡閃斷。
spark.shuffle.io.numConnectionsPerPeer 1 (僅netty)主機之間的連接是複用的,這樣可以減少大集群中重複建立連接的次數。然而,有些集群是機器少,磁盤多,這種集群可以考慮增加這個參數值,以便充分利用所有磁盤並發性能。
spark.shuffle.io.preferDirectBufs true (僅netty)堆外緩存可以有效減少垃圾回收和緩存複製。對於堆外內存緊張的用戶來說,可以考慮禁用這個選項,以迫使netty所有內存都分配在堆上。
spark.shuffle.io.retryWait 5s (僅netty)混洗重試獲取數據的間隔時間。默認最大重試延遲是15秒,設置這個參數後,將變成maxRetries* retryWait。
spark.shuffle.manager sort 混洗數據的實現方式。可用的有”sort”和”hash“。基於排序(sort)的混洗內存利用率更高,並且從1.2開始已經是默認值了。
spark.shuffle.service.enabled false 啟用外部混洗服務。啟用外部混洗服務後,執行器生成的混洗中間文件就由該服務保留,這樣執行器就可以安全的退出了。如果 spark.dynamicAllocation.enabled啟用了,那麼這個參數也必須啟用,這樣動態分配才能有外部混洗服務可用。
更多請參考:dynamic allocation configuration and setup documentation
spark.shuffle.service.port 7337 外部混洗服務對應端口
spark.shuffle.sort.bypassMergeThreshold 200 (高級)在基於排序(sort)的混洗管理器中,如果沒有map端聚合的話,就會最多存在這麼多個reduce分區。
spark.shuffle.spill.compress true 是否在混洗階段壓縮溢出到磁盤的數據。壓縮算法取決於spark.io.compression.codec

Spark UI

屬性名 默認值 含義
spark.eventLog.compress false 是否壓縮事件日誌(當然spark.eventLog.enabled必須開啟)
spark.eventLog.dir file:///tmp/spark-events Spark events日誌的基礎目錄(當然spark.eventLog.enabled必須開啟)。在這個目錄中,spark會給每個應用創建一個單獨的子目錄,然後把應用的events log打到子目錄裏。用戶可以設置一個統一的位置(比如一個HDFS目錄),這樣history server就可以從這裏讀取曆史文件。
spark.eventLog.enabled false 是否啟用Spark事件日誌。如果Spark應用結束後,仍需要在SparkUI上查看其狀態,必須啟用這個。
spark.ui.killEnabled true 允許從SparkUI上殺掉stage以及對應的作業(job)
spark.ui.port 4040 SparkUI端口,展示應用程序運行狀態。
spark.ui.retainedJobs 1000 SparkUI和status API最多保留多少個spark作業的數據(當然是在垃圾回收之前)
spark.ui.retainedStages 1000 SparkUI和status API最多保留多少個spark步驟(stage)的數據(當然是在垃圾回收之前)
spark.worker.ui.retainedExecutors 1000 SparkUI和status API最多保留多少個已結束的執行器(executor)的數據(當然是在垃圾回收之前)
spark.worker.ui.retainedDrivers 1000 SparkUI和status API最多保留多少個已結束的驅動器(driver)的數據(當然是在垃圾回收之前)
spark.sql.ui.retainedExecutions 1000 SparkUI和status API最多保留多少個已結束的執行計劃(execution)的數據(當然是在垃圾回收之前)
spark.streaming.ui.retainedBatches 1000 SparkUI和status API最多保留多少個已結束的批量(batch)的數據(當然是在垃圾回收之前)

壓縮和序列化

屬性名 默認值 含義
spark.broadcast.compress true 是否在廣播變量前使用壓縮。通常是個好主意。
spark.closure.serializer org.apache.spark.serializer.
JavaSerializer
閉包所使用的序列化類。目前隻支持Java序列化。
spark.io.compression.codec snappy 內部數據使用的壓縮算法,如:RDD分區、廣播變量、混洗輸出。Spark提供了3中算法:lz4,lzf,snappy。你也可以使用全名來指定壓縮算法:org.apache.spark.io.LZ4CompressionCodec,
org.apache.spark.io.LZFCompressionCodec,
org.apache.spark.io.SnappyCompressionCodec.
spark.io.compression.lz4.blockSize 32k LZ4算法使用的塊大小。當然你需要先使用LZ4壓縮。減少塊大小可以減少混洗時LZ4算法占用的內存量。
spark.io.compression.snappy.blockSize 32k Snappy算法使用的塊大小(先得使用Snappy算法)。減少塊大小可以減少混洗時Snappy算法占用的內存量。
spark.kryo.classesToRegister (none) 如果你使用Kryo序列化,最好指定這個以提高性能(tuning guide)。
本參數接受一個逗號分隔的類名列表,這些類都會注冊為Kryo可序列化類型。
spark.kryo.referenceTracking true (false when using Spark SQL Thrift Server) 是否跟蹤同一對象在Kryo序列化的引用。如果你的對象圖中有循環護著包含統一對象的多份拷貝,那麼最好啟用這個。如果沒有這種情況,那就禁用以提高性能。
spark.kryo.registrationRequired false Kryo序列化時,是否必須事先注冊。如果設為true,那麼Kryo遇到沒有注冊過的類型,就會拋異常。如果設為false(默認)Kryo會序列化未注冊類型的對象,但會有比較明顯的性能影響,所以啟用這個選項,可以強製必須在序列化前,注冊可序列化類型。
spark.kryo.registrator (none) 如果你使用Kryo序列化,用這個class來注冊你的自定義類型。如果你需要自定義注冊方式,這個參數很有用。否則,使用 spark.kryo.classesRegister更簡單。要設置這個參數,需要用KryoRegistrator的子類。詳見:tuning guide 。
spark.kryoserializer.buffer.max 64m 最大允許的Kryo序列化buffer。必須必你所需要序列化的對象要大。如果你在Kryo中看到”buffer limit exceeded”這個異常,你就得增加這個值了。
spark.kryoserializer.buffer 64k Kryo序列化的初始buffer大小。注意,每台worker上對應每個core會有一個buffer。buffer最大增長到 spark.kryoserializer.buffer.max
spark.rdd.compress false 是否壓縮序列化後RDD的分區(如:StorageLevel.MEMORY_ONLY_SER)。能節省大量空間,但多消耗一些CPU。
spark.serializer org.apache.spark.serializer.
JavaSerializer (org.apache.spark.serializer.
KryoSerializer when using Spark SQL Thrift Server)
用於序列化對象的類,序列化後的數據將通過網絡傳輸,或從緩存中反序列化回來。默認的Java序列化使用java的Serializable接口,但速度較慢,所以我們建議使用usingorg.apache.spark.serializer.KryoSerializer and configuring Kryo serialization如果速度需要保證的話。當然你可以自定義一個序列化器,通過繼承org.apache.spark.Serializer.
spark.serializer.objectStreamReset 100 如果使用org.apache.spark.serializer.JavaSerializer做序列化器,序列化器緩存這些對象,以避免輸出多餘數據,然而,這個會打斷垃圾回收。通過調用reset來flush序列化器,從而使老對象被回收。要禁用這一周期性reset,需要把這個參數設為-1,。默認情況下,序列化器會每過100個對象,被reset一次。

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 16:38:20

  上一篇:go  《大數據導論》理解大數據
  下一篇:go  阿裏雲NAS文件存儲部署方案介紹和對比