Beanstalkd隊列 概述
本文對beanstalkd的介紹簡短精煉,適合初次接觸消息隊列並想使用beanstalk的人。在我看過的一些介紹資料裏,這篇文章介紹地最為精煉,本文大多內容修改自它:P。我也是第一次接觸消息隊列,使用beanstalkd的原因是組裏比較資深的經理推薦,說道beanstalkd在某些大公司內都會使用,很輕量級,性能很高(是純C寫的)。
基本特性
同RabbitMQ,ZeroMQ相比,Beanstalkd是一個更加輕量級的消息隊列,也可以理解為是一個工作隊列,本質上是基於多個tube的workqueue,queue裏存放的是job,每個job帶一個遞增的唯一id,以及消息體(byte[])。
Beanstalkd的job狀態多樣化,支持任務優先級 (priority), 延時 (delay), 超時重發 (time-to-run) 和預留 (buried), 能夠很好的支持分布式的後台任務和定時任務處理。
它的內部實現采用 libevent, 服務器-客戶端之間用類似 memcached 的輕量級通訊協議,因此有很高的性能。
beanstalkd 提供了 binlog 機製,啟動的時候可以打開此功能。當重啟 beanstalkd 時,當前任務狀態能夠從紀錄的本地 binlog 中恢複。雖然沒有單點的HA,但是帶了容錯。
兩個重要概念
tube 類似於消息主題 (topic),在一個 Beanstalkd 中可以支持多個管道, 每個管道都有自己的發布者 (producer) 和消費者 (consumer),管道之間可以做到互不影響。
job 真正的消息體,帶有唯一id(從1開始遞增)和一個body(byte流),有豐富的可控狀態,設計地非常好用。
下圖詳細闡述了job狀態的流轉流程,理解清楚下圖對於使用beanstalkd有重要意義。
READY - 需要立即處理的任務,當延時 (DELAYED) 任務到期後會自動成為當前任務
DELAYED - 延遲執行的任務, 當消費者處理任務後, 可以用將消息再次放回 DELAYED 隊列延遲執行
RESERVED - 已經被消費者獲取, 正在執行的任務(將不會被別的消費者拿到)。Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;
BURIED - 保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回隊列;
DELETED - 消息被徹底刪除。Beanstalkd 不再維持這些消息。
以上這些狀態可以通過實際啟動beanstalkd後,用telnet連接並執行一些put,reserve操作或者直接編程體驗的方式得到更好的理解。
其他概念
任務優先級 (priority)
任務 (job) 可以有 0~2^32 個優先級, 0 代表最高優先級。 beanstalkd 采用最大最小堆 (Min-max heap) 處理任務優先級排序, 任何時刻調用 reserve 命令的消費者總是能拿到當前優先級最高的任務, 時間複雜度為 O(logn).
延時任務 (delay)
有兩種方式可以延時執行任務 (job): 生產者發布任務時指定延時;或者當任務處理完畢後, 消費者再次將任務放入隊列延時執行 (RELEASE with <delay>)。這種機製可以實現分布式的 java.util.Timer,這種分布式定時任務的優勢是:如果某個消費者節點故障,任務超時重發 (time-to-run) 能夠保證任務轉移到另外的節點執行。
任務超時重發 (time-to-run)
Beanstalkd 把任務返回給消費者以後:消費者必須在預設的 TTR (time-to-run) 時間內發送 delete / release/ bury 改變任務狀態;否則 Beanstalkd 會認為消息處理失敗,然後把任務交給另外的消費者節點執行。如果消費者預計在 TTR (time-to-run) 時間內無法完成任務, 也可以發送 touch 命令, 它的作用是讓 Beanstalkd 從係統時間重新計算 TTR (time-to-run).
任務預留 (buried)
如果任務因為某些原因無法執行, 消費者可以把任務置為 buried 狀態讓 Beanstalkd 保留這些任務。管理員可以通過 peek buried 命令查詢被保留的任務,並且進行人工幹預。簡單的, kick <n> 能夠一次性把 n 條被保留的任務踢回隊列。
Beanstalkd 協議
Beanstalkd 采用類 memcached 協議, 客戶端通過文本命令與服務器交互。這些命令可以簡單的分成三組:
生產類 - use <tube> / put <priority> <delay> <ttr> [bytes]:
生產者用 use 選擇一個管道 (tube), 然後用 put 命令向管道發布任務 (job).
消費類 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>
消費者用 watch 選擇多個管道 (tube), 然後用 reserve 命令獲取待執行的任務,這個命令是阻塞的。客戶端直到有任務可執行才返回。當任務處理完畢後, 消費者可以徹底刪除任務 (DELETE), 釋放任務讓別人處理 (RELEASE), 或者保留 (BURY) 任務。
維護類 - peek job / peek delayed / peek ready / peek buried / kick <n>
用於維護管道內的任務狀態, 在不改變任務狀態的條件下獲取任務。可以用消費類命令改變這些任務的狀態。
被保留 (buried) 的任務可以用 kick 命令 "踢" 回隊列。
java客戶端
public class BeanstalkExample { protected static Log log = LogFactory.getLog(BeanstalkExample.class); public static void main(String[] args) { try { clientExample(); //pooledExample(); } catch (BeanstalkException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * Example for using an unpooled client * * @throws BeanstalkException */ public static void clientExample() throws BeanstalkException { BeanstalkClient client = new BeanstalkClient("ip", 8300, "example"); //client.put(1l, 0, 5000, "this is some data".getBytes()); log.info(client.tubeStats()); //while(client.reserve(60) != null) { BeanstalkJob job = client.reserve(60); log.info("Get job: " + job.getId()); client.deleteJob(job); //} client.close(); // closes the connection } public static void pooledExample() throws BeanstalkException { BeanstalkPool pool = new BeanstalkPool("jx-crm-flare00.jx.baidu.com", 8300, 30, // poolsize "example" // tube to use ); BeanstalkClient client = pool.getClient(); client.put(1l, 0, 5000, "this is some data".getBytes()); BeanstalkJob job = client.reserve(60); client.deleteJob(job); client.close(); // returns the connection to the pool } }
最後更新:2017-04-03 12:53:59