閱讀233 返回首頁    go 阿裏雲 go 技術社區[雲棲]


消息中間件ActiveMQ(4)--Publisher/Subscriber實驗

博學,切問,近思--詹子知 (https://jameszhan.github.io)


發布/訂閱(Publish/Subscribe)模式:發布/訂閱功能使消息的分發可以突破目的隊列地理指向的限製,使消息按照特定的主題甚至內容進行分發,用戶或應用程序可以根據主題或內容接收到所需要的消息。發布/訂閱功能使得發送者和接收者之間的耦合關係變得更為鬆散,發送者不必關心接收者的目的地址,而接收者也不必關心消息的發送地址,而隻是根據消息的主題進行消息的收發。在MQ家族產品中,MQ Event Broker是專門用於使用發布/訂閱技術進行數據通訊的產品,它支持基於隊列和直接基於TCP/IP兩種方式的發布和訂閱。

在開始編程之前,我們先看一下點對點和發布/訂閱接口的關係:

JMS 公共 PTP 域 Pub/Sub 域
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber

JMS 1.1 通過統一的域簡化了消息傳遞,在編程中,我們實際上隻需要使用JMS公共域編程即可,對於P2P模式和Pub/Sub模式在編程方式上幾乎毫無區別。我們再看一下上文提到的那個JMS類關係圖:
JMS

按照JMS規範,發布消息的步驟如下:

  1. 從連接工廠中拿出Connecion對象。
  2. 和服務器建立連接(Connection.start())。
  3. 創建會話(Session)對象。
  4. 通過Session,在指定的Topic創建消息發布者(MessageProducer)。
  5. 使用Session創建消息。
  6. 使用消息生產者發布消息。
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class Producer { private String name; private String dest; private Connection conn; private MessageProducer producer; private Session session; public Producer(Connection conn, String dest, String name) { this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException { //conn 可以不連接,當發送消息是會自動建立連接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(session.createTopic(dest)); } public void send(String text) throws JMSException{ TextMessage msg = session.createTextMessage(name + ": " + text); producer.send(msg); } }

按照JMS規範,訂閱消息的步驟如下:

  1. 從連接工廠中拿出Connecion對象。
  2. 和服務器建立連接(Connection.start())。
  3. 創建會話(Session)對象。
  4. 通過Session,在指定的Topic創建消息訂閱者(MessageConsumer)。
  5. 訂閱消息:
    • 5.1.調用messageConsumer.receive方法接受消息,如果隊列上有消息,則receive方法返回該消息對象,如果隊列上無消息,則該方法阻塞。
    • 5.2.也可以以為Session指定MessageListener對象的方式來訂閱消息,該方法的好處在於,一旦有新消息到來,會自動觸發該對象的onMessage方法執行。
下類描述了以5.1的方式接受消息。 import java.util.concurrent.Executor; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer { private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; private Executor executor = Executors.newFixedThreadPool(10); public Consumer(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必須調用conn的start方法建立連接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createTopic(dest)); } public void receive() { executor.execute(new Runnable() { @Override public void run() { while (true) { try { Message msg = consumer.receive(); if (msg instanceof TextMessage) { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } else { System.out.println("msg: " + msg); } } catch (JMSException e) { e.printStackTrace(); } } } }); } } 下類描述了以5.2的方式接受消息。 import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer2 implements MessageListener{ private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; public Consumer2(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必須調用conn的start方法建立連接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createTopic(dest)); consumer.setMessageListener(this); } @Override public void onMessage(Message msg) { try { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } catch (JMSException e) { e.printStackTrace(); } } }

發布/訂閱者模式的特點是:

  1. 可以多個發布者對同一個Topic發布消息。
  2. 可以多個訂閱者監聽同一個Topic。
  3. 消息將被所有的訂閱者接收。默認情況下,消息隻會發送給所有在線的訂閱者,一旦消息發送給了所有在線的訂閱者,消息就會從Topic中移除。
  4. 可以特別地為主題創建持久的訂閱者,隻要消息不被該消費者消費,消息就會一直保留在Topic中,一旦該持久訂閱者上線,消息會自動發送給該訂閱者。

在文章 消息中間件ActiveMQ(2)--創建連接對象 中,我們介紹了創建連接對象的不同方法,這裏我們把這兩種方式做一個包裝: public class ConnFactory { private ConnectionFactory factory; public ConnFactory(){ try { Context context = new JndiFactory().getJndiContext(); this.factory = (ConnectionFactory) context.lookup("con1"); } catch (NamingException e) { this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); } } public Connection createConnection() throws JMSException{ return factory.createConnection(); } } 創建不同的發布者對同一Topic發送消息。 public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Producer producer1 = new Producer(cf.createConnection(), "Topic1", "Product1"); Producer producer2 = new Producer(cf.createConnection(), "Topic1", "Product2"); producer1.start(); producer2.start(); for(int i = 0; i < 6; i++){ producer1.send("message " + i); producer2.send("message " + i); } } 創建不同的訂閱者監聽同一Topic。 public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Consumer consumer1 = new Consumer(cf.createConnection(), "Topic1", "Consumer1"); Consumer consumer2 = new Consumer(cf.createConnection(), "Topic1", "Consumer2"); Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Topic1", "Consumer3"); consumer1.start(); consumer2.start(); consumer3.start(); consumer1.receive(); consumer2.receive(); } 實驗結果如下(注意事項,應先啟動訂閱者監聽Topic,再使用發布者發布消息。): Consumer3 receive message {Product1: message 0} Consumer1 receive message {Product1: message 0} Consumer2 receive message {Product1: message 0} Consumer3 receive message {Product2: message 0} Consumer1 receive message {Product2: message 0} Consumer2 receive message {Product2: message 0} Consumer2 receive message {Product1: message 1} Consumer3 receive message {Product1: message 1} Consumer1 receive message {Product1: message 1} Consumer3 receive message {Product2: message 1} Consumer2 receive message {Product2: message 1} Consumer1 receive message {Product2: message 1} Consumer3 receive message {Product1: message 2} Consumer2 receive message {Product1: message 2} Consumer1 receive message {Product1: message 2} Consumer1 receive message {Product2: message 2} Consumer2 receive message {Product2: message 2} Consumer3 receive message {Product2: message 2} Consumer3 receive message {Product1: message 3} Consumer2 receive message {Product1: message 3} Consumer3 receive message {Product2: message 3} Consumer1 receive message {Product1: message 3} Consumer1 receive message {Product2: message 3} Consumer3 receive message {Product1: message 4} Consumer1 receive message {Product1: message 4} Consumer2 receive message {Product2: message 3} Consumer3 receive message {Product2: message 4} Consumer1 receive message {Product2: message 4} Consumer3 receive message {Product1: message 5} Consumer2 receive message {Product1: message 4} Consumer3 receive message {Product2: message 5} Consumer1 receive message {Product1: message 5} Consumer2 receive message {Product2: message 4} Consumer1 receive message {Product2: message 5} Consumer2 receive message {Product1: message 5} Consumer2 receive message {Product2: message 5}

最後更新:2017-04-02 04:00:25

  上一篇:go 百度有啊前端js框架分析(一)
  下一篇:go C++編程思想第二卷(實用編程技術)摘要