RabbitMQ消息隊列(九):Publisher的消息確認機製
在前麵的文章中提到了queue和consumer之間的消息確認機製:通過設置ack。那麼Publisher能不到知道他post的Message有沒有到達queue,甚至更近一步,是否被某個Consumer處理呢?畢竟對於一些非常重要的數據,可能Publisher需要確認某個消息已經被正確處理。
在我們的係統中,我們沒有是實現這種確認,也就是說,不管Message是否被Consume了,Publisher不會去care。他隻是將自己的狀態publish給上層,由上層的邏輯去處理。如果Message沒有被正確處理,可能會導致某些狀態丟失。但是由於提供了其他強製刷新全部狀態的機製,因此這種異常情況的影響也就可以忽略不計了。
對於某些異步操作,比如客戶端需要創建一個FileSystem,這個可能需要比較長的時間,甚至要數秒鍾。這時候通過RPC可以解決這個問題。因此也就不存在Publisher端的確認機製了。
那麼,有沒有一種機製能保證Publisher能夠感知它的Message有沒有被處理的?答案肯定的。在這裏感謝笑天居士同學:他在我的《RabbitMQ消息隊列(三):任務分發機製》文後留言一起討論了問題,而且也查找了一些資料。在這裏我整理了一下他轉載和一篇文章和原創的一篇文章。銜接已經附後。
1. 事務機製 VS Publisher Confirm
如果采用標準的 AMQP 協議,則唯一能夠保證消息不會丟失的方式是利用事務機製 -- 令 channel 處於 transactional 模式、向其 publish 消息、執行 commit 動作。在這種方式下,事務機製會帶來大量的多餘開銷,並會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機製(即 Publisher Confirm)。
為了使能 confirm 機製,client 首先要發送 confirm.select 方法幀。取決於是否設置了 no-wait 屬性,broker 會相應的判定是否以 confirm.select-ok 進行應答。一旦在 channel 上使用 confirm.select方法,channel 就將處於 confirm 模式。處於 transactional 模式的 channel 不能再被設置成 confirm 模式,反之亦然。
一旦 channel 處於 confirm 模式,broker 和 client 都將啟動消息計數(以 confirm.select 為基礎從 1 開始計數)。broker 會在處理完消息後,在當前 channel 上通過發送 basic.ack 的方式對其進行 confirm 。delivery-tag 域的值標識了被 confirm 消息的序列號。broker 也可以通過設置 basic.ack 中的 multiple 域來表明到指定序列號為止的所有消息都已被 broker 正確的處理了。
在異常情況中,broker 將無法成功處理相應的消息,此時 broker 將發送 basic.nack 來代替 basic.ack 。在這個情形下,basic.nack 中各域值的含義與 basic.ack 中相應各域含義是相同的,同時 requeue 域的值應該被忽略。通過 nack 一或多條消息,broker 表明自身無法對相應消息完成處理,並拒絕為這些消息的處理負責。在這種情況下,client 可以選擇將消息 re-publish 。
在 channel 被設置成 confirm 模式之後,所有被 publish 的後續消息都將被 confirm(即 ack) 或者被 nack 一次。但是沒有對消息被 confirm 的快慢做任何保證,並且同一條消息不會既被 confirm 又被 nack 。
2. 消息在什麼時候確認
broker 將在下麵的情況中對消息進行 confirm :- broker 發現當前消息無法被路由到指定的 queues 中(如果設置了 mandatory 屬性,則 broker 會先發送 basic.return)
- 非持久屬性的消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中)
- 持久消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中),並被持久化到了磁盤(被 fsync)
- 持久消息從其所在的所有 queue 中被 consume 了(如果必要則會被 acknowledge)
broker 會丟失持久化消息,如果 broker 在將上述消息寫入磁盤前異常。在一定條件下,這種情況會導致 broker 以一種奇怪的方式運行。例如,考慮下述情景:
1. 一個 client 將持久消息 publish 到持久 queue 中
2. 另一個 client 從 queue 中 consume 消息(注意:該消息具有持久屬性,並且 queue 是持久化的),當尚未對其進行 ack
3. broker 異常重啟
4. client 重連並開始 consume 消息
在上述情景下,client 有理由認為消息需要被(broker)重新 deliver 。但這並非事實:重啟(有可能)會令 broker 丟失消息。為了確保持久性,client 應該使用 confirm 機製。如果 publisher 使用的 channel 被設置為 confirm 模式,publisher 將不會收到已丟失消息的 ack(這是因為 consumer 沒有對消息進行 ack ,同時該消息也未被寫入磁盤)。
3. 編程實現
首先要區別AMQP協議mandatory和immediate標誌位的作用。
mandatory和immediate是AMQP協議中basic.pulish方法中的兩個標誌位,它們都有當消息傳遞過程中不可達目的地時將消息返回給生產者的功能。具體區別在於:
1. mandatory標誌位
當mandatory標誌位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那麼會調用basic.return方法將消息返還給生產者;當mandatory設為false時,出現上述情形broker會直接將消息扔掉。
2. immediate標誌位
當immediate標誌位設置為true時,如果exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。
具體的代碼參考請參考參考資料1.
參考資料:
1. https://blog.csdn.net/jiao_fuyou/article/details/21594205
2. https://blog.csdn.net/jiao_fuyou/article/details/21594947
3. https://my.oschina.net/moooofly/blog/142095
最後更新:2017-04-03 12:55:41
上一篇:
qq平台登錄
下一篇:
從一道麵試題分析Linux進程+IO緩衝區機製
在ios下定時任務的小例子
SSI技術詳解
九度題目1528:最長回文子串
tomcat中conf\Catalina\localhost目錄下的J2EE項目META-INF配置文件
厲害了黑科技,動態安全下的防拖庫原來可以這麼簡單!
Objective-C 基礎語法
Android開發20——單個監聽器監聽多個按鈕點擊事件
Red Hat Enterprise Linux 7 64位安裝mysql-5.7.17詳細步驟
IE環境下判斷IE版本的語句...[if lte IE 6]……[endif][if lte IE 7]……[endif]
阿裏推出智能音箱“天貓精靈”,張張口就可以買買買了