閱讀975 返回首頁    go 阿裏雲


Spring 集成__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲

本文介紹如何在 Spring 框架下用 MQ 收發消息。主要包括三部分內容:普通消息生產者和 Spring 集成,事務消息生產者和 Spring 集成, 消息消費者與 Spring 集成。

生產者與 Spring 集成

1.在 producer.xml 中定義生產者 Bean 等信息。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="https://www.springframework.org/schema/beans"
  3. xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
  5. <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
  6. <property name="properties" > <!--生產者配置信息-->
  7. <props>
  8. <prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX-->
  9. <prop key="AccessKey">XXX</prop>
  10. <prop key="SecretKey">XXX</prop>
  11. </props>
  12. </property>
  13. </bean>
  14. </beans>

2.通過已經與 Spring 集成好的生產者生產消息。

  1. package demo;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.Producer;
  4. import com.aliyun.openservices.ons.api.SendResult;
  5. import com.aliyun.openservices.ons.api.exception.ONSClientException;
  6. import org.springframework.context.ApplicationContext;
  7. import org.springframework.context.support.ClassPathXmlApplicationContext;
  8. public class ProduceWithSpring {
  9. public static void main(String[] args) {
  10. /**
  11. * 生產者Bean配置在producer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.
  12. */
  13. ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
  14. Producer producer = (Producer) context.getBean("producer");
  15. //循環發送消息
  16. for (int i = 0; i < 100; i++) {
  17. Message msg = new Message( //
  18. // Message Topic
  19. "TopicTestMQ",
  20. // Message Tag 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
  21. "TagA",
  22. // Message Body 可以是任何二進製形式的數據, MQ不做任何幹預
  23. // 需要Producer與Consumer協商好一致的序列化和反序列化方式
  24. "Hello MQ".getBytes());
  25. // 設置代表消息的業務關鍵屬性,請盡可能全局唯一
  26. // 以方便您在無法正常收到消息情況下,可通過MQ 控製台查詢消息並補發
  27. // 注意:不設置也不會影響消息正常收發
  28. msg.setKey("ORDERID_100");
  29. // 發送消息,隻要不拋異常就是成功
  30. try {
  31. SendResult sendResult = producer.send(msg);
  32. assert sendResult != null;
  33. System.out.println("send success: " + sendResult.getMessageId());
  34. }catch (ONSClientException e) {
  35. System.out.println("發送失敗");
  36. }
  37. }
  38. }
  39. }

事務消息生產者與 Spring 集成

有關事務消息的概念請查看發送事務消息

1.首先需要實現一個 LocalTransactionChecker,如下所示。一個消息生產者隻能有一個 LocalTransactionChecker。

  1. package demo;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
  4. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
  5. public class DemoLocalTransactionChecker implements LocalTransactionChecker {
  6. public TransactionStatus check(Message msg) {
  7. System.out.println("開始回查本地事務狀態");
  8. return TransactionStatus.CommitTransaction; //根據本地事務狀態檢查結果返回不同的TransactionStatus
  9. }
  10. }

2.其次,在 transactionProducer.xml 中定義事務消息生產者 Bean 等信息。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="https://www.springframework.org/schema/beans"
  3. xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
  5. <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
  6. <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
  7. <property name="properties" > <!--事務消息生產者配置信息-->
  8. <props>
  9. <prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX-->
  10. <prop key="AccessKey">AKDEMO</prop>
  11. <prop key="SecretKey">SKDEMO</prop>
  12. </props>
  13. </property>
  14. <property name="localTransactionChecker" ref="localTransactionChecker"></property>
  15. </bean>
  16. </beans>

3.通過已經與 Spring 集成好的生產者生產事務消息。

  1. package demo;
  2. import com.aliyun.openservices.ons.api.Message;
  3. import com.aliyun.openservices.ons.api.SendResult;
  4. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
  5. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
  6. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
  7. import org.springframework.context.ApplicationContext;
  8. import org.springframework.context.support.ClassPathXmlApplicationContext;
  9. public class ProduceTransMsgWithSpring {
  10. public static void main(String[] args) {
  11. /**
  12. * 事務消息生產者Bean配置在transactionProducer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.
  13. * 請結合例子"發送事務消息"
  14. */
  15. ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
  16. TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
  17. Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
  18. SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
  19. @Override
  20. public TransactionStatus execute(Message msg, Object arg) {
  21. System.out.println("執行本地事務");
  22. return TransactionStatus.CommitTransaction; //根據本地事務執行結果來返回不同的TransactionStatus
  23. }
  24. }, null);
  25. }
  26. }

消費者與 Spring 集成

1.創建 MessageListener,如下所示。

  1. package demo;
  2. import com.aliyun.openservices.ons.api.Action;
  3. import com.aliyun.openservices.ons.api.ConsumeContext;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. public class DemoMessageListener implements MessageListener {
  7. public Action consume(Message message, ConsumeContext context) {
  8. System.out.println("Receive: " + message.getMsgID());
  9. try {
  10. //do something..
  11. return Action.CommitMessage;
  12. }catch (Exception e) {
  13. //消費失敗
  14. return Action.ReconsumeLater;
  15. }
  16. }
  17. }

2.在 consumer.xml 中定義消費者 Bean 等信息。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="https://www.springframework.org/schema/beans"
  3. xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="https://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
  5. <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener配置-->
  6. <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
  7. <property name="properties" > <!--消費者配置信息-->
  8. <props>
  9. <prop key="ConsumerId">CID_DEMO</prop> <!--請替換XXX-->
  10. <prop key="AccessKey">AKDEMO</prop>
  11. <prop key="SecretKey">SKDEMO</prop>
  12. <!--將消費者線程數固定為50個
  13. <prop key="ConsumeThreadNums">50</prop>
  14. -->
  15. </props>
  16. </property>
  17. <property name="subscriptionTable">
  18. <map>
  19. <entry value-ref="msgListener">
  20. <key>
  21. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
  22. <property name="topic" value="TopicTestMQ"/>
  23. <property name="expression" value="*"/><!--expression即Tag,可以設置成具體的Tag,如 taga||tagb||tagc,也可設置成*。 *僅代表訂閱所有Tag,不支持通配-->
  24. </bean>
  25. </key>
  26. </entry>
  27. <!--更多的訂閱添加entry節點即可-->
  28. </map>
  29. </property>
  30. </bean>
  31. </beans>

3.運行已經與 Spring 集成好的消費者,如下所示。

  1. package demo;
  2. import org.springframework.context.ApplicationContext;
  3. import org.springframework.context.support.ClassPathXmlApplicationContext;
  4. public class ConsumeWithSpring {
  5. public static void main(String[] args) {
  6. /**
  7. * 消費者Bean配置在consumer.xml中,可通過ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中
  8. */
  9. ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
  10. System.out.println("Consumer Started");
  11. }
  12. }

最後更新:2016-12-20 11:42:57

  上一篇:go 集群方式訂閱消息__Java SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲
  下一篇:go C/C++ SDK 環境準備__C/C++ SDK_TCP 接入(專業)_消息隊列 MQ-阿裏雲