713
財經資訊
Elasticsearch 漫談
前言之前在研究ElasticSearch的時候,發現竟然已經有七篇文章了。這些文章通常都是遇到了問題,於是去研讀相關代碼,試圖搞清楚裏麵的機製,順帶記錄下來而成文的。如果加上一些黏邊的文章,譬如ELK的崛起等,則應當在十篇左右。 涉及到了聚合,索引構建,Rest/RCP API,Recovery 等多個方麵。相對而言,ES 索引構建流程相關的文章已經比較完備:
在ES中,索引構建和查詢因為沒有做分離,所以他們之間存在著非常激烈的競爭關係,而ES所暴露出來的那無數參數就是調整兩者之間關係的。
Merge的影響其實是非常大的。現在大部分存儲係統對於更新和刪除其實都是生成新的文件,並不會直接去更新原來的文件,查詢時對應的Reader會讀取這些文件,從而實現類似合並後的效果。在ES中,Merge由兩部分構成,MergeScheduler和MergePolicy。MergeScheduler控製合並的使用的工作線程以及一次合並多少文件等。MergePolicy則是控製如何進行文件的合並。默認的TireMergePolicy,會生成多個不大於5G的文件。
所以,對於Merge其實我們可以調整MergeScheduler和MergePolicy。對應的你可以在ElasticsearchConcurrentMergeScheduler和MergePolicyConfig兩個類裏看到詳細的可配置參數列表。
Merge有啥影響的?其實它和Shard數的控製也有很大關係。假設我們有100台服務器,2400顆核,單機24顆核心,那麼默認每個分片會有四個線程用於Merge操作。假設我們有500個分片,那麼Merge可以使用的CPU核數達到了2000個,在一個數據寫入非常頻繁的係統,大部分CPU可能都會被Merge給消耗掉。所以並不是分片越多越好,這裏需要考慮Merge對係統的影響,並且分片越多,那麼用於Bulk的CPU就越多,對Search的性能其實也是有影響的。
在我的實際測試過程中,如果我將分片數設置為服務器數,並且將merge線程設置為1,也就是一個Shard一個merge線程,這種情況下,CPU會有效的降低,並且索引構建性能也能得到一定的提升。我猜測,如果調低 index.merge.policy.max_merged_segment,假設現在設置為1G,那麼將一個2M的新的Segment合並到1G的文件將比原來5G的快得多,消耗的CPU也更少,然而帶來的影響可能是索引查詢性能的下降以及可能導致係統文件句柄的耗盡。
如果一個Shard分片裏的數據過大,那麼譬如聚合查詢的響應時間基本就難以接受了,對於數據規模在五六億的一個分片而言,簡單的groupby 加sum的查詢可能耗時都能夠達到2分鍾,相對而言,Shard的文檔數量在百萬規模,能夠獲得一個較好的查詢響應時間,然而可能依然以秒計。
Shard一多,Merge以及Bulk構建索引消耗的CPU都會變得巨大,讓Search變得愈加困難。而隨著數據量規模的日益龐大,而單個Shard數據量又不宜太大,那麼隻能加大Shard數量,這就導致我們陷入了一個困境。
解決上麵的問題似乎有兩個簡單而有效的方案:
- 讓同一Node實例的Shard共用一個Merge線程池,而不是現在的每個Shard單獨戰友一個Merge線程池。
- 將Shard 字段的列式存儲,最好是能夠分成多個block,然後利用其有序性,對每個Block保留Min-Max值,從而在做equal或者range類的過濾時,跳過部分Block,避免時間消耗和Shard的數據量成線性關係。而且如果單個文件,則很難全部緩存起來,無法高效利用係統緩存。
- 有些查詢理論上是不精準的,有誤差的,然而大部分場景下卻都是準確的。
- 有些理論上是不精準的,有誤差的,實際場景也是有誤差的。
那為啥ES不能做精準的計算呢?那是因為ES是一個存儲,而不是一個正真意義上的分布式計算引擎。分布式計算引擎一定要有一個強大的Reduce能力,而ES目前還隻能在單機做Reduce,這就導致它必定受限於單機的內存,所以他必須做一些假設或者采用某種估算算法才能避免內存被耗盡。
ES-Hadoop基本就是個半成品。為啥說是半成品呢?因為我們確實能夠利用ES-Hadoop項目很好的和Spark做結合,將數據導入到ES中。然而進行查詢的時候,因為ES-Hadoop采用了http協議,通過RestAPI 去獲取ES的數據導入到Spark中做計算,導致加載效率極低。加載效率低的原因其實不僅僅是采用了HTTP協議的緣故(如果換做RPC據說效率有50%以上的提升),還有如:
- Scroll API 需要每次重新獲得和過濾候選集,然後得到新批次的數據
- Scan後獲得DocId集合,然後fetch _source 是一個隨機讀過程而讓IO性能無法接受
其中影響最大的是fetch _source。 這也是Spark Data source API 帶來的問題,也不能全怪ES。為什麼這麼說呢? 因為Spark Data Source API 依然無法發揮底層存儲的計算能力,它隻能下沉(PushDown)一些filter,而無法接受groupby後的結果進行計算,這就導致數據規模下不來。
能夠跑後台任務對類似ES這種係統是很重要的。現在的ES無法實現把任務丟進去(或者查詢),然後可以異步監控獲取結果。一種比較直觀的場景是,我丟一個SQL進去,類似 insert to newtable from (select * from oldtalbe)這種,然後第二天就可以出結果,然後BI報表讀取newtable就能夠顯示了。這個隻是功能的話是比較容易做的,最大的難點是資源的控製,不能說一個query任務就耗盡了所有的資源甚至跑掛了ES。 實際上涉及到兩個點:
- 資源隔離
- 任務調度
要實現資源隔離,隻能自己去管理內存,可能需要JVM實現一個TaskMemoryManager的管理器,然後所有task都需要到這裏來申請資源,其實是很複雜的一件事情。
我們知道 ES是有自己的DSL的,是一個用JSON來定義的查詢語言。寫起來還是比較繁瑣的,而相當一部分功能其實是可以映射到SQL上的。我覺得官方有必要提供對SQL的支持,Solr現在已經做了,但是ES目前還隻有第三方在做。在我的視角裏,沒有SQL支持的查詢係統,我基本是不考慮的。Spark 提供了那麼多易用的API,然而純SQL還是最好用的。
在討論這個問題之前,我們先要理解一下文件的寫入過程。當我們打開一個文件描述符往裏麵寫入數據的時候,一般而言會寫入文件係統的緩存裏,所以再最後需要fsync一下,強製將所有數據刷入磁盤。那麼對應的,Segment產生也分兩個階段,一個是產生了文件,一個是fsync到磁盤後不再變化了。
我們這裏指的產生Segment就是指已經被commit到磁盤的segment.
Segment這個名詞來自於Lucene,在前麵Merge相關的內容裏已經反複有所提及。Translog是觸發Segment生成一個比較重要的地方,因為他們本來就是起互補作用的。當我們要清空Translog然後打開新的Translog時,就會將現有的數據持久化到Segment裏。所以Translog的配置直接影響了Segment的生成頻率。另外,Translog做Recovery的時候,其實也是會觸發flush動作的,比如做SNAPSHOT。當然,ES也可以通過API手動觸發Flush從而產生Flush動作。
ES副本對索引性能的影響幾乎是100%。 然而目前的機製而言,你是不能去掉副本的,因為一旦發生主片丟失,就不僅僅是已經存在的數據丟失,還包括新的數據部分也無法進入集群。至於為啥影響是100%呢?因為副本和主片都是通過HTTP協議完成的,而不是類似傳統的文件拷貝的方式。在5.0之後有一個優化,就是fsync可以實現異步化,可以有效提高吞吐。
隨著ES在數據分析領域的大放異彩,索引速度越來越是個瓶頸。企業似乎也願意投資,使用百台高性能服務器錄入千億規模數據的大有人在。然而和原生的Lucene的速度相比較,差距仍然是比較大的。那麼速度到底差在哪裏呢?
大體有幾個因子影響了索引的速度:
- Translog ,你可以類比MySQL的Binlog
- Version,版本檢查
- 一些特殊字段,譬如_all,_fieldNames等
- Schema Mapping相關的(譬如mapping Dynamic Update)
- JSON的解析(ES 交互基本是以JSON為主體的)
- Segments 的Merging
- Refresh Interval ,索引的刷新周期
在默認參數下,Translog 寫入的CPU消耗甚至比Lucene 的addDocument 還高兩倍。這點我還是蠻詫異的。Translog也要落磁盤,也需要commit,所以我們可以通過將index.translog.durability設置為async,這樣translog的寫入由默認的每次請求後就執行改成定時(5s)commit一次。這樣帶來的額外好處是減少 Translog寫磁盤的次數,也就了減少了構建索引的消耗。
Translog並不會無限存在,到了一定程度,就需要觸發索引的flush,具體動作是
- commit index segment
- clear translog
- open new translog
- flush的越少,那麼索引性能越高
- flush的越少,translog就可能越大,那麼當發生故障時,恢複時間就可能越長。
這裏解釋下translog和故障恢複的關係。當數據進行recovery的時候,大致是如下一個流程:INIT -> INDEX -> VERIFY_INDEX -> TRANSLOG -> FINALIZE -> DONE
第二種情況是重新Load某個Shard,比如某個Node被快速重啟了,這個時候因為數據還沒來得及commit成segment就掛了,再次啟動後,丟失的數據就可以從Trasnlog裏恢複了,如果Translog多了,就讓恢複變得很慢。所以在這種情況下,Translog保留多少條就變得很重要了,可以通過參數index.translog.flush_threshold_ops 控製。
當然,前麵討論的一些設置讓translog也變得不可靠,一旦產生當機等問題,可能在內存中的translog沒有及時commit到磁盤而導致數據丟失。吞吐和可靠總是存在某種矛盾。
關於Translog的內容,大致就如上了。我覺得Translog的寫入和讀取等還是有優化空間的。這裏再說說5.0裏和Translog有關的一個優化,在ES裏實時Get的話,其實是通過內存中通過docId拿到translog offset ,然後再去拿的,5.0之後不需要這樣了,隻要在內存維護最新文檔的docId而不是docId和translog offset的映射關係,然後有請求的話,將數據flush到segment裏然後直接去取。
我們再說說Version機製,Version大致會有一個Map緩存,如果緩存沒有,就會走磁盤。索引Version檢查其實是一個昂貴的操作。如果是時序數據(不變數據),則讓係統auto generate id可以跳過Version檢查,這樣的話對性能也是巨大的提升。
在ES裏有一些特殊字段,比如_all,_fieldNames,_source等。_all性能影響還是比較大的。_source我們一般需要保留,否則會有很多不便,因為無法還原完整的記錄。_all一般而言可以關掉。之前我沒注意到_fieldNames這個字段,通過JProfiler我發現如下的代碼竟然占了整個Bulk過程CPU的6%左右的消耗。

後來一查,發現是為了生成_fieldNames字段的。如果你要追求索引灌入的性能,果斷關掉這個字段吧。
ES的Mapping其實消耗也非常大,比如Dynamic update 特性。建議固定好的你Schema,然後在ETL過程中規範你的數據,然後關掉該特性。
JSON的解析其實是比較慢的,通過性能分析發現,比如StringFieldMapper裏的parseCreateFieldForString方法消耗CPU就特別厲害,仔細一看,

最後更新:2017-04-01 17:13:51