《Spark 官方文檔》Spark配置(二)
內存管理
屬性名 | 默認值 | 含義 |
---|---|---|
spark.memory.fraction |
0.75 | 堆內存中用於執行、混洗和存儲(緩存)的比例。這個值越低,則執行中溢出到磁盤越頻繁,同時緩存被逐出內存也更頻繁。這個配置的目的,是為了留出用戶自定義數據結構、內部元數據使用的內存。推薦使用默認值。請參考this description. |
spark.memory.storageFraction |
0.5 | 不會被逐出內存的總量,表示一個相對於 spark.memory.fraction的比例。這個越高,那麼執行混洗等操作用的內存就越少,從而溢出磁盤就越頻繁。推薦使用默認值。更詳細請參考 this description. |
spark.memory.offHeap.enabled |
true | 如果true,Spark會嚐試使用堆外內存。啟用 後,spark.memory.offHeap.size必須為正數。 |
spark.memory.offHeap.size |
0 | 堆外內存分配的大小(絕對值)。這個設置不會影響堆內存的使用,所以你的執行器總內存必須適應JVM的堆內存大小。必須要設為正數。並且前提是 spark.memory.offHeap.enabled=true. |
spark.memory.useLegacyMode |
false | 是否使用老式的內存管理模式(1.5以及之前)。老模式在堆內存管理上更死板,使用固定劃分的區域做不同功能,潛在的會導致過多的數據溢出到磁盤(如果不小心調整性能)。必須啟用本參數,以下選項才可用:
spark.shuffle.memoryFraction spark.storage.memoryFraction spark.storage.unrollFraction
|
spark.shuffle.memoryFraction |
0.2 | (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。 混洗階段用於聚合和協同分組的JVM堆內存比例。在任何指定的時間,所有用於混洗的內存總和不會超過這個上限,超出的部分會溢出到磁盤上。如果溢出台頻繁,考慮增加spark.storage.memoryFraction的大小。 |
spark.storage.memoryFraction |
0.6 | (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。 Spark用於緩存數據的對內存比例。這個值不應該比JVM 老生代(old generation)對象所占用的內存大,默認是60%的堆內存,當然你可以增加這個值,同時配置你所用的老生代對象占用內存大小。 |
spark.storage.unrollFraction |
0.2 | (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。 Spark塊展開的內存占用比例。如果沒有足夠的內存來完整展開新的塊,那麼老的塊將被拋棄。 |
執行行為
屬性名 | 默認值 | 含義 |
---|---|---|
spark.broadcast.blockSize |
4m | TorrentBroadcastFactory每個分片大小。太大會減少廣播時候的並發數(更慢了);如果太小,BlockManager可能會給出性能提示。 |
spark.broadcast.factory |
org.apache.spark.broadcast. TorrentBroadcastFactory |
廣播算法的實現。 |
spark.cleaner.ttl |
(infinite) | Spark記住任意元數據的保留時間(秒)。周期性的清理能保證比這個更老的元數據將被遺忘(刪除)。這對於長期運行的Spark作業非常有用(如,一些7*24運行)。注意,RDD持久化到內存中後,過了這麼長時間以後,也會被清理掉(這。。。是不是有點坑!)。 |
spark.executor.cores |
YARN模式下默認1;如果是獨立部署,則是worker節點上所有可用的core。 | 單個執行器可用的core數。僅針對YARN和獨立部署模式。獨立部署時,單個worker節點上會運行多個執行器(executor),隻要worker上有足夠的core。否則,每個應用在單個worker上隻會啟動一個執行器。 |
spark.default.parallelism |
對於reduceByKey和join這樣的分布式混洗(shuffle)算子,等於父RDD中最大的分區。對於parallelize這樣沒有父RDD的算子,則取決於集群管理器:
|
如果用戶沒有在參數裏指定,這個屬性是默認的RDD transformation算子分區數,如:join,reduceByKey,parallelize等。 |
spark.executor.heartbeatInterval |
10s | 執行器心跳間隔(報告心跳給驅動器)。心跳機製使驅動器了解哪些執行器還活著,並且可以從心跳數據中獲得執行器的度量數據。 |
spark.files.fetchTimeout |
60s | 獲取文件的通訊超時,所獲取的文件是通過在驅動器上調用SparkContext.addFile()添加的。 |
spark.files.useFetchCache |
true | 如果設為true(默認),則同一個spark應用的不同執行器之間,會使用一二共享緩存來拉取文件,這樣可以提升同一主機上運行多個執行器時候,任務啟動的性能。如果設為false,這個優化就被禁用,各個執行器將使用自己獨有的緩存,他們拉取的文件也是各自有一份拷貝。如果在NFS文件係統上使用本地文件係統,可以禁用掉這個優化(參考SPARK-6313) |
spark.files.overwrite |
false | SparkContext.addFile()添加的文件已經存在,且內容不匹配的情況下,是否覆蓋。 |
spark.hadoop.cloneConf |
false | 如設為true,對每個任務複製一份Hadoop Configuration對象。啟用這個可以繞過Configuration線程安全問題(SPARK-2546 )。默認這個是禁用的,很多job並不會受這個issue的影響。 |
spark.hadoop.validateOutputSpecs |
true | 如設為true,在saveAsHadoopFile及其變體的時候,將會驗證輸出(例如,檢查輸出目錄是否存在)。對於已經驗證過或確認存在輸出目錄的情況,可以禁用這個。我們建議不要禁用,除非你確定需要和之前的spark版本兼容。可以簡單的利用Hadoop 文件係統API手動刪掉已存在的輸出目錄。這個設置會被Spark Streaming StreamingContext生成的job忽略,因為Streaming需要在回複檢查點的時候,覆蓋已有的輸出目錄。 |
spark.storage.memoryMapThreshold |
2m | spark從磁盤上讀取一個塊後,映射到內存塊的最小大小。這阻止了spark映射過小的內存塊。通常,內存映射塊是有開銷的,應該比接近或小於操作係統的頁大小。 |
spark.externalBlockStore.blockManager |
org.apache.spark.storage.TachyonBlockManager | 用於存儲RDD的外部塊管理器(文件係統)的實現。 文件係統URL由spark.externalBlockStore.url決定。 |
spark.externalBlockStore.baseDir |
System.getProperty(“java.io.tmpdir”) | 外部塊存儲存放RDD的目錄。文件係統URL由spark.externalBlockStore.url決定。也可以是逗號分隔的目錄列表(Tachyon文件係統) |
spark.externalBlockStore.url |
tachyon://localhost:19998 for Tachyon | 所使用的外部塊存儲文件係統URL。 |
網絡
屬性名 | 默認值 | 含義 |
---|---|---|
spark.akka.frameSize |
128 | “control plane” 通訊中所允許的最大消息大小(MB)。通常,隻應用於map輸出數據的大小信息,這些信息會在執行器和驅動器之間傳遞。如果你的job包含幾千個map和reduce任務,你可能需要增大這個設置。 |
spark.akka.heartbeat.interval |
1000s | 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。設成較大的值可以減少網絡開銷,而較小的值(1秒左右)可能會對Akka的失敗檢測更有用。如有需要,可以調整這個值和spark.akka.heartbeat.pauses的組合。一種可能需要使用失敗檢測的情形是:用一個敏感的失敗檢測,可以快速識別並逐出不穩定的執行器。然而,在真實的spark集群中,這通常不是GC暫停或網絡延遲造成的。除此之外,啟用這個還會導致過多的心跳數據交換,從而造成網絡洪峰。 |
spark.akka.heartbeat.pauses |
6000s | 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。這個是可接受的Akka心跳暫停時間。這個可以用來控製對GC暫停敏感程度。如有需要,可以調整這個值和spark.akka.heartbeat.interval的組合。 |
spark.akka.threads |
4 | 用於通訊的actor線程數。如果驅動器機器上有很多CPU core,你可以適當增大這個值。 |
spark.akka.timeout |
100s | Spark節點之間通訊超時。 |
spark.blockManager.port |
(random) | 塊管理器(block manager)監聽端口。在驅動器和執行器上都有。 |
spark.broadcast.port |
(random) | 驅動器HTTP廣播server監聽端口。這和torrent廣播沒有關係。 |
spark.driver.host |
(local hostname) | 驅動器主機名。用於和執行器以及獨立部署時集群master通訊。 |
spark.driver.port |
(random) | 驅動器端口。用於和執行器以及獨立部署時集群master通訊。 |
spark.executor.port |
(random) | 執行器端口。用於和驅動器通訊。 |
spark.fileserver.port |
(random) | 驅動器HTTP文件server監聽端口。 |
spark.network.timeout |
120s | 所有網絡交互的默認超時。這個配置是以下屬性的默認值:spark.core.connection.ack.wait.timeout ,spark.akka.timeout ,spark.storage.blockManagerSlaveTimeoutMs ,spark.shuffle.io.connectionTimeout ,spark.rpc.askTimeout orspark.rpc.lookupTimeout
|
spark.port.maxRetries |
16 | 綁定一個端口的最大重試次數。如果指定了一個端口(非0),每個後續重試會在之前嚐試的端口基礎上加1,然後再重試綁定。本質上,這確定了一個綁定端口的範圍,就是 [start port, start port + maxRetries] |
spark.replClassServer.port |
(random) | 驅動器HTTP class server的監聽端口。隻和spark shell相關。 |
spark.rpc.numRetries |
3 | RPC任務最大重試次數。RPC任務最多重試這麼多次。 |
spark.rpc.retry.wait |
3s | RPC請求操作重試前等待時間。 |
spark.rpc.askTimeout |
120s | RPC請求操作超時等待時間。 |
spark.rpc.lookupTimeout |
120s | RPC遠程端點查詢超時。 |
調度
屬性名 | 默認值 | 含義 |
---|---|---|
spark.cores.max |
(not set) | 如果運行在獨立部署集群模式(standalone deploy cluster)或者Mesos集群粗粒度共享模式(Mesos cluster in “coarse-grained” sharing mode),這個值決定了spark應用可以使用的最大CPU總數(應用在整個集群中可用CPU總數,而不是單個機器)。如果不設置,那麼獨立部署時默認為spark.deploy.defaultCores,Mesos集群則默認無限製(即所有可用的CPU)。 |
spark.locality.wait |
3s | 為了數據本地性最長等待時間(spark會根據數據所在位置,盡量讓任務也啟動於相同的節點,然而可能因為該節點上資源不足等原因,無法滿足這個任務分配,spark最多等待這麼多時間,然後放棄數據本地性)。數據本地性有多個級別,每一級別都是等待這麼多時間(同一進程、同一節點、同一機架、任意)。你也可以為每個級別定義不同的等待時間,需要設置spark.locality.wait.node等。如果你發現任務數據本地性不佳,可以增加這個值,但通常默認值是ok的。 |
spark.locality.wait.node |
spark.locality.wait | 單獨定義同一節點數據本地性任務等待時間。你可以設為0,表示忽略節點本地性,直接跳到下一級別,即機架本地性(如果你的集群有機架信息)。 |
spark.locality.wait.process |
spark.locality.wait | 單獨定義同一進程數據本地性任務等待時間。這個參數影響試圖訪問特定執行器上的緩存數據的任務。 |
spark.locality.wait.rack |
spark.locality.wait | 單獨定義同一機架數據本地性等待時間。 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30s | 調度開始前,向集群管理器注冊使用資源的最大等待時間。 |
spark.scheduler.minRegisteredResourcesRatio |
0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode |
調度啟動前,需要注冊得到資源的最小比例(注冊到的資源數 / 需要資源總數)(YARN模式下,資源是執行器;獨立部署和Mesos粗粒度模式下時資源是CPU core【spark.cores.max是期望得到的資源總數】)。可以設為0.0~1.0的一個浮點數。不管job是否得到了最小資源比例,最大等待時間都是由spark.scheduler.maxRegisteredResourcesWaitingTime控製的。 |
spark.scheduler.mode |
FIFO | 提交到同一個SparkContext上job的調度模式(scheduling mode)。另一個可接受的值是FAIR,而FIFO隻是簡單的把job按先來後到排隊。對於多用戶服務很有用。 |
spark.scheduler.revive.interval |
1s | 調度器複活worker的間隔時間。 |
spark.speculation |
false | 如果設為true,將會啟動推測執行任務。這意味著,如果stage中有任務執行較慢,他們會被重新調度到別的節點上執行。 |
spark.speculation.interval |
100ms | Spark檢查慢任務的時間間隔。 |
spark.speculation.multiplier |
1.5 | 比任務平均執行時間慢多少倍的任務會被認為是慢任務。 |
spark.speculation.quantile |
0.75 | 對於一個stage來說,完成多少百分比才開始檢查慢任務,並啟動推測執行任務。 |
spark.task.cpus |
1 | 每個任務分配的CPU core。 |
spark.task.maxFailures |
4 | 單個任務最大失敗次數。應該>=1。最大重試次數 = spark.task.maxFailures – 1 |
動態分配
屬性名 | 默認值 | 含義 |
---|---|---|
spark.dynamicAllocation.enabled |
false |
是否啟用動態資源分配特性,啟用後,執行器的個數會根據工作負載動態的調整(增加或減少)。注意,目前在YARN模式下不用。更詳細信息,請參考: here該特性依賴於 spark.shuffle.service.enabled 的啟用。同時還和以下配置相關:spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors以及 spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation |
60s | 動態分配特性啟用後,空閑時間超過該配置時間的執行器都會被移除。更詳細請參考這裏:description |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | 動態分配特性啟用後,包含緩存數據的執行器如果空閑時間超過該配置設置的時間,則被移除。更詳細請參考:description |
spark.dynamicAllocation.initialExecutors |
spark .dynamicAllocation .minExecutors |
動態分配開啟後,執行器的初始個數 |
spark.dynamicAllocation.maxExecutors |
infinity | 動態分配開啟後,執行器個數的上限 |
spark.dynamicAllocation.minExecutors |
0 | 動態分配開啟後,執行器個數的下限 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | 動態分配啟用後,如果有任務積壓的持續時間長於該配置設置的時間,則申請新的執行器。更詳細請參考:description |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
和spark.dynamicAllocation.schedulerBacklogTimeout類似,隻不過該配置對應於隨後持續的執行器申請。更詳細請參考: description |
安全
屬性名 | 默認值 | 含義 |
---|---|---|
spark.acls.enable |
false | 是否啟用Spark acls(訪問控製列表)。如果啟用,那麼將會檢查用戶是否有權限查看或修改某個作業(job)。注意,檢查的前提是需要知道用戶是誰,所以如果用戶是null,則不會做任何檢查。你可以在Spark UI上設置過濾器(Filters)來做用戶認證,並設置用戶名。 |
spark.admin.acls |
Empty | 逗號分隔的用戶列表,在該列表中的用戶/管理員將能夠訪問和修改所有的Spark作業(job)。如果你的集群是共享的,並且有集群管理員,還有需要調試的開發人員,那麼這個配置會很有用。如果想讓所有人都有管理員權限,隻需把該配置設置為”*” |
spark.authenticate |
false | 設置Spark是否認證集群內部連接。如果不是在YARN上運行,請參考 spark.authenticate.secret |
spark.authenticate.secret |
None | 設置Spark用於內部組件認證的秘鑰。如果不是在YARN上運行,且啟用了 spark.authenticate,那麼該配置必須設置 |
spark.authenticate.enableSaslEncryption |
false | 是否對Spark內部組件認證使用加密通信。該配置目前隻有 block transfer service 使用。 |
spark.network.sasl.serverAlwaysEncrypt |
false | 是否對支持SASL認證的service禁用非加密通信。該配置目前隻有 external shuffle service 支持。 |
spark.core.connection.ack.wait.timeout |
60s | 網絡連接等待應答信號的超時時間。為了避免由於GC等導致的意外超時,你可以設置一個較大的值。 |
spark.core.connection.auth.wait.timeout |
30s | 網絡連接等待認證的超時時間。 |
spark.modify.acls |
Empty | 逗號分隔的用戶列表,在改列表中的用戶可以修改Spark作業。默認情況下,隻有啟動該Spark作業的用戶可以修改之(比如殺死該作業)。如果想要任何用戶都可以修改作業,請將該配置設置為”*” |
spark.ui.filters |
None |
逗號分隔的過濾器class列表,這些過濾器將用於Spark web UI。這裏的過濾器應該是一個標準的 javax servlet Filter 。每個過濾器的參數可以通過java係統屬性來設置,如下:spark.<class name of filer>.params=’param1=value1,param2=value2’例如: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params=’param1=foo,param2=testing’ |
spark.ui.view.acls |
Empty | 逗號分隔的用戶列表,在該列表中的用戶可以查看Spark web UI。默認,隻有啟動該Spark作業的用戶可以查看之。如果需要讓所有用戶都能查看,隻需將該配置設為”*” |
加密
屬性名 | 默認值 | 含義 |
---|---|---|
spark.ssl.enabled |
false | 是否啟用SSL連接(在所有所支持的協議上)。所有SSL相關配置(spark.ssl.xxx,其中xxx是一個特定的配置屬性),都是全局的。如果需要在某些協議上覆蓋全局設置,那麼需要在該協議命名空間上進行單獨配置。使用 spark.ssl.YYY.XXX 來為協議YYY覆蓋全局配置XXX。目前YYY的可選值有 akka(用於基於AKKA框架的網絡連接) 和 fs(用於應廣播和文件服務器) |
spark.ssl.enabledAlgorithms |
Empty | 逗號分隔的加密算法列表。這些加密算法必須是JVM所支持的。這裏有個可用加密算法參考列表: this |
spark.ssl.keyPassword |
None | 在key-store中私匙對應的密碼。 |
spark.ssl.keyStore |
None | key-store文件路徑。可以是絕對路徑,或者以本組件啟動的工作目錄為基礎的相對路徑。 |
spark.ssl.keyStorePassword |
None | key-store的密碼。 |
spark.ssl.protocol |
None | 協議名稱。該協議必須是JVM所支持的。這裏有JVM支持的協議參考列表:this |
spark.ssl.trustStore |
None | trust-store文件路徑。可以是絕對路徑,或者以本組件啟動的工作目錄為基礎的相對路徑。 |
spark.ssl.trustStorePassword |
None | trust-store的密碼 |
Spark Streaming [流式]
屬性名 | 默認值 | 含義 |
---|---|---|
spark.streaming.backpressure.enabled |
false | 是否啟用Spark Streaming 的內部反壓機製(spark 1.5以上支持)。啟用後,Spark Streaming會根據當前批次的調度延遲和處理時長來控製接收速率,這樣一來,係統的接收速度會和處理速度相匹配。該特性會在內部動態地設置接收速率。該速率的上限將由 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition 決定(如果它們設置了的話)。 |
spark.streaming.blockInterval |
200ms | 在將數據保存到Spark之前,Spark Streaming接收器組裝數據塊的時間間隔。建議不少於50ms。關於Spark Streaming編程指南細節,請參考 performance tuning 這一節。 |
spark.streaming.receiver.maxRate |
not set | 接收速度的最大速率(每秒記錄條數)。實際上,每個流每秒將消費這麼多條記錄。設置為0或者負數表示不限製速率。更多細節請參考: deployment guide |
spark.streaming.receiver.writeAheadLog.enable |
false | 是否啟用接收器預寫日誌。所有的輸入數據都會保存到預寫日誌中,這樣在驅動器失敗後,可以基於預寫日誌來恢複數據。更詳細請參考:deployment guide |
spark.streaming.unpersist |
true | 是否強製Spark Streaming 自動從內存中清理掉所生成並持久化的RDD。同時,Spark Streaming收到的原始數據也將會被自動清理掉。如果設置為false,那麼原始數據以及持久化的RDD將不會被自動清理,以便外部程序可以訪問這些數據。當然,這將導致Spark消耗更多的內存。 |
spark.streaming.stopGracefullyOnShutdown |
false | 如果設為true,Spark將會在JVM關閉時,優雅地關停StreamingContext,而不是立即關閉之。 |
spark.streaming.kafka.maxRatePerPartition |
not set | 在使用Kafka direct stream API時,從每個Kafka數據分區讀取數據的最大速率(每秒記錄條數)。更詳細請參考:Kafka Integration guide |
spark.streaming.kafka.maxRetries |
1 | 驅動器連續重試的最大次數,這個配置是為了讓驅動器找出每個Kafka分區上的最大offset(默認值為1,意味著驅動器將最多嚐試2次)。隻對新的Kafka direct stream API有效。 |
spark.streaming.ui.retainedBatches |
1000 | Spark Streaming UI 以及 status API 中保留的最大批次個數。 |
SparkR
屬性名 | 默認值 | 含義 |
---|---|---|
spark.r.numRBackendThreads |
2 | SparkR RBackEnd處理RPC調用的後台線程數 |
spark.r.command |
Rscript | 集群模式下,驅動器和worker上執行的R腳本可執行文件 |
spark.r.driver.command |
spark.r.command | client模式的驅動器執行的R腳本。集群模式下會忽略 |
集群管理器
每個集群管理器都有一些額外的配置選項。詳細請參考這裏:
YARN
Mesos
Standalone Mode
環境變量
有些Spark設置需要通過環境變量來設定,這些環境變量可以在${SPARK_HOME}/conf/spark-env.sh腳本(Windows下是conf/spark-env.cmd)中設置。如果是獨立部署或者Mesos模式,這個文件可以指定機器相關信息(如hostname)。運行本地Spark應用或者submit腳本時,也會引用這個文件。
注意,conf/spark-env.sh默認是不存在的。你需要複製conf/spark-env.sh.template這個模板來創建,還有注意給這個文件附上可執行權限。
以下變量可以在spark-env.sh中設置:
環境變量 | 含義 |
---|---|
JAVA_HOME |
Java安裝目錄(如果沒有在PATH變量中指定) |
PYSPARK_PYTHON |
驅動器和worker上使用的Python二進製可執行文件(默認是python) |
PYSPARK_DRIVER_PYTHON |
僅在驅動上使用的Python二進製可執行文件(默認同PYSPARK_PYTHON) |
SPARKR_DRIVER_R |
SparkR shell使用的R二進製可執行文件(默認是R) |
SPARK_LOCAL_IP |
本地綁定的IP |
SPARK_PUBLIC_DNS |
Spark程序公布給其他機器的hostname |
另外,還有一些選項需要在Spark standalone cluster scripts裏設置,如:每台機器上使用的core數量,和最大內存占用量。
spark-env.sh是一個shell腳本,因此一些參數可以通過編程方式來設定 – 例如,你可以獲取本機IP來設置SPARK_LOCAL_IP。
日誌配置
Spark使用log4j 打日誌。你可以在conf目錄下用log4j.properties來配置。複製該目錄下已有的log4j.properties.template並改名為log4j.properties即可。
覆蓋配置目錄
默認Spark配置目錄是”${SPARK_HOME}/conf”,你也可以通過 ${SPARK_CONF_DIR}指定其他目錄。Spark會從這個目錄下讀取配置文件(spark-defaults.conf,spark-env.sh,log4j.properties等)
繼承Hadoop集群配置
如果你打算用Spark從HDFS讀取數據,那麼有2個Hadoop配置文件必須放到Spark的classpath下:
- hdfs-site.xml,配置HDFS客戶端的默認行為
- core-site.xml,默認文件係統名
這些配置文件的路徑在不同發布版本中不太一樣(如CDH和HDP版本),但通常都能在 ${HADOOP_HOME}/etc/hadoop/conf目錄下找到。一些工具,如Cloudera Manager,可以動態修改配置,而且提供了下載一份拷貝的機製。
要想讓這些配置對Spark可見,請在${SPARK_HOME}/spark-env.sh中設置HADOOP_CONF_DIR變量。
最後更新:2017-05-19 17:01:55