212
阿裏雲
消息發布與訂閱__最佳實踐_雲數據庫 Redis 版-阿裏雲
消息的發布與訂閱
場景介紹
ApsaraDB for Redis 也提供了與 Redis 相同的消息發布(pub)與訂閱(sub)功能。即一個 client 發布消息,其他多個 client 訂閱消息。
需要注意的是,ApsaraDB for Redis 發布的消息是“非持久”的,即消息發布者隻負責發送消息,而不管消息是否有接收方,也不會保存之前發送的消息,即發布的消息“即發即失”;消息訂閱者也隻能得到訂閱之後的消息,頻道(channel)中此前的消息將無從獲得。
此外,消息發布者(即 publish 客戶端)無需獨占與服務器端的連接,您可以在發布消息的同時,使用同一個客戶端連接進行其他操作(例如 List 操作等)。但是,消息訂閱者(即 subscribe 客戶端)需要獨占與服務器端的連接,即進行 subscribe 期間,該客戶端無法執行其他操作,而是以阻塞的方式等待頻道(channel)中的消息;因此消息訂閱者需要使用單獨的服務器連接,或者需要在單獨的線程中使用(參見如下示例)。
代碼示例
消息發布者 (即 publish client)
package message.kvstore.aliyun.com;import redis.clients.jedis.Jedis;public class KVStorePubClient {private Jedis jedis;//public KVStorePubClient(String host,int port, String password){jedis = new Jedis(host,port);//KVStore的實例ID及密碼String authString = jedis.auth(password);//passwordif (!authString.equals("OK")){System.err.println("AUTH Failed: " + authString);return;}}public void pub(String channel,String message){System.out.println(" >>> 發布(PUBLISH) > Channel:"+channel+" > 發送出的Message:"+message);jedis.publish(channel, message);}public void close(String channel){System.out.println(" >>> 發布(PUBLISH)結束 > Channel:"+channel+" > Message:quit");//消息發布者結束發送,即發送一個“quit”消息;jedis.publish(channel, "quit");}}
消息訂閱者 (即 subscribe client)
package message.kvstore.aliyun.com;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;public class KVStoreSubClient extends Thread{private Jedis jedis;private String channel;private JedisPubSub listener;public KVStoreSubClient(String host,int port, String password){jedis = new Jedis(host,port);//ApsaraDB for Redis的實例ID及密碼String authString = jedis.auth(password);//kvstore_instance_id:passwordif (!authString.equals("OK")){System.err.println("AUTH Failed: " + authString);return;}}public void setChannelAndListener(JedisPubSub listener,String channel){this.listener=listener;this.channel=channel;}private void subscribe(){if(listener==null || channel==null){System.err.println("Error:SubClient> listener or channel is null");}System.out.println(" >>> 訂閱(SUBSCRIBE) > Channel:"+channel);System.out.println();//接收者在偵聽訂閱的消息時,將會阻塞進程,直至接收到quit消息(被動方式),或主動取消訂閱jedis.subscribe(listener, channel);}public void unsubscribe(String channel){System.out.println(" >>> 取消訂閱(UNSUBSCRIBE) > Channel:"+channel);System.out.println();listener.unsubscribe(channel);}@Overridepublic void run() {try{System.out.println();System.out.println("----------訂閱消息SUBSCRIBE 開始-------");subscribe();System.out.println("----------訂閱消息SUBSCRIBE 結束-------");System.out.println();}catch(Exception e){e.printStackTrace();}}}
消息監聽者
package message.kvstore.aliyun.com;import redis.clients.jedis.JedisPubSub;public class KVStoreMessageListener extends JedisPubSub{@Overridepublic void onMessage(String channel, String message) {System.out.println(" <<< 訂閱(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );System.out.println();//當接收到的message為quit時,取消訂閱(被動方式)if(message.equalsIgnoreCase("quit")){this.unsubscribe(channel);}}@Overridepublic void onPMessage(String pattern, String channel, String message) {// TODO Auto-generated method stub}@Overridepublic void onSubscribe(String channel, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onUnsubscribe(String channel, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onPUnsubscribe(String pattern, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onPSubscribe(String pattern, int subscribedChannels) {// TODO Auto-generated method stub}}
示例主程序
package message.kvstore.aliyun.com;import java.util.UUID;import redis.clients.jedis.JedisPubSub;public class KVStorePubSubTest {//ApsaraDB for Redis的連接信息,從控製台可以獲得static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";static final int port = 6379;static final String password="xxxxxxxxxx:yyyyyyyy";//kvstore_instance_id:passwordpublic static void main(String[] args) throws Exception{KVStorePubClient pubClient = new KVStorePubClient(host, port,password);final String channel = "KVStore頻道-A";//消息發送者開始發消息,此時還無人訂閱,所以此消息不會被接收pubClient.pub(channel, "Aliyun消息1:(此時還無人訂閱,所以此消息不會被接收)");//消息接收者KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);JedisPubSub listener = new KVStoreMessageListener();subClient.setChannelAndListener(listener, channel);//消息接收者開始訂閱subClient.start();//消息發送者繼續發消息for (int i = 0; i < 5; i++) {String message=UUID.randomUUID().toString();pubClient.pub(channel, message);Thread.sleep(1000);}//消息接收者主動取消訂閱subClient.unsubscribe(channel);Thread.sleep(1000);pubClient.pub(channel, "Aliyun消息2:(此時訂閱取消,所以此消息不會被接收)");//消息發布者結束發送,即發送一個“quit”消息;//此時如果有其他的消息接收者,那麼在listener.onMessage()中接收到“quit”時,將執行“unsubscribe”操作。pubClient.close(channel);}}
運行結果
在輸入了正確的 ApsaraDB for Redis 實例訪問地址和密碼之後,運行以上 Java 程序,輸出結果如下。
>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:Aliyun消息1:(此時還無人訂閱,所以此消息不會被接收)----------訂閱消息SUBSCRIBE 開始------->>> 訂閱(SUBSCRIBE) > Channel:KVStore頻道-A>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:ed5924a9-016b-469b-8203-7db63d06f812<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b>>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef<<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef>>> 取消訂閱(UNSUBSCRIBE) > Channel:KVStore頻道-A----------訂閱消息SUBSCRIBE 結束------->>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:Aliyun消息2:(此時訂閱取消,所以此消息不會被接收)>>> 發布(PUBLISH)結束 > Channel:KVStore頻道-A > Message:quit
以上示例中僅演示了一個發布者與一個訂閱者的情況,實際上發布者與訂閱者都可以為多個,發送消息的頻道(channel)也可以是多個,對以上代碼稍作修改即可。
最後更新:2016-12-16 17:21:38
上一篇:
商品相關性分析__最佳實踐_雲數據庫 Redis 版-阿裏雲
下一篇:
管道傳輸__最佳實踐_雲數據庫 Redis 版-阿裏雲
雲監控【主題模型】__開發人員指南_消息服務-阿裏雲
數據遷移規格說明__規格說明_購買指南_數據傳輸-阿裏雲
SLA__服務條款_用戶指南_專有網絡 VPC-阿裏雲
RemoveBackendServers__BackendServer相關API_API 參考_負載均衡-阿裏雲
Notification操作__主題接口規範_API使用手冊_消息服務-阿裏雲
寶寶樹-電商__最佳實踐_DDoS 高防IP-阿裏雲
五步玩轉事件通知__事件通知使用幫助_控製台使用幫助_消息服務-阿裏雲
OSS怎麼更改Object元數據信息__數據操作常見問題_產品使用問題_對象存儲 OSS-阿裏雲
業務風控Android/iOS使用說明__使用手冊_數據風控-阿裏雲
配置config子句__搜索子句介紹_API參考手冊_開放搜索-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲