212
阿里云
消息发布与订阅__最佳实践_云数据库 Redis 版-阿里云
消息的发布与订阅
场景介绍
ApsaraDB for Redis 也提供了与 Redis 相同的消息发布(pub)与订阅(sub)功能。即一个 client 发布消息,其他多个 client 订阅消息。
需要注意的是,ApsaraDB for Redis 发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外,消息发布者(即 publish 客户端)无需独占与服务器端的连接,您可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如 List 操作等)。但是,消息订阅者(即 subscribe 客户端)需要独占与服务器端的连接,即进行 subscribe 期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。
代码示例
消息发布者 (即 publish client)
package message.kvstore.aliyun.com;import redis.clients.jedis.Jedis;public class KVStorePubClient {private Jedis jedis;//public KVStorePubClient(String host,int port, String password){jedis = new Jedis(host,port);//KVStore的实例ID及密码String authString = jedis.auth(password);//passwordif (!authString.equals("OK")){System.err.println("AUTH Failed: " + authString);return;}}public void pub(String channel,String message){System.out.println(" >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message);jedis.publish(channel, message);}public void close(String channel){System.out.println(" >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit");//消息发布者结束发送,即发送一个“quit”消息;jedis.publish(channel, "quit");}}
消息订阅者 (即 subscribe client)
package message.kvstore.aliyun.com;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;public class KVStoreSubClient extends Thread{private Jedis jedis;private String channel;private JedisPubSub listener;public KVStoreSubClient(String host,int port, String password){jedis = new Jedis(host,port);//ApsaraDB for Redis的实例ID及密码String authString = jedis.auth(password);//kvstore_instance_id:passwordif (!authString.equals("OK")){System.err.println("AUTH Failed: " + authString);return;}}public void setChannelAndListener(JedisPubSub listener,String channel){this.listener=listener;this.channel=channel;}private void subscribe(){if(listener==null || channel==null){System.err.println("Error:SubClient> listener or channel is null");}System.out.println(" >>> 订阅(SUBSCRIBE) > Channel:"+channel);System.out.println();//接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅jedis.subscribe(listener, channel);}public void unsubscribe(String channel){System.out.println(" >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);System.out.println();listener.unsubscribe(channel);}@Overridepublic void run() {try{System.out.println();System.out.println("----------订阅消息SUBSCRIBE 开始-------");subscribe();System.out.println("----------订阅消息SUBSCRIBE 结束-------");System.out.println();}catch(Exception e){e.printStackTrace();}}}
消息监听者
package message.kvstore.aliyun.com;import redis.clients.jedis.JedisPubSub;public class KVStoreMessageListener extends JedisPubSub{@Overridepublic void onMessage(String channel, String message) {System.out.println(" <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );System.out.println();//当接收到的message为quit时,取消订阅(被动方式)if(message.equalsIgnoreCase("quit")){this.unsubscribe(channel);}}@Overridepublic void onPMessage(String pattern, String channel, String message) {// TODO Auto-generated method stub}@Overridepublic void onSubscribe(String channel, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onUnsubscribe(String channel, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onPUnsubscribe(String pattern, int subscribedChannels) {// TODO Auto-generated method stub}@Overridepublic void onPSubscribe(String pattern, int subscribedChannels) {// TODO Auto-generated method stub}}
示例主程序
package message.kvstore.aliyun.com;import java.util.UUID;import redis.clients.jedis.JedisPubSub;public class KVStorePubSubTest {//ApsaraDB for Redis的连接信息,从控制台可以获得static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";static final int port = 6379;static final String password="xxxxxxxxxx:yyyyyyyy";//kvstore_instance_id:passwordpublic static void main(String[] args) throws Exception{KVStorePubClient pubClient = new KVStorePubClient(host, port,password);final String channel = "KVStore频道-A";//消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收pubClient.pub(channel, "Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)");//消息接收者KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);JedisPubSub listener = new KVStoreMessageListener();subClient.setChannelAndListener(listener, channel);//消息接收者开始订阅subClient.start();//消息发送者继续发消息for (int i = 0; i < 5; i++) {String message=UUID.randomUUID().toString();pubClient.pub(channel, message);Thread.sleep(1000);}//消息接收者主动取消订阅subClient.unsubscribe(channel);Thread.sleep(1000);pubClient.pub(channel, "Aliyun消息2:(此时订阅取消,所以此消息不会被接收)");//消息发布者结束发送,即发送一个“quit”消息;//此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。pubClient.close(channel);}}
运行结果
在输入了正确的 ApsaraDB for Redis 实例访问地址和密码之后,运行以上 Java 程序,输出结果如下。
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)----------订阅消息SUBSCRIBE 开始------->>> 订阅(SUBSCRIBE) > Channel:KVStore频道-A>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:ed5924a9-016b-469b-8203-7db63d06f812<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef>>> 取消订阅(UNSUBSCRIBE) > Channel:KVStore频道-A----------订阅消息SUBSCRIBE 结束------->>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息2:(此时订阅取消,所以此消息不会被接收)>>> 发布(PUBLISH)结束 > Channel:KVStore频道-A > Message:quit
以上示例中仅演示了一个发布者与一个订阅者的情况,实际上发布者与订阅者都可以为多个,发送消息的频道(channel)也可以是多个,对以上代码稍作修改即可。
最后更新:2016-12-16 17:21:38
上一篇:
商品相关性分析__最佳实践_云数据库 Redis 版-阿里云
下一篇:
管道传输__最佳实践_云数据库 Redis 版-阿里云
云监控【主题模型】__开发人员指南_消息服务-阿里云
数据迁移规格说明__规格说明_购买指南_数据传输-阿里云
SLA__服务条款_用户指南_专有网络 VPC-阿里云
RemoveBackendServers__BackendServer相关API_API 参考_负载均衡-阿里云
Notification操作__主题接口规范_API使用手册_消息服务-阿里云
宝宝树-电商__最佳实践_DDoS 高防IP-阿里云
五步玩转事件通知__事件通知使用帮助_控制台使用帮助_消息服务-阿里云
OSS怎么更改Object元数据信息__数据操作常见问题_产品使用问题_对象存储 OSS-阿里云
业务风控Android/iOS使用说明__使用手册_数据风控-阿里云
配置config子句__搜索子句介绍_API参考手册_开放搜索-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云