Spring 集成__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
本文介紹如何在 Spring 框架下用 MQ 收發消息。主要包括三部分內容:普通消息生產者和 Spring 集成,事務消息生產者和 Spring 集成, 消息消費者與 Spring 集成。
生產者與 Spring 集成
1.在 producer.xml 中定義生產者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!--生產者配置信息-->
<props>
<prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX-->
<prop key="AccessKey">XXX</prop>
<prop key="SecretKey">XXX</prop>
</props>
</property>
</bean>
</beans>
2.通過已經與 Spring 集成好的生產者生產消息。
package demo;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProduceWithSpring {
public static void main(String[] args) {
/**
* 生產者Bean配置在producer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.
*/
ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
Producer producer = (Producer) context.getBean("producer");
//循環發送消息
for (int i = 0; i < 100; i++) {
Message msg = new Message( //
// Message Topic
"TopicTestMQ",
// Message Tag 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
"TagA",
// Message Body 可以是任何二進製形式的數據, MQ不做任何幹預
// 需要Producer與Consumer協商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一
// 以方便您在無法正常收到消息情況下,可通過MQ 控製台查詢消息並補發
// 注意:不設置也不會影響消息正常收發
msg.setKey("ORDERID_100");
// 發送消息,隻要不拋異常就是成功
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.println("send success: " + sendResult.getMessageId());
}catch (ONSClientException e) {
System.out.println("發送失敗");
}
}
}
}
事務消息生產者與 Spring 集成
有關事務消息的概念請查看發送事務消息。
1.首先需要實現一個 LocalTransactionChecker,如下所示。一個消息生產者隻能有一個 LocalTransactionChecker。
package demo;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
public class DemoLocalTransactionChecker implements LocalTransactionChecker {
public TransactionStatus check(Message msg) {
System.out.println("開始回查本地事務狀態");
return TransactionStatus.CommitTransaction; //根據本地事務狀態檢查結果返回不同的TransactionStatus
}
}
2.其次,在 transactionProducer.xml 中定義事務消息生產者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
<bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!--事務消息生產者配置信息-->
<props>
<prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX-->
<prop key="AccessKey">AKDEMO</prop>
<prop key="SecretKey">SKDEMO</prop>
</props>
</property>
<property name="localTransactionChecker" ref="localTransactionChecker"></property>
</bean>
</beans>
3.通過已經與 Spring 集成好的生產者生產事務消息。
package demo;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProduceTransMsgWithSpring {
public static void main(String[] args) {
/**
* 事務消息生產者Bean配置在transactionProducer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.
* 請結合例子"發送事務消息"
*/
ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("執行本地事務");
return TransactionStatus.CommitTransaction; //根據本地事務執行結果來返回不同的TransactionStatus
}
}, null);
}
}
消費者與 Spring 集成
1.創建 MessageListener,如下所示。
package demo;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
public class DemoMessageListener implements MessageListener {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message.getMsgID());
try {
//do something..
return Action.CommitMessage;
}catch (Exception e) {
//消費失敗
return Action.ReconsumeLater;
}
}
}
2.在 consumer.xml 中定義消費者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="https://www.springframework.org/schema/beans"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener配置-->
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
<property name="properties" > <!--消費者配置信息-->
<props>
<prop key="ConsumerId">CID_DEMO</prop> <!--請替換XXX-->
<prop key="AccessKey">AKDEMO</prop>
<prop key="SecretKey">SKDEMO</prop>
<!--將消費者線程數固定為50個
<prop key="ConsumeThreadNums">50</prop>
-->
</props>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="msgListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="TopicTestMQ"/>
<property name="expression" value="*"/><!--expression即Tag,可以設置成具體的Tag,如 taga||tagb||tagc,也可設置成*。 *僅代表訂閱所有Tag,不支持通配-->
</bean>
</key>
</entry>
<!--更多的訂閱添加entry節點即可-->
</map>
</property>
</bean>
</beans>
3.運行已經與 Spring 集成好的消費者,如下所示。
package demo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumeWithSpring {
public static void main(String[] args) {
/**
* 消費者Bean配置在consumer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中
*/
ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
System.out.println("Consumer Started");
}
}
最後更新:2016-12-20 11:42:57
上一篇:
集群方式訂閱消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
下一篇:
C/C++ SDK 環境準備__C/C++ SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
6.2 分析型數據庫權限模型__第六章 用戶與權限_使用手冊_分析型數據庫-阿裏雲
鑒權Action__OpenAPI RAM鑒權_OpenAPI 2.0_移動推送-阿裏雲
ODPS代碼編輯器__數據開發手冊_用戶操作指南_大數據開發套件-阿裏雲
阿裏雲棲大會結束了,但是這些東西不要錯過
OpenID Connect認證__使用手冊(開放API)_API 網關-阿裏雲
Dockerfile 支持的指令__構建管理_用戶指南_容器服務-阿裏雲
開始使用阿裏雲數據庫__快速入門(MySQL)_雲數據庫 RDS 版-阿裏雲
認證考試學習方法__大數據認證(ACP級)_如何獲得專業技術認證?_專業技術認證-阿裏雲
消息過濾__特色功能_消息隊列 MQ-阿裏雲
IT培訓細分化:雲計算和大數據缺口催生“阿裏雲大學”
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲