Spring 集成__Java SDK_TCP 接入(专业)_消息队列 MQ-阿里云
本文介绍如何在 Spring 框架下用 MQ 收发消息。主要包括三部分内容:普通消息生产者和 Spring 集成,事务消息生产者和 Spring 集成, 消息消费者与 Spring 集成。
生产者与 Spring 集成
1.在 producer.xml 中定义生产者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!--生产者配置信息-->
<props>
<prop key="ProducerId">PID_DEMO</prop> <!--请替换XXX-->
<prop key="AccessKey">XXX</prop>
<prop key="SecretKey">XXX</prop>
</props>
</property>
</bean>
</beans>
2.通过已经与 Spring 集成好的生产者生产消息。
package demo;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProduceWithSpring {
public static void main(String[] args) {
/**
* 生产者Bean配置在producer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中.
*/
ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
Producer producer = (Producer) context.getBean("producer");
//循环发送消息
for (int i = 0; i < 100; i++) {
Message msg = new Message( //
// Message Topic
"TopicTestMQ",
// Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
"TagA",
// Message Body 可以是任何二进制形式的数据, MQ不做任何干预
// 需要Producer与Consumer协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
// 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_100");
// 发送消息,只要不抛异常就是成功
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.println("send success: " + sendResult.getMessageId());
}catch (ONSClientException e) {
System.out.println("发送失败");
}
}
}
}
事务消息生产者与 Spring 集成
有关事务消息的概念请查看发送事务消息。
1.首先需要实现一个 LocalTransactionChecker,如下所示。一个消息生产者只能有一个 LocalTransactionChecker。
package demo;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
public class DemoLocalTransactionChecker implements LocalTransactionChecker {
public TransactionStatus check(Message msg) {
System.out.println("开始回查本地事务状态");
return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus
}
}
2.其次,在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
<bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!--事务消息生产者配置信息-->
<props>
<prop key="ProducerId">PID_DEMO</prop> <!--请替换XXX-->
<prop key="AccessKey">AKDEMO</prop>
<prop key="SecretKey">SKDEMO</prop>
</props>
</property>
<property name="localTransactionChecker" ref="localTransactionChecker"></property>
</bean>
</beans>
3.通过已经与 Spring 集成好的生产者生产事务消息。
package demo;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProduceTransMsgWithSpring {
public static void main(String[] args) {
/**
* 事务消息生产者Bean配置在transactionProducer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中.
* 请结合例子"发送事务消息"
*/
ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("执行本地事务");
return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的TransactionStatus
}
}, null);
}
}
消费者与 Spring 集成
1.创建 MessageListener,如下所示。
package demo;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
public class DemoMessageListener implements MessageListener {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message.getMsgID());
try {
//do something..
return Action.CommitMessage;
}catch (Exception e) {
//消费失败
return Action.ReconsumeLater;
}
}
}
2.在 consumer.xml 中定义消费者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener配置-->
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!--消费者配置信息-->
<props>
<prop key="ConsumerId">CID_DEMO</prop> <!--请替换XXX-->
<prop key="AccessKey">AKDEMO</prop>
<prop key="SecretKey">SKDEMO</prop>
<!--将消费者线程数固定为50个
<prop key="ConsumeThreadNums">50</prop>
-->
</props>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="msgListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="TopicTestMQ"/>
<property name="expression" value="*"/><!--expression即Tag,可以设置成具体的Tag,如 taga||tagb||tagc,也可设置成*。 *仅代表订阅所有Tag,不支持通配-->
</bean>
</key>
</entry>
<!--更多的订阅添加entry节点即可-->
</map>
</property>
</bean>
</beans>
3.运行已经与 Spring 集成好的消费者,如下所示。
package demo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumeWithSpring {
public static void main(String[] args) {
/**
* 消费者Bean配置在consumer.xml中,可通过ApplicationContext获取或者直接注入到其他类(比如具体的Controller)中
*/
ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
System.out.println("Consumer Started");
}
}
最后更新:2016-12-20 11:42:57
上一篇:
集群方式订阅消息__Java SDK_TCP 接入(专业)_消息队列 MQ-阿里云
下一篇:
C/C++ SDK 环境准备__C/C++ SDK_TCP 接入(专业)_消息队列 MQ-阿里云
6.2 分析型数据库权限模型__第六章 用户与权限_使用手册_分析型数据库-阿里云
鉴权Action__OpenAPI RAM鉴权_OpenAPI 2.0_移动推送-阿里云
ODPS代码编辑器__数据开发手册_用户操作指南_大数据开发套件-阿里云
阿里云栖大会结束了,但是这些东西不要错过
OpenID Connect认证__使用手册(开放API)_API 网关-阿里云
Dockerfile 支持的指令__构建管理_用户指南_容器服务-阿里云
开始使用阿里云数据库__快速入门(MySQL)_云数据库 RDS 版-阿里云
认证考试学习方法__大数据认证(ACP级)_如何获得专业技术认证?_专业技术认证-阿里云
消息过滤__特色功能_消息队列 MQ-阿里云
IT培训细分化:云计算和大数据缺口催生“阿里云大学”
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云