975
阿裏雲
Spring 集成__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
本文介紹如何在 Spring 框架下用 MQ 收發消息。主要包括三部分內容:普通消息生產者和 Spring 集成,事務消息生產者和 Spring 集成, 消息消費者與 Spring 集成。
生產者與 Spring 集成
1.在 producer.xml 中定義生產者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="https://www.springframework.org/schema/beans"xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd"><bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown"><property name="properties" > <!--生產者配置信息--><props><prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX--><prop key="AccessKey">XXX</prop><prop key="SecretKey">XXX</prop></props></property></bean></beans>
2.通過已經與 Spring 集成好的生產者生產消息。
package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceWithSpring {public static void main(String[] args) {/*** 生產者Bean配置在producer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.*/ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");Producer producer = (Producer) context.getBean("producer");//循環發送消息for (int i = 0; i < 100; i++) {Message msg = new Message( //// Message Topic"TopicTestMQ",// Message Tag 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾"TagA",// Message Body 可以是任何二進製形式的數據, MQ不做任何幹預// 需要Producer與Consumer協商好一致的序列化和反序列化方式"Hello MQ".getBytes());// 設置代表消息的業務關鍵屬性,請盡可能全局唯一// 以方便您在無法正常收到消息情況下,可通過MQ 控製台查詢消息並補發// 注意:不設置也不會影響消息正常收發msg.setKey("ORDERID_100");// 發送消息,隻要不拋異常就是成功try {SendResult sendResult = producer.send(msg);assert sendResult != null;System.out.println("send success: " + sendResult.getMessageId());}catch (ONSClientException e) {System.out.println("發送失敗");}}}}
事務消息生產者與 Spring 集成
有關事務消息的概念請查看發送事務消息。
1.首先需要實現一個 LocalTransactionChecker,如下所示。一個消息生產者隻能有一個 LocalTransactionChecker。
package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;public class DemoLocalTransactionChecker implements LocalTransactionChecker {public TransactionStatus check(Message msg) {System.out.println("開始回查本地事務狀態");return TransactionStatus.CommitTransaction; //根據本地事務狀態檢查結果返回不同的TransactionStatus}}
2.其次,在 transactionProducer.xml 中定義事務消息生產者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="https://www.springframework.org/schema/beans"xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd"><bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean><bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown"><property name="properties" > <!--事務消息生產者配置信息--><props><prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX--><prop key="AccessKey">AKDEMO</prop><prop key="SecretKey">SKDEMO</prop></props></property><property name="localTransactionChecker" ref="localTransactionChecker"></property></bean></beans>
3.通過已經與 Spring 集成好的生產者生產事務消息。
package demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;import com.aliyun.openservices.ons.api.transaction.TransactionProducer;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceTransMsgWithSpring {public static void main(String[] args) {/*** 事務消息生產者Bean配置在transactionProducer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.* 請結合例子"發送事務消息"*/ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {@Overridepublic TransactionStatus execute(Message msg, Object arg) {System.out.println("執行本地事務");return TransactionStatus.CommitTransaction; //根據本地事務執行結果來返回不同的TransactionStatus}}, null);}}
消費者與 Spring 集成
1.創建 MessageListener,如下所示。
package demo;import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.ConsumeContext;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.MessageListener;public class DemoMessageListener implements MessageListener {public Action consume(Message message, ConsumeContext context) {System.out.println("Receive: " + message.getMsgID());try {//do something..return Action.CommitMessage;}catch (Exception e) {//消費失敗return Action.ReconsumeLater;}}}
2.在 consumer.xml 中定義消費者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="https://www.springframework.org/schema/beans"xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd"><bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener配置--><bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown"><property name="properties" > <!--消費者配置信息--><props><prop key="ConsumerId">CID_DEMO</prop> <!--請替換XXX--><prop key="AccessKey">AKDEMO</prop><prop key="SecretKey">SKDEMO</prop><!--將消費者線程數固定為50個<prop key="ConsumeThreadNums">50</prop>--></props></property><property name="subscriptionTable"><map><entry value-ref="msgListener"><key><bean class="com.aliyun.openservices.ons.api.bean.Subscription"><property name="topic" value="TopicTestMQ"/><property name="expression" value="*"/><!--expression即Tag,可以設置成具體的Tag,如 taga||tagb||tagc,也可設置成*。 *僅代表訂閱所有Tag,不支持通配--></bean></key></entry><!--更多的訂閱添加entry節點即可--></map></property></bean></beans>
3.運行已經與 Spring 集成好的消費者,如下所示。
package demo;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ConsumeWithSpring {public static void main(String[] args) {/*** 消費者Bean配置在consumer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中*/ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");System.out.println("Consumer Started");}}
最後更新:2016-12-20 11:42:57
上一篇:
集群方式訂閱消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
下一篇:
C/C++ SDK 環境準備__C/C++ SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
6.2 分析型數據庫權限模型__第六章 用戶與權限_使用手冊_分析型數據庫-阿裏雲
鑒權Action__OpenAPI RAM鑒權_OpenAPI 2.0_移動推送-阿裏雲
ODPS代碼編輯器__數據開發手冊_用戶操作指南_大數據開發套件-阿裏雲
阿裏雲棲大會結束了,但是這些東西不要錯過
OpenID Connect認證__使用手冊(開放API)_API 網關-阿裏雲
Dockerfile 支持的指令__構建管理_用戶指南_容器服務-阿裏雲
開始使用阿裏雲數據庫__快速入門(MySQL)_雲數據庫 RDS 版-阿裏雲
認證考試學習方法__大數據認證(ACP級)_如何獲得專業技術認證?_專業技術認證-阿裏雲
消息過濾__特色功能_消息隊列 MQ-阿裏雲
IT培訓細分化:雲計算和大數據缺口催生“阿裏雲大學”
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲