閱讀532 返回首頁    go 阿裏雲 go 技術社區[雲棲]


消息中間件ActiveMQ(3)--P2P實驗

博學,切問,近思--詹子知 (https://jameszhan.github.io)


點對點方式是最為傳統和常見的通訊方式,它支持一對一、一對多、多對多、多對一等多種配置方式,支持樹狀、網狀等多種拓撲結構。 

按照JMS規範,發送消息的步驟如下:

1.從連接工廠中拿出Connecion對象。

2.和服務器建立連接(Connection.start())。

3.創建會話(Session)對象。

4.通過Session,在指定的Queue創建消息生產者(MessageProducer)。

5.使用Session創建消息。

6.使用消息生產者發送消息。 import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class Producer { private String name; private String dest; private Connection conn; private MessageProducer producer; private Session session; public Producer(Connection conn, String dest, String name) { this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException { //conn 可以不連接,當發送消息是會自動建立連接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(session.createQueue(dest)); } public void send(String text) throws JMSException{ TextMessage msg = session.createTextMessage(name + ": " + text); producer.send(msg); } }    

接收消息的步驟如下: 

1.從連接工廠中拿出Connecion對象。

2.和服務器建立連接(Connection.start())。 

3.創建會話(Session)對象。

4.通過Session,在指定的Queue創建消息接受者(MessageConsumer)。

5.1.調用messageConsumer.receive方法接受消息,如果隊列上有消息,則receive方法返回該消息對象,如果隊列上無消息,則該方法阻塞。

5.2.也可以以為Session指定MessageListener對象的方式來接受消息,該方法的好處在於,一旦有新消息到來,會自動觸發該對象的onMessage方法執行。 

下類描述了以5.1的方式接受消息。 import java.util.concurrent.Executor; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer { private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; private Executor executor = Executors.newFixedThreadPool(10); public Consumer(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必須調用conn的start方法建立連接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createQueue(dest)); } public void receive() { executor.execute(new Runnable() { @Override public void run() { while (true) { try { Message msg = consumer.receive(); if (msg instanceof TextMessage) { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } else { System.out.println("msg: " + msg); } } catch (JMSException e) { e.printStackTrace(); } } } }); } }

下類描述了以5.2的方式接受消息。 import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; public class Consumer2 implements MessageListener{ private String name; private String dest; private Connection conn; private MessageConsumer consumer; private Session session; public Consumer2(Connection conn, String dest, String name){ this.conn = conn; this.dest = dest; this.name = name; } public void start() throws JMSException{ //使用Consumer之前,必須調用conn的start方法建立連接。 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(session.createQueue(dest)); consumer.setMessageListener(this); } @Override public void onMessage(Message msg) { try { System.out.println(name + " receive message {" + ((TextMessage)msg).getText() + "}"); } catch (JMSException e) { e.printStackTrace(); } } }消息隊列的特點是:

1.可以多個生產者對同一個消息隊列發送消息。

2.可以多個接受者監聽同一個消息對列。

3.消息隻能一次性被消費,一旦消息被Consumer1消費了,則Consumer2不可能再拿到這一消息,並且同時該消息被消息隊列移除。

4.持久性存儲,一旦消息沒有被消費,消息會一直保留在消息隊列中。 

利用消息隊列的這一特點,我們可以實現簡單的負載均衡,比如,我們可以部署幾個相同的Service到不同的機器上,讓他們監聽同一個Queue,那麼客戶的請求到來後,消息中間件會動態分配其到某一個Service處理。 

上一篇文章,我們介紹了創建連接對象的不同方法,這裏我們把這兩種方式做一個包裝:public class ConnFactory { private ConnectionFactory factory; public ConnFactory(){ try { Context context = new JndiFactory().getJndiContext(); this.factory = (ConnectionFactory) context.lookup("con1"); } catch (NamingException e) { this.factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); } } public Connection createConnection() throws JMSException{ return factory.createConnection(); } }  
創建不同的生產者對同一隊列發送消息。

public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Producer producer1 = new Producer(cf.createConnection(), "Queue1", "Product1"); Producer producer2 = new Producer(cf.createConnection(), "Queue1", "Product2"); producer1.start(); producer2.start(); for(int i = 0; i < 50; i++){ producer1.send("message " + i); producer2.send("message " + i); } }

創建不同的消費者監聽同一對列。

public static void main(String[] args) throws JMSException { ConnFactory cf = new ConnFactory(); Consumer consumer1 = new Consumer(cf.createConnection(), "Queue1", "Consumer1"); Consumer consumer2 = new Consumer(cf.createConnection(), "Queue1", "Consumer2"); Consumer2 consumer3 = new Consumer2(cf.createConnection(), "Queue1", "Consumer3"); consumer1.start(); consumer2.start(); consumer3.start(); consumer1.receive(); consumer2.receive(); } 實驗結果如下(事實上,不同的生產者生產的消息被那個消費者接收到是不確定的):Consumer3 receive message {Product1: message 1} Consumer3 receive message {Product2: message 2} Consumer2 receive message {Product2: message 0} Consumer1 receive message {Product1: message 0} Consumer1 receive message {Product2: message 1} Consumer2 receive message {Product1: message 2} Consumer1 receive message {Product1: message 3} Consumer2 receive message {Product2: message 3} Consumer3 receive message {Product1: message 4} Consumer1 receive message {Product2: message 4} Consumer2 receive message {Product1: message 5} Consumer3 receive message {Product2: message 5} Consumer1 receive message {Product1: message 6} Consumer2 receive message {Product2: message 6} Consumer3 receive message {Product1: message 7} Consumer1 receive message {Product2: message 7} Consumer2 receive message {Product1: message 8} Consumer3 receive message {Product2: message 8} Consumer1 receive message {Product1: message 9} Consumer2 receive message {Product2: message 9} Consumer3 receive message {Product1: message 10} Consumer1 receive message {Product2: message 10} Consumer2 receive message {Product1: message 11} Consumer1 receive message {Product1: message 12} Consumer3 receive message {Product2: message 11} Consumer2 receive message {Product2: message 12} Consumer3 receive message {Product1: message 13} Consumer3 receive message {Product2: message 14} Consumer3 receive message {Product1: message 16} Consumer3 receive message {Product2: message 17} Consumer3 receive message {Product1: message 19} Consumer3 receive message {Product2: message 20} Consumer3 receive message {Product1: message 22} Consumer2 receive message {Product1: message 14} Consumer1 receive message {Product2: message 13} Consumer1 receive message {Product1: message 15} Consumer1 receive message {Product2: message 16} Consumer2 receive message {Product2: message 15} Consumer2 receive message {Product1: message 17} Consumer1 receive message {Product1: message 18} Consumer1 receive message {Product2: message 19} Consumer1 receive message {Product1: message 21} Consumer1 receive message {Product2: message 22} Consumer2 receive message {Product2: message 18} Consumer2 receive message {Product1: message 20} Consumer2 receive message {Product2: message 21} Consumer2 receive message {Product1: message 23} Consumer3 receive message {Product2: message 23} Consumer1 receive message {Product1: message 24} Consumer3 receive message {Product1: message 25} Consumer2 receive message {Product2: message 24} Consumer2 receive message {Product1: message 26} Consumer1 receive message {Product2: message 25} Consumer3 receive message {Product2: message 26} Consumer2 receive message {Product2: message 27} Consumer1 receive message {Product1: message 27} Consumer3 receive message {Product1: message 28} Consumer1 receive message {Product2: message 28} Consumer2 receive message {Product1: message 29} Consumer3 receive message {Product2: message 29} Consumer1 receive message {Product1: message 30} Consumer3 receive message {Product1: message 31} Consumer1 receive message {Product2: message 31} Consumer2 receive message {Product2: message 30} Consumer2 receive message {Product1: message 32} Consumer3 receive message {Product2: message 32} Consumer1 receive message {Product1: message 33} Consumer3 receive message {Product1: message 34} Consumer2 receive message {Product2: message 33} Consumer2 receive message {Product1: message 35} Consumer3 receive message {Product2: message 35} Consumer1 receive message {Product2: message 34} Consumer1 receive message {Product1: message 36} Consumer2 receive message {Product2: message 36} Consumer3 receive message {Product1: message 37} Consumer1 receive message {Product2: message 37} Consumer2 receive message {Product1: message 38} Consumer1 receive message {Product1: message 39} Consumer3 receive message {Product2: message 38} Consumer2 receive message {Product2: message 39} Consumer3 receive message {Product1: message 40} Consumer1 receive message {Product2: message 40} Consumer2 receive message {Product1: message 41} Consumer3 receive message {Product2: message 41} Consumer2 receive message {Product2: message 42} Consumer3 receive message {Product1: message 43} Consumer1 receive message {Product1: message 42} Consumer2 receive message {Product1: message 44} Consumer1 receive message {Product2: message 43} Consumer3 receive message {Product2: message 44} Consumer1 receive message {Product1: message 45} Consumer2 receive message {Product2: message 45} Consumer3 receive message {Product1: message 46} Consumer1 receive message {Product2: message 46} Consumer2 receive message {Product1: message 47} Consumer3 receive message {Product2: message 47} Consumer1 receive message {Product1: message 48} Consumer3 receive message {Product1: message 49} Consumer2 receive message {Product2: message 48} Consumer1 receive message {Product2: message 49}如果你先執行發送消息的程序,在啟動接受消息的程序,所有的消息都有可能被同一消費者消費,這是ActiveMQ為了提高效率,重用了同一個連接傳輸了所有的消息。其他的MQ產品未必會這麼做,SnoicMQ它就會以一種隨機的方式分發給不同的消費者。一旦你創建好消費者先監聽消息隊列,然後,再發送消息,由於這個時候,消費者與JMS Server之間的連接都已經建立,所以消息會隨機的分發到不同的消費者。 

最後更新:2017-04-02 04:01:42

  上一篇:go JavaScript核心參考教程--客戶端JavaScript
  下一篇:go 周思博趣談軟件——給計算機係學生的建議