閱讀631 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《Spark 官方文檔》Spark配置(二)

內存管理

屬性名 默認值 含義
spark.memory.fraction 0.75 堆內存中用於執行、混洗和存儲(緩存)的比例。這個值越低,則執行中溢出到磁盤越頻繁,同時緩存被逐出內存也更頻繁。這個配置的目的,是為了留出用戶自定義數據結構、內部元數據使用的內存。推薦使用默認值。請參考this description.
spark.memory.storageFraction 0.5 不會被逐出內存的總量,表示一個相對於 spark.memory.fraction的比例。這個越高,那麼執行混洗等操作用的內存就越少,從而溢出磁盤就越頻繁。推薦使用默認值。更詳細請參考 this description.
spark.memory.offHeap.enabled true 如果true,Spark會嚐試使用堆外內存。啟用 後,spark.memory.offHeap.size必須為正數。
spark.memory.offHeap.size 0 堆外內存分配的大小(絕對值)。這個設置不會影響堆內存的使用,所以你的執行器總內存必須適應JVM的堆內存大小。必須要設為正數。並且前提是 spark.memory.offHeap.enabled=true.
spark.memory.useLegacyMode false 是否使用老式的內存管理模式(1.5以及之前)。老模式在堆內存管理上更死板,使用固定劃分的區域做不同功能,潛在的會導致過多的數據溢出到磁盤(如果不小心調整性能)。必須啟用本參數,以下選項才可用:
spark.shuffle.memoryFraction
spark.storage.memoryFraction
spark.storage.unrollFraction
spark.shuffle.memoryFraction 0.2 (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。
混洗階段用於聚合和協同分組的JVM堆內存比例。在任何指定的時間,所有用於混洗的內存總和不會超過這個上限,超出的部分會溢出到磁盤上。如果溢出台頻繁,考慮增加spark.storage.memoryFraction的大小。
spark.storage.memoryFraction 0.6 (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。
Spark用於緩存數據的對內存比例。這個值不應該比JVM 老生代(old generation)對象所占用的內存大,默認是60%的堆內存,當然你可以增加這個值,同時配置你所用的老生代對象占用內存大小。
spark.storage.unrollFraction 0.2 (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。
Spark塊展開的內存占用比例。如果沒有足夠的內存來完整展開新的塊,那麼老的塊將被拋棄。

執行行為

屬性名 默認值 含義
spark.broadcast.blockSize 4m TorrentBroadcastFactory每個分片大小。太大會減少廣播時候的並發數(更慢了);如果太小,BlockManager可能會給出性能提示。
spark.broadcast.factory org.apache.spark.broadcast.
TorrentBroadcastFactory
廣播算法的實現。
spark.cleaner.ttl (infinite) Spark記住任意元數據的保留時間(秒)。周期性的清理能保證比這個更老的元數據將被遺忘(刪除)。這對於長期運行的Spark作業非常有用(如,一些7*24運行)。注意,RDD持久化到內存中後,過了這麼長時間以後,也會被清理掉(這。。。是不是有點坑!)。
spark.executor.cores YARN模式下默認1;如果是獨立部署,則是worker節點上所有可用的core。 單個執行器可用的core數。僅針對YARN和獨立部署模式。獨立部署時,單個worker節點上會運行多個執行器(executor),隻要worker上有足夠的core。否則,每個應用在單個worker上隻會啟動一個執行器。
spark.default.parallelism 對於reduceByKey和join這樣的分布式混洗(shuffle)算子,等於父RDD中最大的分區。對於parallelize這樣沒有父RDD的算子,則取決於集群管理器:

  • Local mode: number of cores on the local machine — 本地模式:機器的core數
  • Mesos fine grained mode: 8 — Mesos細粒度模式:8
  • Others: total number of cores on all executor nodes or 2, whichever is larger — 其他:所有執行器節點上core的數量 或者 2,這兩數取較大的
如果用戶沒有在參數裏指定,這個屬性是默認的RDD transformation算子分區數,如:join,reduceByKey,parallelize等。
spark.executor.heartbeatInterval 10s 執行器心跳間隔(報告心跳給驅動器)。心跳機製使驅動器了解哪些執行器還活著,並且可以從心跳數據中獲得執行器的度量數據。
spark.files.fetchTimeout 60s 獲取文件的通訊超時,所獲取的文件是通過在驅動器上調用SparkContext.addFile()添加的。
spark.files.useFetchCache true 如果設為true(默認),則同一個spark應用的不同執行器之間,會使用一二共享緩存來拉取文件,這樣可以提升同一主機上運行多個執行器時候,任務啟動的性能。如果設為false,這個優化就被禁用,各個執行器將使用自己獨有的緩存,他們拉取的文件也是各自有一份拷貝。如果在NFS文件係統上使用本地文件係統,可以禁用掉這個優化(參考SPARK-6313
spark.files.overwrite false SparkContext.addFile()添加的文件已經存在,且內容不匹配的情況下,是否覆蓋。
spark.hadoop.cloneConf false 如設為true,對每個任務複製一份Hadoop Configuration對象。啟用這個可以繞過Configuration線程安全問題(SPARK-2546 )。默認這個是禁用的,很多job並不會受這個issue的影響。
spark.hadoop.validateOutputSpecs true 如設為true,在saveAsHadoopFile及其變體的時候,將會驗證輸出(例如,檢查輸出目錄是否存在)。對於已經驗證過或確認存在輸出目錄的情況,可以禁用這個。我們建議不要禁用,除非你確定需要和之前的spark版本兼容。可以簡單的利用Hadoop 文件係統API手動刪掉已存在的輸出目錄。這個設置會被Spark Streaming StreamingContext生成的job忽略,因為Streaming需要在回複檢查點的時候,覆蓋已有的輸出目錄。
spark.storage.memoryMapThreshold 2m spark從磁盤上讀取一個塊後,映射到內存塊的最小大小。這阻止了spark映射過小的內存塊。通常,內存映射塊是有開銷的,應該比接近或小於操作係統的頁大小。
spark.externalBlockStore.blockManager org.apache.spark.storage.TachyonBlockManager 用於存儲RDD的外部塊管理器(文件係統)的實現。
文件係統URL由spark.externalBlockStore.url決定。
spark.externalBlockStore.baseDir System.getProperty(“java.io.tmpdir”) 外部塊存儲存放RDD的目錄。文件係統URL由spark.externalBlockStore.url決定。也可以是逗號分隔的目錄列表(Tachyon文件係統)
spark.externalBlockStore.url tachyon://localhost:19998 for Tachyon 所使用的外部塊存儲文件係統URL。

網絡

屬性名 默認值 含義
spark.akka.frameSize 128 “control plane” 通訊中所允許的最大消息大小(MB)。通常,隻應用於map輸出數據的大小信息,這些信息會在執行器和驅動器之間傳遞。如果你的job包含幾千個map和reduce任務,你可能需要增大這個設置。
spark.akka.heartbeat.interval 1000s 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。設成較大的值可以減少網絡開銷,而較小的值(1秒左右)可能會對Akka的失敗檢測更有用。如有需要,可以調整這個值和spark.akka.heartbeat.pauses的組合。一種可能需要使用失敗檢測的情形是:用一個敏感的失敗檢測,可以快速識別並逐出不穩定的執行器。然而,在真實的spark集群中,這通常不是GC暫停或網絡延遲造成的。除此之外,啟用這個還會導致過多的心跳數據交換,從而造成網絡洪峰。
spark.akka.heartbeat.pauses 6000s 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。這個是可接受的Akka心跳暫停時間。這個可以用來控製對GC暫停敏感程度。如有需要,可以調整這個值和spark.akka.heartbeat.interval的組合。
spark.akka.threads 4 用於通訊的actor線程數。如果驅動器機器上有很多CPU core,你可以適當增大這個值。
spark.akka.timeout 100s Spark節點之間通訊超時。
spark.blockManager.port (random) 塊管理器(block manager)監聽端口。在驅動器和執行器上都有。
spark.broadcast.port (random) 驅動器HTTP廣播server監聽端口。這和torrent廣播沒有關係。
spark.driver.host (local hostname) 驅動器主機名。用於和執行器以及獨立部署時集群master通訊。
spark.driver.port (random) 驅動器端口。用於和執行器以及獨立部署時集群master通訊。
spark.executor.port (random) 執行器端口。用於和驅動器通訊。
spark.fileserver.port (random) 驅動器HTTP文件server監聽端口。
spark.network.timeout 120s 所有網絡交互的默認超時。這個配置是以下屬性的默認值:
spark.core.connection.ack.wait.timeout,
spark.akka.timeout,
spark.storage.blockManagerSlaveTimeoutMs,
spark.shuffle.io.connectionTimeout,spark.rpc.askTimeout or
spark.rpc.lookupTimeout
spark.port.maxRetries 16 綁定一個端口的最大重試次數。如果指定了一個端口(非0),每個後續重試會在之前嚐試的端口基礎上加1,然後再重試綁定。本質上,這確定了一個綁定端口的範圍,就是 [start port, start port + maxRetries]
spark.replClassServer.port (random) 驅動器HTTP class server的監聽端口。隻和spark shell相關。
spark.rpc.numRetries 3 RPC任務最大重試次數。RPC任務最多重試這麼多次。
spark.rpc.retry.wait 3s RPC請求操作重試前等待時間。
spark.rpc.askTimeout 120s RPC請求操作超時等待時間。
spark.rpc.lookupTimeout 120s RPC遠程端點查詢超時。

調度

屬性名 默認值 含義
spark.cores.max (not set) 如果運行在獨立部署集群模式(standalone deploy cluster)或者Mesos集群粗粒度共享模式(Mesos cluster in “coarse-grained” sharing mode),這個值決定了spark應用可以使用的最大CPU總數(應用在整個集群中可用CPU總數,而不是單個機器)。如果不設置,那麼獨立部署時默認為spark.deploy.defaultCores,Mesos集群則默認無限製(即所有可用的CPU)。
spark.locality.wait 3s 為了數據本地性最長等待時間(spark會根據數據所在位置,盡量讓任務也啟動於相同的節點,然而可能因為該節點上資源不足等原因,無法滿足這個任務分配,spark最多等待這麼多時間,然後放棄數據本地性)。數據本地性有多個級別,每一級別都是等待這麼多時間(同一進程、同一節點、同一機架、任意)。你也可以為每個級別定義不同的等待時間,需要設置spark.locality.wait.node等。如果你發現任務數據本地性不佳,可以增加這個值,但通常默認值是ok的。
spark.locality.wait.node spark.locality.wait 單獨定義同一節點數據本地性任務等待時間。你可以設為0,表示忽略節點本地性,直接跳到下一級別,即機架本地性(如果你的集群有機架信息)。
spark.locality.wait.process spark.locality.wait 單獨定義同一進程數據本地性任務等待時間。這個參數影響試圖訪問特定執行器上的緩存數據的任務。
spark.locality.wait.rack spark.locality.wait 單獨定義同一機架數據本地性等待時間。
spark.scheduler.maxRegisteredResourcesWaitingTime 30s 調度開始前,向集群管理器注冊使用資源的最大等待時間。
spark.scheduler.minRegisteredResourcesRatio 0.8 for YARN mode;
0.0 for standalone mode and Mesos coarse-grained mode
調度啟動前,需要注冊得到資源的最小比例(注冊到的資源數 / 需要資源總數)(YARN模式下,資源是執行器;獨立部署和Mesos粗粒度模式下時資源是CPU core【spark.cores.max是期望得到的資源總數】)。可以設為0.0~1.0的一個浮點數。不管job是否得到了最小資源比例,最大等待時間都是由spark.scheduler.maxRegisteredResourcesWaitingTime控製的。
spark.scheduler.mode FIFO 提交到同一個SparkContext上job的調度模式(scheduling mode)。另一個可接受的值是FAIR,而FIFO隻是簡單的把job按先來後到排隊。對於多用戶服務很有用。
spark.scheduler.revive.interval 1s 調度器複活worker的間隔時間。
spark.speculation false 如果設為true,將會啟動推測執行任務。這意味著,如果stage中有任務執行較慢,他們會被重新調度到別的節點上執行。
spark.speculation.interval 100ms Spark檢查慢任務的時間間隔。
spark.speculation.multiplier 1.5 比任務平均執行時間慢多少倍的任務會被認為是慢任務。
spark.speculation.quantile 0.75 對於一個stage來說,完成多少百分比才開始檢查慢任務,並啟動推測執行任務。
spark.task.cpus 1 每個任務分配的CPU core。
spark.task.maxFailures 4 單個任務最大失敗次數。應該>=1。最大重試次數 = spark.task.maxFailures – 1

動態分配

屬性名 默認值 含義
spark.dynamicAllocation.enabled false 是否啟用動態資源分配特性,啟用後,執行器的個數會根據工作負載動態的調整(增加或減少)。注意,目前在YARN模式下不用。更詳細信息,請參考:here該特性依賴於 spark.shuffle.service.enabled 的啟用。同時還和以下配置相關:spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors以及 spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation
.executorIdleTimeout
60s 動態分配特性啟用後,空閑時間超過該配置時間的執行器都會被移除。更詳細請參考這裏:description
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 動態分配特性啟用後,包含緩存數據的執行器如果空閑時間超過該配置設置的時間,則被移除。更詳細請參考:description
spark.dynamicAllocation.initialExecutors spark
.dynamicAllocation
.minExecutors
動態分配開啟後,執行器的初始個數
spark.dynamicAllocation.maxExecutors infinity 動態分配開啟後,執行器個數的上限
spark.dynamicAllocation.minExecutors 0 動態分配開啟後,執行器個數的下限
spark.dynamicAllocation.schedulerBacklogTimeout 1s 動態分配啟用後,如果有任務積壓的持續時間長於該配置設置的時間,則申請新的執行器。更詳細請參考:description
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout 和spark.dynamicAllocation.schedulerBacklogTimeout類似,隻不過該配置對應於隨後持續的執行器申請。更詳細請參考: description

安全

屬性名 默認值 含義
spark.acls.enable false 是否啟用Spark acls(訪問控製列表)。如果啟用,那麼將會檢查用戶是否有權限查看或修改某個作業(job)。注意,檢查的前提是需要知道用戶是誰,所以如果用戶是null,則不會做任何檢查。你可以在Spark UI上設置過濾器(Filters)來做用戶認證,並設置用戶名。
spark.admin.acls Empty 逗號分隔的用戶列表,在該列表中的用戶/管理員將能夠訪問和修改所有的Spark作業(job)。如果你的集群是共享的,並且有集群管理員,還有需要調試的開發人員,那麼這個配置會很有用。如果想讓所有人都有管理員權限,隻需把該配置設置為”*”
spark.authenticate false 設置Spark是否認證集群內部連接。如果不是在YARN上運行,請參考 spark.authenticate.secret
spark.authenticate.secret None 設置Spark用於內部組件認證的秘鑰。如果不是在YARN上運行,且啟用了 spark.authenticate,那麼該配置必須設置
spark.authenticate.enableSaslEncryption false 是否對Spark內部組件認證使用加密通信。該配置目前隻有 block transfer service 使用。
spark.network.sasl.serverAlwaysEncrypt false 是否對支持SASL認證的service禁用非加密通信。該配置目前隻有 external shuffle service 支持。
spark.core.connection.ack.wait.timeout 60s 網絡連接等待應答信號的超時時間。為了避免由於GC等導致的意外超時,你可以設置一個較大的值。
spark.core.connection.auth.wait.timeout 30s 網絡連接等待認證的超時時間。
spark.modify.acls Empty 逗號分隔的用戶列表,在改列表中的用戶可以修改Spark作業。默認情況下,隻有啟動該Spark作業的用戶可以修改之(比如殺死該作業)。如果想要任何用戶都可以修改作業,請將該配置設置為”*”
spark.ui.filters None 逗號分隔的過濾器class列表,這些過濾器將用於Spark web UI。這裏的過濾器應該是一個標準的 javax servlet Filter 。每個過濾器的參數可以通過java係統屬性來設置,如下:
spark.<class name of filer>.params=’param1=value1,param2=value2’例如:
-Dspark.ui.filters=com.test.filter1
-Dspark.com.test.filter1.params=’param1=foo,param2=testing’
spark.ui.view.acls Empty 逗號分隔的用戶列表,在該列表中的用戶可以查看Spark web UI。默認,隻有啟動該Spark作業的用戶可以查看之。如果需要讓所有用戶都能查看,隻需將該配置設為”*”

加密

屬性名 默認值 含義
spark.ssl.enabled false 是否啟用SSL連接(在所有所支持的協議上)。所有SSL相關配置(spark.ssl.xxx,其中xxx是一個特定的配置屬性),都是全局的。如果需要在某些協議上覆蓋全局設置,那麼需要在該協議命名空間上進行單獨配置。使用 spark.ssl.YYY.XXX 來為協議YYY覆蓋全局配置XXX。目前YYY的可選值有 akka(用於基於AKKA框架的網絡連接) 和 fs(用於應廣播和文件服務器)
spark.ssl.enabledAlgorithms Empty 逗號分隔的加密算法列表。這些加密算法必須是JVM所支持的。這裏有個可用加密算法參考列表: this
spark.ssl.keyPassword None 在key-store中私匙對應的密碼。
spark.ssl.keyStore None key-store文件路徑。可以是絕對路徑,或者以本組件啟動的工作目錄為基礎的相對路徑。
spark.ssl.keyStorePassword None key-store的密碼。
spark.ssl.protocol None 協議名稱。該協議必須是JVM所支持的。這裏有JVM支持的協議參考列表:this
spark.ssl.trustStore None trust-store文件路徑。可以是絕對路徑,或者以本組件啟動的工作目錄為基礎的相對路徑。
spark.ssl.trustStorePassword None trust-store的密碼

Spark Streaming [流式]

屬性名 默認值 含義
spark.streaming.backpressure.enabled false 是否啟用Spark Streaming 的內部反壓機製(spark 1.5以上支持)。啟用後,Spark Streaming會根據當前批次的調度延遲和處理時長來控製接收速率,這樣一來,係統的接收速度會和處理速度相匹配。該特性會在內部動態地設置接收速率。該速率的上限將由 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition 決定(如果它們設置了的話)。
spark.streaming.blockInterval 200ms 在將數據保存到Spark之前,Spark Streaming接收器組裝數據塊的時間間隔。建議不少於50ms。關於Spark Streaming編程指南細節,請參考 performance tuning 這一節。
spark.streaming.receiver.maxRate not set 接收速度的最大速率(每秒記錄條數)。實際上,每個流每秒將消費這麼多條記錄。設置為0或者負數表示不限製速率。更多細節請參考: deployment guide
spark.streaming.receiver.writeAheadLog.enable false 是否啟用接收器預寫日誌。所有的輸入數據都會保存到預寫日誌中,這樣在驅動器失敗後,可以基於預寫日誌來恢複數據。更詳細請參考:deployment guide
spark.streaming.unpersist true 是否強製Spark Streaming 自動從內存中清理掉所生成並持久化的RDD。同時,Spark Streaming收到的原始數據也將會被自動清理掉。如果設置為false,那麼原始數據以及持久化的RDD將不會被自動清理,以便外部程序可以訪問這些數據。當然,這將導致Spark消耗更多的內存。
spark.streaming.stopGracefullyOnShutdown false 如果設為true,Spark將會在JVM關閉時,優雅地關停StreamingContext,而不是立即關閉之。
spark.streaming.kafka.maxRatePerPartition not set 在使用Kafka direct stream API時,從每個Kafka數據分區讀取數據的最大速率(每秒記錄條數)。更詳細請參考:Kafka Integration guide
spark.streaming.kafka.maxRetries 1 驅動器連續重試的最大次數,這個配置是為了讓驅動器找出每個Kafka分區上的最大offset(默認值為1,意味著驅動器將最多嚐試2次)。隻對新的Kafka direct stream API有效。
spark.streaming.ui.retainedBatches 1000 Spark Streaming UI 以及 status API 中保留的最大批次個數。

SparkR

屬性名 默認值 含義
spark.r.numRBackendThreads 2 SparkR RBackEnd處理RPC調用的後台線程數
spark.r.command Rscript 集群模式下,驅動器和worker上執行的R腳本可執行文件
spark.r.driver.command spark.r.command client模式的驅動器執行的R腳本。集群模式下會忽略

集群管理器

每個集群管理器都有一些額外的配置選項。詳細請參考這裏:

YARN
Mesos
Standalone Mode

環境變量

有些Spark設置需要通過環境變量來設定,這些環境變量可以在${SPARK_HOME}/conf/spark-env.sh腳本(Windows下是conf/spark-env.cmd)中設置。如果是獨立部署或者Mesos模式,這個文件可以指定機器相關信息(如hostname)。運行本地Spark應用或者submit腳本時,也會引用這個文件。

注意,conf/spark-env.sh默認是不存在的。你需要複製conf/spark-env.sh.template這個模板來創建,還有注意給這個文件附上可執行權限。

以下變量可以在spark-env.sh中設置:

環境變量 含義
JAVA_HOME Java安裝目錄(如果沒有在PATH變量中指定)
PYSPARK_PYTHON 驅動器和worker上使用的Python二進製可執行文件(默認是python)
PYSPARK_DRIVER_PYTHON 僅在驅動上使用的Python二進製可執行文件(默認同PYSPARK_PYTHON)
SPARKR_DRIVER_R SparkR shell使用的R二進製可執行文件(默認是R)
SPARK_LOCAL_IP 本地綁定的IP
SPARK_PUBLIC_DNS Spark程序公布給其他機器的hostname

另外,還有一些選項需要在Spark standalone cluster scripts裏設置,如:每台機器上使用的core數量,和最大內存占用量。

spark-env.sh是一個shell腳本,因此一些參數可以通過編程方式來設定 – 例如,你可以獲取本機IP來設置SPARK_LOCAL_IP。

日誌配置

Spark使用log4j 打日誌。你可以在conf目錄下用log4j.properties來配置。複製該目錄下已有的log4j.properties.template並改名為log4j.properties即可。

覆蓋配置目錄

默認Spark配置目錄是”${SPARK_HOME}/conf”,你也可以通過 ${SPARK_CONF_DIR}指定其他目錄。Spark會從這個目錄下讀取配置文件(spark-defaults.conf,spark-env.sh,log4j.properties等)

繼承Hadoop集群配置

如果你打算用Spark從HDFS讀取數據,那麼有2個Hadoop配置文件必須放到Spark的classpath下:

  • hdfs-site.xml,配置HDFS客戶端的默認行為
  • core-site.xml,默認文件係統名

這些配置文件的路徑在不同發布版本中不太一樣(如CDH和HDP版本),但通常都能在 ${HADOOP_HOME}/etc/hadoop/conf目錄下找到。一些工具,如Cloudera Manager,可以動態修改配置,而且提供了下載一份拷貝的機製。

要想讓這些配置對Spark可見,請在${SPARK_HOME}/spark-env.sh中設置HADOOP_CONF_DIR變量。

 轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 17:01:55

  上一篇:go  Java網絡編程:UDP DatagramSocket
  下一篇:go  《大數據導論》導讀