消息中間件MetaQ高性能原因分【對外公布版本】
簡介
前麵寫了關於文件係統的三篇文章,[深入淺出文件係統][文件係統之讀寫基礎篇],[文件係統之讀寫高級篇],此篇算是對文件係統相關文章的一個總結。
MetaQ是一款高性能的消息中間件,經過幾年的發展,已經非常成熟穩定,曆經多年雙11的零點峰值壓測,表現堪稱完美。
MetaQ當前最新最穩定的穩本是3.x係統,MetaQ 3.x重新設計和實現,比之前的版本更優秀。雖然MetaQ借鑒了linkedin 的消息中間件kafak思想,但已經是青出於藍而勝於藍。
本文不對MetaQ做全麵的介紹,隻選擇高性能這點來分析。
性能測試對比圖
以上測試圖片,來自消息測中間件試團隊 @以夕 妹子的性能測試結果,更多測試結果請點擊
[Kafka、RabbitMQ、RocketMQ發送小消息性能對比],
[Kafka vs RocketMQ——Topic數量對單機性能的影響],
[Kafka vs RocketMQ——單機係統可靠]。
核心功能
MetaQ作為一款消息中間件,消息中間件該有的功能,MetaQ也有。本文並不全麵介紹MetaQ方麵方麵,隻是選取性能這一角度,來剖析其高性能的原因。
功能組件
-
MetaQ Server最為核心的組件,它主要可以接收應用程序發送過來的消息並存儲,然後再投遞。
-
MetaQ Master是
MetaQ Server邏輯上的角色,和MySQL Master概念類型,對外提供發送消息、訂閱消息以及維護著管理信息。 -
MetaQ Slave是
MetaQ Server邏輯上的角色,和MySQL Slave概念類型,對外提供訂閱消息功能。 -
MetaQ Client主要是應用程序使用,使用
MetaQ Client來發送消息、訂閱消息、其它控製信息。 -
其它無數據管理及控製信息組件
提供訂閱關係管理功能,
MetaQ Server服務發現功能。
發送消息
MetaQ Client 發送消息,MetaQ Server收到消息,並存儲到文件係統。也就是說MetaQ會有大量write係統調用。
訂閱消息
MetaQ Client 訂閱消息,因其是Pull的模型。MetaQ Server收到Pull消息的請求,會從磁盤上讀取出消息,然後返回給MetaQ Client。這一步有大量的read係統調用。
矛盾
從上麵的功能上看,Metaq Server要支持大量的磁盤IO操作,因為其是構建文件係統之上的消息中間件。既然使用了文件係統來存儲數據,但磁盤QPS每秒也就是幾百。MetaQ Server又必須高性能(如MetaQ Server性能是10W級別的QPS),才能在可接收的成本範圍內,滿足業務需求(不丟消息)。如何在QPS隻有幾百的磁盤上,構建出一個高性能的MetaQ消息間件正是本文的中心。
高性能
前麵介紹了MetaQ高性能的難點,那麼我們如何解決這些難點。要解決這些難點,就必須找出這些難點。那麼要寫一個高性能的消息中間件,會有哪些會部分會對影響性能。
影響性能的關鍵幾點
- 序列化與反序列化
從MetaQ Cleint要發送消息,必須要先序列化,然後才能通過網絡發送出去。 MetaQ Server收到消息後,要進行反序列化,才能解析出消息內容,最後序列化存儲到文件係統。
MetaQ Client收到消息,首頁MetaQ Server必須從文件中讀取消息,然後通過網絡發送給MetaQ Client,收到消息,進行反序列化,應用才能識別消息內容。
MetaQ核心功能,都要通過序列化與反序列化,所以其性能,對MetaQ性能有關鍵性的影響,其實不是對MetaQ,隻要使用了序列化與反序列化,其對性能影響都很大。
-
write性能
因為MetaQ Server會有大量的write係統調用 ,所以其性能對MetaQ性能有著重要的影響。
-
read性能
因為MetaQ Server會有大量的read係統調用 ,所以其性能對MetaQ性能有著重要的影響。
- 網絡框架
因為發送消息,訂閱消息都必須經過網絡,如果網絡組件性能不好,對MetaQ性能有著關鍵的影響。
如何高性能
- 序列化與反序列化
要解決序列化與反序列化性能問題,我們就必須尋種各種序列化與反序列化技術性能對比,從而選出一個高性能的序列化與反序列化技術來作為MetaQ。
我們來看下Java世界可以選擇的序列化與反序列化技術
從圖中性能數據,可以看出,個人認為Google出品的Protocol Buffers應該是最佳選擇,不管軟件的質量、社區活躍、軟件的後續發展上來說,都是不錯的選擇。
但MetaQ並沒有選擇Protocol Buffers作為其序列化與反序列化的技術,一個原因是Protocol Buffers居然在小版之間本都不兼容,2.3和2.5的版本都不兼容。這會帶來一個嚴重的問題,如果MetaQ選擇2.3的版本,應用程序選擇了2.5,都會導致衝突,反之亦然。
MetaQ消息元數據是通過JSON來序列化與反序列化,消息Body是交給應用自己序列化與反序列化。
雖然使用Protocol Buffers性能會更好,但帶給用戶帶來麻煩。所以MetaQ選擇使用JSON。
-
IO優化
前麵也已經介紹了,MetaQ Server 存大大量的IO,那麼怎麼優化呢?
read優化
read優化主要是使用了[文件係統之讀寫高級篇]裏介紹的mmap文件映射技術。這樣可以減少係統上下文切換和複製數據的開銷。更多詳情見[文件係統之讀寫高級篇]。
同時文件係統提供了文件預讀的功能,也使的讀取文件開銷,特別是順序讀時,開銷比較低。更多詳情見[文件係統之讀寫高級篇]。
write優化
前麵也介紹了,write可能存在並發問題,那麼MetaQ是如何解決的?
MetaQ消息隻保留在一個物理文件上,所有的消息都會寫一個物理文件,每個物理文件都是固定大小,超過設置的閥值後,自動創建新的一個文件。當磁盤快滿時,會自動刪除老的文件。
Group Commit技術
Group Commit也就是組提交,組提交是指可以多次分寫請求隻要通過一次刷新數據,就可以實現這些請求的數據都已刷新到磁盤上。
MySQL數據庫能保證ACID,事務提交也使用了Group Commit來提高性能(為了保證D,數據需要持久化到文件係統)。
詳細見下圖
當寫請求1到MetaQ Server時,把線程寫入內核後,觸發flush線程刷新數據到磁盤,以保證數據的可靠性。
然後再向MetaQ Client 響應發送消息成功。這個時間,隻要文件係統和磁盤不損壞,數據是不會丟失的。
正在flush線程要準備刷新數據時,寫請求2,寫請求3,寫請求4也到MetaQ Server且寫入數據,這樣因寫請求1寫數據,觸發的flush順便也把寫請求2,寫請求3,寫請求4的數據也刷新到磁盤。這樣減少了刷新磁盤的次數,性能自然就高了,同時也保證的數據的可靠性。
如何實現Group Commit,請看源碼
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (msg.isWaitStoreMsgOK()) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
.getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
}
else {
service.wakeup();
}
}
// Asynchronous flush
else {
this.flushCommitLogService.wakeup();
}
並發安全
[文件係統之讀寫基礎篇]也提到過,write如何保證並發安全,在寫數據前,需要搶占一個鎖,因為這隻是把數據寫到文件係統緩存中,所以持有鎖的時間非常短,對性能友好。請看代碼
synchronized (this) {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
if (null == mapedFile) {
log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: "
+ msg.getBornHostString());
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
if (null == mapedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: "
+ msg.getBornHostString());
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
DispatchRequest dispatchRequest = new DispatchRequest(//
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
/**
* Transaction
*/
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
} // end of synchronized
網絡性能
MetaQ的網絡框架,選擇了Netty4。Netty4因出色的性能和易用性,成為高性能場景的不二選擇。
更多詳細的介紹請看我2014年為雙11優化Notify性能的文章,Notify優化性能及Netty實踐。
後記
MetaQ高性能的秘密,我們從其功能結構,從功能的作用,一個個解釋了可能影響性能的點,及怎麼解決這些問題,提高性能。
雖然一個個點看起來簡單,但要實現一個穩定、高性能的消息係統,還是不容易的。
________________________________________該篇是由原作者 傅衝 提供
最後更新:2017-06-05 11:33:54



