消息中間件ActiveMQ(4)--Publisher/Subscriber實驗
博學,切問,近思--詹子知 (https://jameszhan.github.io)
發布/訂閱(Publish/Subscribe)模式:發布/訂閱功能使消息的分發可以突破目的隊列地理指向的限製,使消息按照特定的主題甚至內容進行分發,用戶或應用程序可以根據主題或內容接收到所需要的消息。發布/訂閱功能使得發送者和接收者之間的耦合關係變得更為鬆散,發送者不必關心接收者的目的地址,而接收者也不必關心消息的發送地址,而隻是根據消息的主題進行消息的收發。在MQ家族產品中,MQ Event Broker是專門用於使用發布/訂閱技術進行數據通訊的產品,它支持基於隊列和直接基於TCP/IP兩種方式的發布和訂閱。
在開始編程之前,我們先看一下點對點和發布/訂閱接口的關係:
JMS 公共 | PTP 域 | Pub/Sub 域 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
JMS 1.1 通過統一的域簡化了消息傳遞,在編程中,我們實際上隻需要使用JMS公共域編程即可,對於P2P模式和Pub/Sub模式在編程方式上幾乎毫無區別。我們再看一下上文提到的那個JMS類關係圖:

按照JMS規範,發布消息的步驟如下:
- 從連接工廠中拿出Connecion對象。
- 和服務器建立連接(Connection.start())。
- 創建會話(Session)對象。
- 通過Session,在指定的Topic創建消息發布者(MessageProducer)。
- 使用Session創建消息。
- 使用消息生產者發布消息。
按照JMS規範,訂閱消息的步驟如下:
- 從連接工廠中拿出Connecion對象。
- 和服務器建立連接(Connection.start())。
- 創建會話(Session)對象。
- 通過Session,在指定的Topic創建消息訂閱者(MessageConsumer)。
- 訂閱消息:
- 5.1.調用messageConsumer.receive方法接受消息,如果隊列上有消息,則receive方法返回該消息對象,如果隊列上無消息,則該方法阻塞。
- 5.2.也可以以為Session指定MessageListener對象的方式來訂閱消息,該方法的好處在於,一旦有新消息到來,會自動觸發該對象的onMessage方法執行。
發布/訂閱者模式的特點是:
- 可以多個發布者對同一個Topic發布消息。
- 可以多個訂閱者監聽同一個Topic。
- 消息將被所有的訂閱者接收。默認情況下,消息隻會發送給所有在線的訂閱者,一旦消息發送給了所有在線的訂閱者,消息就會從Topic中移除。
- 可以特別地為主題創建持久的訂閱者,隻要消息不被該消費者消費,消息就會一直保留在Topic中,一旦該持久訂閱者上線,消息會自動發送給該訂閱者。
在文章 消息中間件ActiveMQ(2)--創建連接對象 中,我們介紹了創建連接對象的不同方法,這裏我們把這兩種方式做一個包裝: public class ConnFactory { private ConnectionFactory factory; public ConnFactory(){ try { Context context = new JndiFactory().getJndiContext(); this.factory = (ConnectionFactory) context.lookup("con1"); } catch (NamingException e) { this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); } } public Connection createConnection() throws JMSException{ return factory.createConnection(); } } 創建不同的發布者對同一Topic發送消息。 public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Producer producer1 = new Producer(cf.createConnection(), "Topic1", "Product1"); Producer producer2 = new Producer(cf.createConnection(), "Topic1", "Product2"); producer1.start(); producer2.start(); for(int i = 0; i < 6; i++){ producer1.send("message " + i); producer2.send("message " + i); } } 創建不同的訂閱者監聽同一Topic。 public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Consumer consumer1 = new Consumer(cf.createConnection(), "Topic1", "Consumer1"); Consumer consumer2 = new Consumer(cf.createConnection(), "Topic1", "Consumer2"); Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Topic1", "Consumer3"); consumer1.start(); consumer2.start(); consumer3.start(); consumer1.receive(); consumer2.receive(); } 實驗結果如下(注意事項,應先啟動訂閱者監聽Topic,再使用發布者發布消息。): Consumer3 receive message {Product1: message 0} Consumer1 receive message {Product1: message 0} Consumer2 receive message {Product1: message 0} Consumer3 receive message {Product2: message 0} Consumer1 receive message {Product2: message 0} Consumer2 receive message {Product2: message 0} Consumer2 receive message {Product1: message 1} Consumer3 receive message {Product1: message 1} Consumer1 receive message {Product1: message 1} Consumer3 receive message {Product2: message 1} Consumer2 receive message {Product2: message 1} Consumer1 receive message {Product2: message 1} Consumer3 receive message {Product1: message 2} Consumer2 receive message {Product1: message 2} Consumer1 receive message {Product1: message 2} Consumer1 receive message {Product2: message 2} Consumer2 receive message {Product2: message 2} Consumer3 receive message {Product2: message 2} Consumer3 receive message {Product1: message 3} Consumer2 receive message {Product1: message 3} Consumer3 receive message {Product2: message 3} Consumer1 receive message {Product1: message 3} Consumer1 receive message {Product2: message 3} Consumer3 receive message {Product1: message 4} Consumer1 receive message {Product1: message 4} Consumer2 receive message {Product2: message 3} Consumer3 receive message {Product2: message 4} Consumer1 receive message {Product2: message 4} Consumer3 receive message {Product1: message 5} Consumer2 receive message {Product1: message 4} Consumer3 receive message {Product2: message 5} Consumer1 receive message {Product1: message 5} Consumer2 receive message {Product2: message 4} Consumer1 receive message {Product2: message 5} Consumer2 receive message {Product1: message 5} Consumer2 receive message {Product2: message 5}
最後更新:2017-04-02 04:00:25