閱讀965 返回首頁    go 阿裏雲 go 技術社區[雲棲]


消息中間件MetaQ高性能原因分【對外公布版本】

簡介

前麵寫了關於文件係統的三篇文章,[深入淺出文件係統][文件係統之讀寫基礎篇],[文件係統之讀寫高級篇],此篇算是對文件係統相關文章的一個總結。

MetaQ是一款高性能的消息中間件,經過幾年的發展,已經非常成熟穩定,曆經多年雙11的零點峰值壓測,表現堪稱完美。

MetaQ當前最新最穩定的穩本是3.x係統,MetaQ 3.x重新設計和實現,比之前的版本更優秀。雖然MetaQ借鑒了linkedin 的消息中間件kafak思想,但已經是青出於藍而勝於藍。

本文不對MetaQ做全麵的介紹,隻選擇高性能這點來分析。

性能測試對比圖

metaq

以上測試圖片,來自消息測中間件試團隊 @以夕 妹子的性能測試結果,更多測試結果請點擊
[Kafka、RabbitMQ、RocketMQ發送小消息性能對比],
[Kafka vs RocketMQ——Topic數量對單機性能的影響],
[Kafka vs RocketMQ——單機係統可靠]。

核心功能

MetaQ作為一款消息中間件,消息中間件該有的功能,MetaQ也有。本文並不全麵介紹MetaQ方麵方麵,隻是選取性能這一角度,來剖析其高性能的原因。

功能組件

  • MetaQ Server

    最為核心的組件,它主要可以接收應用程序發送過來的消息並存儲,然後再投遞。

  • MetaQ Master

    MetaQ Server邏輯上的角色,和MySQL Master概念類型,對外提供發送消息、訂閱消息以及維護著管理信息。

  • MetaQ Slave

    MetaQ Server邏輯上的角色,和MySQL Slave概念類型,對外提供訂閱消息功能。

  • MetaQ Client

    主要是應用程序使用,使用MetaQ Client來發送消息、訂閱消息、其它控製信息。

  • 其它無數據管理及控製信息組件

    提供訂閱關係管理功能,MetaQ Server服務發現功能。

發送消息

MetaQ Client 發送消息,MetaQ Server收到消息,並存儲到文件係統。也就是說MetaQ會有大量write係統調用。

訂閱消息

MetaQ Client 訂閱消息,因其是Pull的模型。MetaQ Server收到Pull消息的請求,會從磁盤上讀取出消息,然後返回給MetaQ Client。這一步有大量的read係統調用。

矛盾

從上麵的功能上看,Metaq Server要支持大量的磁盤IO操作,因為其是構建文件係統之上的消息中間件。既然使用了文件係統來存儲數據,但磁盤QPS每秒也就是幾百。MetaQ Server又必須高性能(如MetaQ Server性能是10W級別的QPS),才能在可接收的成本範圍內,滿足業務需求(不丟消息)。如何在QPS隻有幾百的磁盤上,構建出一個高性能的MetaQ消息間件正是本文的中心。

高性能

前麵介紹了MetaQ高性能的難點,那麼我們如何解決這些難點。要解決這些難點,就必須找出這些難點。那麼要寫一個高性能的消息中間件,會有哪些會部分會對影響性能。

影響性能的關鍵幾點

  • 序列化與反序列化

MetaQ Cleint要發送消息,必須要先序列化,然後才能通過網絡發送出去。 MetaQ Server收到消息後,要進行反序列化,才能解析出消息內容,最後序列化存儲到文件係統。

MetaQ Client收到消息,首頁MetaQ Server必須從文件中讀取消息,然後通過網絡發送給MetaQ Client,收到消息,進行反序列化,應用才能識別消息內容。

MetaQ核心功能,都要通過序列化與反序列化,所以其性能,對MetaQ性能有關鍵性的影響,其實不是對MetaQ,隻要使用了序列化與反序列化,其對性能影響都很大。

  • write性能

因為MetaQ Server會有大量的write係統調用 ,所以其性能對MetaQ性能有著重要的影響。

  • read性能

因為MetaQ Server會有大量的read係統調用 ,所以其性能對MetaQ性能有著重要的影響。

  • 網絡框架

因為發送消息,訂閱消息都必須經過網絡,如果網絡組件性能不好,對MetaQ性能有著關鍵的影響。

如何高性能

  • 序列化與反序列化

要解決序列化與反序列化性能問題,我們就必須尋種各種序列化與反序列化技術性能對比,從而選出一個高性能的序列化與反序列化技術來作為MetaQ

我們來看下Java世界可以選擇的序列化與反序列化技術

ser

ser2

從圖中性能數據,可以看出,個人認為Google出品的Protocol Buffers應該是最佳選擇,不管軟件的質量、社區活躍、軟件的後續發展上來說,都是不錯的選擇。

MetaQ並沒有選擇Protocol Buffers作為其序列化與反序列化的技術,一個原因是Protocol Buffers居然在小版之間本都不兼容,2.32.5的版本都不兼容。這會帶來一個嚴重的問題,如果MetaQ選擇2.3的版本,應用程序選擇了2.5,都會導致衝突,反之亦然。

MetaQ消息元數據是通過JSON來序列化與反序列化,消息Body是交給應用自己序列化與反序列化。

雖然使用Protocol Buffers性能會更好,但帶給用戶帶來麻煩。所以MetaQ選擇使用JSON

  • IO優化

前麵也已經介紹了,MetaQ Server 存大大量的IO,那麼怎麼優化呢?

read優化

read優化主要是使用了[文件係統之讀寫高級篇]裏介紹的mmap文件映射技術。這樣可以減少係統上下文切換和複製數據的開銷。更多詳情見[文件係統之讀寫高級篇]。

同時文件係統提供了文件預讀的功能,也使的讀取文件開銷,特別是順序讀時,開銷比較低。更多詳情見[文件係統之讀寫高級篇]。

write優化

前麵也介紹了,write可能存在並發問題,那麼MetaQ是如何解決的?

MetaQ消息隻保留在一個物理文件上,所有的消息都會寫一個物理文件,每個物理文件都是固定大小,超過設置的閥值後,自動創建新的一個文件。當磁盤快滿時,會自動刪除老的文件。

Group Commit技術

Group Commit也就是組提交,組提交是指可以多次分寫請求隻要通過一次刷新數據,就可以實現這些請求的數據都已刷新到磁盤上。

MySQL數據庫能保證ACID,事務提交也使用了Group Commit來提高性能(為了保證D,數據需要持久化到文件係統)。

詳細見下圖

group_commit

寫請求1MetaQ Server時,把線程寫入內核後,觸發flush線程刷新數據到磁盤,以保證數據的可靠性。
然後再向MetaQ Client 響應發送消息成功。這個時間,隻要文件係統和磁盤不損壞,數據是不會丟失的。

正在flush線程要準備刷新數據時,寫請求2寫請求3寫請求4也到MetaQ Server且寫入數據,這樣因寫請求1寫數據,觸發的flush順便也把寫請求2寫請求3寫請求4的數據也刷新到磁盤。這樣減少了刷新磁盤的次數,性能自然就高了,同時也保證的數據的可靠性。

如何實現Group Commit,請看源碼

  // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (msg.isWaitStoreMsgOK()) {
                request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
                            .getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: "
                            + msg.getTags() + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            }
            else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            this.flushCommitLogService.wakeup();
        }

並發安全

[文件係統之讀寫基礎篇]也提到過,write如何保證並發安全,在寫數據前,需要搶占一個鎖,因為這隻是把數據寫到文件係統緩存中,所以持有鎖的時間非常短,對性能友好。請看代碼

 synchronized (this) {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
            if (null == mapedFile) {
                log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: "
                        + msg.getBornHostString());
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            result = mapedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                // Create a new file, re-write the message
                mapedFile = this.mapedFileQueue.getLastMapedFile();
                if (null == mapedFile) {
                    // XXX: warn and notify me
                    log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: "
                            + msg.getBornHostString());
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mapedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            DispatchRequest dispatchRequest = new DispatchRequest(//
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10

            this.defaultMessageStore.putDispatchRequest(dispatchRequest);

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        } // end of synchronized

網絡性能

MetaQ的網絡框架,選擇了Netty4Netty4因出色的性能和易用性,成為高性能場景的不二選擇。

更多詳細的介紹請看我2014年為雙11優化Notify性能的文章,Notify優化性能及Netty實踐

後記

MetaQ高性能的秘密,我們從其功能結構,從功能的作用,一個個解釋了可能影響性能的點,及怎麼解決這些問題,提高性能。

雖然一個個點看起來簡單,但要實現一個穩定、高性能的消息係統,還是不容易的。

________________________________________該篇是由原作者 傅衝 提供

最後更新:2017-06-05 11:33:54

  上一篇:go  C++多任務編程簡明教程 (1) - C++的多任務其實很簡單
  下一篇:go  《Cucumber:行為驅動開發指南》——1.3 活的文檔