閱讀212 返回首頁    go 魔獸


消息發布與訂閱__最佳實踐_雲數據庫 Redis 版-阿裏雲

消息的發布與訂閱

場景介紹

ApsaraDB for Redis 也提供了與 Redis 相同的消息發布(pub)與訂閱(sub)功能。即一個 client 發布消息,其他多個 client 訂閱消息。

需要注意的是,ApsaraDB for Redis 發布的消息是“非持久”的,即消息發布者隻負責發送消息,而不管消息是否有接收方,也不會保存之前發送的消息,即發布的消息“即發即失”;消息訂閱者也隻能得到訂閱之後的消息,頻道(channel)中此前的消息將無從獲得。

此外,消息發布者(即 publish 客戶端)無需獨占與服務器端的連接,您可以在發布消息的同時,使用同一個客戶端連接進行其他操作(例如 List 操作等)。但是,消息訂閱者(即 subscribe 客戶端)需要獨占與服務器端的連接,即進行 subscribe 期間,該客戶端無法執行其他操作,而是以阻塞的方式等待頻道(channel)中的消息;因此消息訂閱者需要使用單獨的服務器連接,或者需要在單獨的線程中使用(參見如下示例)。

代碼示例

消息發布者 (即 publish client)

  1. package message.kvstore.aliyun.com;
  2. import redis.clients.jedis.Jedis;
  3. public class KVStorePubClient {
  4. private Jedis jedis;//
  5. public KVStorePubClient(String host,int port, String password){
  6. jedis = new Jedis(host,port);
  7. //KVStore的實例ID及密碼
  8. String authString = jedis.auth(password);//password
  9. if (!authString.equals("OK"))
  10. {
  11. System.err.println("AUTH Failed: " + authString);
  12. return;
  13. }
  14. }
  15. public void pub(String channel,String message){
  16. System.out.println(" >>> 發布(PUBLISH) > Channel:"+channel+" > 發送出的Message:"+message);
  17. jedis.publish(channel, message);
  18. }
  19. public void close(String channel){
  20. System.out.println(" >>> 發布(PUBLISH)結束 > Channel:"+channel+" > Message:quit");
  21. //消息發布者結束發送,即發送一個“quit”消息;
  22. jedis.publish(channel, "quit");
  23. }
  24. }

消息訂閱者 (即 subscribe client)

  1. package message.kvstore.aliyun.com;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPubSub;
  4. public class KVStoreSubClient extends Thread{
  5. private Jedis jedis;
  6. private String channel;
  7. private JedisPubSub listener;
  8. public KVStoreSubClient(String host,int port, String password){
  9. jedis = new Jedis(host,port);
  10. //ApsaraDB for Redis的實例ID及密碼
  11. String authString = jedis.auth(password);//kvstore_instance_id:password
  12. if (!authString.equals("OK"))
  13. {
  14. System.err.println("AUTH Failed: " + authString);
  15. return;
  16. }
  17. }
  18. public void setChannelAndListener(JedisPubSub listener,String channel){
  19. this.listener=listener;
  20. this.channel=channel;
  21. }
  22. private void subscribe(){
  23. if(listener==null || channel==null){
  24. System.err.println("Error:SubClient> listener or channel is null");
  25. }
  26. System.out.println(" >>> 訂閱(SUBSCRIBE) > Channel:"+channel);
  27. System.out.println();
  28. //接收者在偵聽訂閱的消息時,將會阻塞進程,直至接收到quit消息(被動方式),或主動取消訂閱
  29. jedis.subscribe(listener, channel);
  30. }
  31. public void unsubscribe(String channel){
  32. System.out.println(" >>> 取消訂閱(UNSUBSCRIBE) > Channel:"+channel);
  33. System.out.println();
  34. listener.unsubscribe(channel);
  35. }
  36. @Override
  37. public void run() {
  38. try{
  39. System.out.println();
  40. System.out.println("----------訂閱消息SUBSCRIBE 開始-------");
  41. subscribe();
  42. System.out.println("----------訂閱消息SUBSCRIBE 結束-------");
  43. System.out.println();
  44. }catch(Exception e){
  45. e.printStackTrace();
  46. }
  47. }
  48. }

消息監聽者

  1. package message.kvstore.aliyun.com;
  2. import redis.clients.jedis.JedisPubSub;
  3. public class KVStoreMessageListener extends JedisPubSub{
  4. @Override
  5. public void onMessage(String channel, String message) {
  6. System.out.println(" <<< 訂閱(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
  7. System.out.println();
  8. //當接收到的message為quit時,取消訂閱(被動方式)
  9. if(message.equalsIgnoreCase("quit")){
  10. this.unsubscribe(channel);
  11. }
  12. }
  13. @Override
  14. public void onPMessage(String pattern, String channel, String message) {
  15. // TODO Auto-generated method stub
  16. }
  17. @Override
  18. public void onSubscribe(String channel, int subscribedChannels) {
  19. // TODO Auto-generated method stub
  20. }
  21. @Override
  22. public void onUnsubscribe(String channel, int subscribedChannels) {
  23. // TODO Auto-generated method stub
  24. }
  25. @Override
  26. public void onPUnsubscribe(String pattern, int subscribedChannels) {
  27. // TODO Auto-generated method stub
  28. }
  29. @Override
  30. public void onPSubscribe(String pattern, int subscribedChannels) {
  31. // TODO Auto-generated method stub
  32. }
  33. }

示例主程序

  1. package message.kvstore.aliyun.com;
  2. import java.util.UUID;
  3. import redis.clients.jedis.JedisPubSub;
  4. public class KVStorePubSubTest {
  5. //ApsaraDB for Redis的連接信息,從控製台可以獲得
  6. static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
  7. static final int port = 6379;
  8. static final String password="xxxxxxxxxx:yyyyyyyy";//kvstore_instance_id:password
  9. public static void main(String[] args) throws Exception{
  10. KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
  11. final String channel = "KVStore頻道-A";
  12. //消息發送者開始發消息,此時還無人訂閱,所以此消息不會被接收
  13. pubClient.pub(channel, "Aliyun消息1:(此時還無人訂閱,所以此消息不會被接收)");
  14. //消息接收者
  15. KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
  16. JedisPubSub listener = new KVStoreMessageListener();
  17. subClient.setChannelAndListener(listener, channel);
  18. //消息接收者開始訂閱
  19. subClient.start();
  20. //消息發送者繼續發消息
  21. for (int i = 0; i < 5; i++) {
  22. String message=UUID.randomUUID().toString();
  23. pubClient.pub(channel, message);
  24. Thread.sleep(1000);
  25. }
  26. //消息接收者主動取消訂閱
  27. subClient.unsubscribe(channel);
  28. Thread.sleep(1000);
  29. pubClient.pub(channel, "Aliyun消息2:(此時訂閱取消,所以此消息不會被接收)");
  30. //消息發布者結束發送,即發送一個“quit”消息;
  31. //此時如果有其他的消息接收者,那麼在listener.onMessage()中接收到“quit”時,將執行“unsubscribe”操作。
  32. pubClient.close(channel);
  33. }
  34. }

運行結果

在輸入了正確的 ApsaraDB for Redis 實例訪問地址和密碼之後,運行以上 Java 程序,輸出結果如下。

  1. >>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:Aliyun消息1:(此時還無人訂閱,所以此消息不會被接收)
  2. ----------訂閱消息SUBSCRIBE 開始-------
  3. >>> 訂閱(SUBSCRIBE) > Channel:KVStore頻道-A
  4. >>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  5. <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  6. >>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
  7. <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
  8. >>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
  9. <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
  10. >>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
  11. <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
  12. >>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  13. <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  14. >>> 取消訂閱(UNSUBSCRIBE) > Channel:KVStore頻道-A
  15. ----------訂閱消息SUBSCRIBE 結束-------
  16. >>> 發布(PUBLISH) > Channel:KVStore頻道-A > 發送出的Message:Aliyun消息2:(此時訂閱取消,所以此消息不會被接收)
  17. >>> 發布(PUBLISH)結束 > Channel:KVStore頻道-A > Message:quit

以上示例中僅演示了一個發布者與一個訂閱者的情況,實際上發布者與訂閱者都可以為多個,發送消息的頻道(channel)也可以是多個,對以上代碼稍作修改即可。

最後更新:2016-12-16 17:21:38

  上一篇:go 商品相關性分析__最佳實踐_雲數據庫 Redis 版-阿裏雲
  下一篇:go 管道傳輸__最佳實踐_雲數據庫 Redis 版-阿裏雲