京東消息中間件演進之路:三次更迭,八大突破
京東商城基礎平台團隊:包括大規模容器集群調度、數據庫與存儲技術、消息係統與服務框架、架構與運維、機器學習與人工智能等技術方向。由京東商城首席架構師劉海鋒擔任部門負責人。基礎平台運營多個數據中心數萬台服務器,支撐京東無數在線業務(團隊公眾號ID:ipdchat)。
導讀:本文將簡單介紹京東消息中間件的演進曆程,以及作為消息中間件,在每一代產品中我們是如何解決MQ麵臨的一些通用問題,如:如何處理IO,消息如何存儲,消息如何路由等等。
我們在開始之前要明確一些基本概念:
-
Broker:消息中間件服務端的一個實例;
-
Producer:消息的發送方;
-
Consumer:消息的接收方;
-
Topic:消息中間裏的數據分類的標示,一個topic也就代表一類消息,producer和consumer通過Topic實現關聯。
第一代AMQ
2012年初,京東唯一一個類似消息中間件的產品還是一個基於數據開發的,消息分發係統。所以確切地說,當時京東還沒有一個統一的消息中間件平台,但隨著公司組織架構的升級,建設一個統一的消息中間件平台就提上了日程。
目標就是建立一消息平台對外提供高可用,可擴展的消息傳遞服務,並能消息進行有效的治理。
第一代選型:
消息核心:基於ActiveMQ5.6做擴展。
配置中心:MySQL + Zookeeper。
管理監控平台:自主研發。提供比ActiveMQ自帶的管理平台更豐富的消息監控功能,並提供消息管理服務
當時選擇ActiveMQ是由於它是由JAVA開發符合公司的技術路線(.NET轉JAVA),有主從複製方案,性能尚可(單實例TPS 3000+),支持JMS、AMQP等多種技術規範。至於為何選擇Zookeeper,可參考我廠另一篇文章《服務框架技術選型實踐》中的“注冊中心選型”。
AMQ是如何解決MQ麵臨的一些通用問題:
因為核心是使用的ActiveMQ,所以部分問題就回歸到了ActiveMQ是如何解決這些通用問題上來,但其中不乏我們對ActiveMQ的擴展,關於ActiveMQ部分的詳細情況我們可以參看其官方文檔,以下我們主要看下我們在其基礎上的擴展及改進:
1.如何存儲?
ActiveMQ使用了KahaDB存儲引擎進行存儲,BTree索引,為了保證可靠性索引文件和日誌文件都要同步刷盤。此外,ActiveMQ通過虛擬主題(VirtualTopic)的方式實現Topic,也就是如果一個Topic有多個訂閱者,ActiveMQ就為每個訂閱者創建一個隊列,那麼producer每發一條消息,有多少個訂閱者,broker就會將消息複製多少份,將其放到不同訂閱者的隊列中,broker在從中獲取消息推送給consumer。
2. 如何支持集群?
當時的原生的ActiveMQ客戶端在收發消息上是這樣的:針對一個Topic發送者就隻能往特定的單個Broker上發送消息,消費者亦是如此,隻能從指定的單個Broker上消費消息。從客戶端的角度來看就是不支持服務集群化,一旦分給這個Topic的這組Broker掛掉那針對這個Topic來說整個服務就處於不可用狀態。
我們做的第一件事就是讓客戶端支持集群,我們采用ZK作為配置中心對客戶端進行了擴展,使客戶端支持了集群的同時還實現了其對服務端動態擴展的支持。以下是集群化之後的客戶端與服務端整體架構。
圖1:客戶端服務端整體架構
3. Push or pull?消息如何路由?
AMQ采用的是push模式,也就是消息由producer發送到broker端之後是由broker推送給consumer。
對於原生的客戶端隻能在單個Broker上收發消息,那也就不存在消息路由的問題,我們來看下集群化之後消息是如何路由的,這裏的路由涉及到兩個方麵,一方麵是producer在發消息是如何選擇將消息發往哪個Broker,另一方麵就是broker如何決定一條消息應該推送給哪個consumer。
發送路由:
-
隨機發送,這種模式是完全隨機的,也就是producer在topic指定的多個broker當中隨機選擇一個broker向其發消息。
-
加權隨機發送,這種方式下我們可以對一個topic分配的多個broker設置不同的權重,發送時producer就根據權重來進行選擇,權重越高被選中的幾率越大。這種方式有個好處就是當一個broker有異常時我們就可以將其權重置為0,這樣producer就不會往上發消息,待其恢複之後我們再逐步的將權重給予恢複,這就能夠避免在單個broker出異常時客戶端大量報錯。
消費路由:broker隨機選擇一個當前在線的consumer並將消息推送給它。
圖2:客戶端路由模型
4. 如何處理消費失敗的消息?
原生ActiveMQ的consumer在成功處理一條消息之後會向broker發一個ACK,以確認消息被成功消費。當處理一條消息失敗之後就會向broker返回一個消息處理失敗的應答,此時當broker收到這條處理失敗的應答時會將這條處理失敗的消息放到“死信隊列”(DLQ)。
針對每個普通隊列Broker端都對應著這樣一個死信隊列,此隊列的消息不能被消費者所獲取。對此我們對客戶端進行了擴展,在消費出錯的時候攔截錯誤信息,然後將出錯的消息通過SAF協議(我廠的SOA框架)調用“重試服務”將異常消息入庫,入庫成功之後consumer再向broker發送一個消費成功的ACK,這時Broker就會將此消息刪除,這樣我們實際上就是將broker端的消息轉移到了庫裏。而後consumer再根據一定策略通過SAF去調用“重試服務”獲取之前入庫的消息進行處理。
圖3:客戶端重試架構圖
5. 如何生成消息軌跡?
原生ActiveMQ並不支持消息軌跡,我們擴展了服務端,對寫消息和消費消息進行了攔截,在消息入隊成功和消費成功後分別記下日誌,再由Agent采集日誌並進行歸檔,Agent將元信息寫入歸檔庫,然後將消息內容寫入JFS(JD File System,我廠的分布式文件係統)。歸檔成功後可在管理控製平台追蹤消息的處理軌跡,必要時還可下載消息體。
6. 其他擴展、優化
-
優化了broker寫消息邏輯將性能提升到單實例TPS 4000+(原生單實例TPS3000+);
-
實現了新的主從複製(原生主從複製為完全的同步複製,在slave落後時需要手動將Master數據拷貝到Slave,運維極其不方便)
-
實現新的主從選舉使其更易維護,更好的支持集群(原生的主從選舉容易出現雙Master);
-
增加了監控告警模塊以及其他的一些運維工具,使得管理裏更加簡單,運維更加方便。
經過一係列的擴展 和優化,AMQ基本上實現了高可用、可擴展以及統一的治理消息等目標,初步形成了一套完整的企業級消息中間件解決方案。以下是AMQ平台粗略的整體架構圖:
圖4:AMQ整體架構
第二代JMQ
到2014年初AMQ幾經優化已經很成熟。但隨著公司業務的發展,接入主題數量越來越多,消息量也是成倍增長。
此外隨著公司在各地新建機房,應用的部署結構也變得比較複雜,這就有了跨機房部署等需求。加上AMQ本身的一些問題,歸結一下主要有:
-
Broker較重,采用B-Tree索引,性能隨消息積壓量的上升急劇下降;
-
Broker端采用的VirtualTopic模式針對一個Topic有多個訂閱者的情況會對每個訂閱者單獨存儲一份消息。而京東的生產環境中大部分都是采用VirtualTopic並且每個Topic訂閱者都很多,舉個例子,比如“訂單管道”消息:它有將近100個訂閱者,也就是同一個數據要寫將近100份,不僅如此,這100份消息還要通過網絡發送到Slave上,經過這些流程,寫入TPS隻能達到幾百。所以不管本地寫性能性能、網落利用率、還是存儲空間利用率來看這種方案都急需調整;
-
Broker邏輯複雜,其模型就決定了無法在其基礎上擴展消息回放、順序消息、廣播消息等個性化需求,而實際使用過程當中又比較渴望我們支持此類特性;
-
重客戶端,由於集群、異常消息重試等功能都是通過擴展ActiveMQ的原生客戶端並引入SAF、ZooKeeper等服務得以支持的,一定程度上增加了客戶端的複雜度,相應的在客戶端的穩定性、可維護性等方麵就打了折扣;
-
注冊中心直接暴露給了客戶端,這樣最明顯的一個缺點就是隨著客戶端實例數的增多,注冊中心的連接數越來越多,這就很難對注冊中心實施保護措施;
-
監控數據不完善。
基於以上原因我們2014年初開啟了第二代消息中間件JMQ的自研過程。主要做了以下一些工作:
a. JMQ服務端:實現輕量級的存儲模型、支持消息回放、支持重試、支持消息軌跡、支持順序消息、支持廣播消息等,並兼容AMQ客戶端使用的OpenWire協議;
b. JMQ客戶端:實現輕量級客戶端隻和Broker通信,支持動態接收參數,內置性能采集、支持跨機房;
c. 管理控製平台:管理監控功能更強大;
d. HTTP代理:基於Netty,支持跨語言。
回到JMQ是如何解決MQ麵臨一些通用問題上來:
1. 如何解決IO問題?
JMQ沒有采用AMQ通過自己開發重複造輪子的方式解決IO問題,而是使用了Netty4.0,此框架開源,支持epoll,編程模型相對簡單。這在一定程度上減少了服務端的開發工作,也降低了服務端的複雜度。
在應用層,我們自定義了JMQ協議,序列化和反序列化也完全自己開發。這種序列化反序列化方式雖然在一定程度上降低我們的開發效率,但我們不用考慮如果采用第三方的序列化和反序列化方案會帶來的性能損耗問題,對於性能上的提升是顯而易見的。
2. 如何存儲消息?
JMQ存儲分為日誌文件(journal)和消息隊列文件(queue)以及消費位置文件(offset)都存儲在Broker所在機器的本地磁盤上。
日誌文件,主要存儲消息內容,包括消息所在隊列文件的位置,有以下特點:
a. 同一broker上不同topic消息存儲在同一日誌文件上,日誌文件按固定大小切分;
b. 文件名為起始全局偏移量;
c. 消息順序追加。
d. 日誌文件同步刷盤;
由於JMQ主要使用在可靠性要求極高的下單、支付等環節,所以broker必須保證收到的每條消息都落到物理磁盤,這樣一種日誌文件設計主要是為了提高多topic大並發下磁盤的寫性能。不僅限於模型的設計,為了提高寫性能我們在邏輯上還實現了Group commit。下圖是JMQ中Group Commit基本示意圖:
圖5:Group commit 示意圖
消息隊列,主要存儲消息所在日誌文件的全局偏移量,此文件有以下特點:
a. 同一broker上不同topic的隊列信息存儲在不同的隊列文件上,隊列文件按固定大小切分;
b. 文件名為全局偏移量;
c. 索引順序追加;
d. 隊列文件異步刷盤。
由於在日誌的寫入是單線程的,那我們在寫入之前就可以提前獲取到消息在隊列文件的位置,並將這個位置寫入到在日誌文件當中。所以隻要日誌文件寫成功,即便隊列文件寫失敗,我們也能從日誌文件中將隊列恢複出來,因此隊列文件采用異步刷盤。
圖6:日誌文件和隊列文件模型
消費位置文件:主要用於存儲不同訂閱者針對於某個topic所消費到的隊列的一個偏移量。
下圖簡單描述了消息在服務端的流轉過程:
圖7:消息流轉示意圖
3. 如何容災?
說到容災,我們上一節談到了每條消息在broker上都會落地,但這遠遠不夠,我們必須要保證單機失效甚至機房失效的情況下數據還能被恢複,所以我們需要的一套完整的方案來進行數據災備。在AMQ裏采用的是一主一從,主從分布在一個數據中心,主從同步複製這樣的方案。
JMQ裏我們實現了一套全新的複製方案:采用一主一從,至少一個備份,主從分布在同一個數據中心、備份分布在其他數據中心,主從同步複製、備份異步複製這樣一種方式進行數據災備。主從同步複製這樣就保證了同一條消息我們至少有兩個完整備份,即便一個備份丟失,我們還有另一個備份可用。備份節點就保證了極端情況下即使整個數據中心掛掉,我們絕大部分的消息還能得以恢複;
圖8:JMQ複製模型
4. Push or pull?消息如何路由?
JMQ采用pull模式,也就是消息由producer發送到broker之後是由consumer主動發起請求去broker上取消息。
發送者的路由和AMQ客戶端采取的策略基本相同。
消費者路由和AMQ差不多也是隨機,不同點在於JMQ是拉模式,在Broker采取長輪詢策略。
5. 如何處理消費失敗的消息?
與AMQ不同,由於JMQ 的broker直接就支持重試,在Consumer在處理消息失敗時直接向服務端發送一個重試消息命令,服務端接到到命令後將此消息入庫。隨後在consumer發起拉取消息命令時,服務端再根據一定的策略從庫裏將消息取出,返回給consumer進行處理。這樣做帶來一個好處就是JMQ客戶端減少了一個第三方服務依賴。
6. 如何記錄消息軌跡?
JMQ消息軌跡功能整體流程和AMQ基本相同,主要不同點在於JMQ將消息軌跡相關信息存儲到HBase,這樣JMQ消息軌跡信息的存儲周期變的更長,可以存儲的量也更大了,並且采用多種手段優化之後性能也得到了極大的提升。
7. 如何管理元數據?
通過第一部分我們知道,AMQ的元數據會持久化在MySQL,並在入庫的同時寫入Zookeeper,由ZK再下發到Broker和客戶端。這樣就有兩個問題:
a. 客戶端引入了Zookeeper這個第三方服務,引入服務越多那客戶端穩定性和可維護性就越差。
b. 注冊中心也就是Zookeeper直接暴露給了客戶端,這樣就會導致注冊中心的連接數越來越多在出現故障時缺乏必要的手段對注冊中心進行保護。
在JMQ中我們依然利用MySQL來持久化元數據,同時也會將元數據寫入Zookeeper,Zookeeper再通知到Broker,但客戶端不再直接連接ZooKeeper,而是轉而連接Broker,從Broker上獲取元數據信息。
由於每個Broker都有全量的元數據信息,所以客戶端端連接任意的Broker都能獲取到元數據信息。這種設計就帶來了幾個好處:
a. 減少了客戶端對Zookeeper服務的依賴,至此我們客戶端就隻需和broker通信,客戶邏輯得到了簡化,客戶端穩定得到了極大提升。
b. Zookeeper不再暴露給客戶端,這樣ZooKeeper的穩定性也有了保證。
c. 由於連接任意一個Broker都能獲取到元數據,極端情況下即便有個別broker宕機也不影響客戶端獲取元數據,所以從另外一個角度來看這又提高了我們注冊中心的可用性。
8. 其他
除了以上提到的一些基本問題之外,我們還解決了很多問題,由於篇幅問題就不一一在此羅列說明,其中包括但不限於:
a. 如何實現嚴格順序消息;
b. 如何實現廣播消息;
c. 如何實現兩階段事務;
d. 如何實現消息回放。
下麵是JMQ的一個粗略整體架構圖:
圖9:JMQ架構圖
JMQ性能數據
測試所用機器情況:
場景1:一主一從,Master同步刷盤+同步複製到Slave
場景2:一主一從,Master異步刷盤+異步複製Slave
JMQ規模
-
Topic數量:1000+
-
接入應用數量:2500+
-
單日消息入隊數量:500億+
JMQ3.0
到2016年中,JMQ經過兩個大版本的迭代,目前線上運行的JMQ2.0版本已經非常穩定,性能也很優秀,有人可能會問,既然都挺好了你們是不是就沒事幹了?隻需要運維好就行了。
其實不然,隨著業務的發展,服務端規模的擴充,如何構建一個多樣化、自動化的係統就顯得尤為重要,所以在今年下半年我們又規劃了我們JMQ3.0,JMQ3.0我們有幾個重要的目標:
1. 優化JMQ協議;
2. 優化複製模型;
3. 實現KAFKA協議兼容;
4. 實現全局負載均衡;
5. 實現全新的選舉方案;
6. 實現資源的彈性調度。
從這些目標不難看出JMQ3.0在功能上和架構上都會有一個大的升級,所以我們打算通過兩次大的迭代逐步實現上述這些目標。目前我們正在進行JMQ3.0第一版聯調測試工作,預計在12月底第一個版本就會進行預發。
第一版我們基本實現了Kafka協議兼容、類Raft的主從選舉、全局負載均衡、對複製進行了優化,同時實現了一個彈性調度的簡單DEMO。從我們目前的一些測試數據來看,新版本在性能上又有了一定的提高。期望JMQ3.0第一期上線後我們再通過一期或者兩期的迭代,能夠讓JMQ更上一層樓。
下圖是我們JMQ3.0的一個整體架構:
圖10:JMQ3.0整體架構
原文發布時間為:2017-01-22
本文來自雲棲社區合作夥伴DBAplus
最後更新:2017-05-15 17:33:55