閱讀423 返回首頁    go 阿裏雲 go 技術社區[雲棲]


RabbitMQ-Java版本生產與消費

---------Maven依賴---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>


---------消息生產---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;

public class TestSend
{
//隊列名稱:testtransient   testdurable
//server IP地址:116.62.207.36
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = "139.224.34.164";
    private final static int QUEUE_PORT = 5672;
    private final static String QUEUE_USER = "testuser2";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException
    {
        /**
         * 創建連接連接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //設置MabbitMQ所在主機ip或者主機名
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用戶名
        factory.setPassword(QUEUE_PWD);// MQ密碼
        //創建一個連接
        Connection connection = factory.newConnection();
        //創建一個頻道
        Channel channel = connection.createChannel();
        //指定一個隊列
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         //如QUEUE_NAME是一個transient的queue,第二個參數必須是false;重啟rabbit後QUEUE_NAME會被刪除掉
         //如QUEUE_NAME是一個durability的queue,第二個參數必須是true;重啟rabbit後QUEUE_NAME不會被刪除掉
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //發送的消息
        String message = "hello world!";
        //往隊列中發出一條消息
        int j=0;
        Long start = System.currentTimeMillis();
        for(int i=j;i<j+10000;i++)
        {
         //將消息保存起來,重啟rabbit後待消費的消息不會被刪除
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());

         //不保存消息,重啟rabbit後待消費的消息都將丟失
         //channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println("發送完成:"+(System.currentTimeMillis() - start));
        //關閉頻道和連接
        channel.close();
        connection.close();
     }
}

---------消息消費---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestRead
{
//隊列名稱:testtransient   testdurable
//server IP地址:116.62.207.36
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = "139.224.34.164";
    private final static int QUEUE_PORT = 5672;
    private final static String QUEUE_USER = "testuser2";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException
    {
        //打開連接和創建頻道,與發送端一樣
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用戶名
        factory.setPassword(QUEUE_PWD);// MQ密碼
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");
        
        //創建隊列消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消費隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        Long start = System.currentTimeMillis();
        while (true)
        {
            //nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'    "+(System.currentTimeMillis() - start));
        }
    }
}

最後更新:2017-08-20 11:32:18

  上一篇:go  CentOS7常用環境設置
  下一篇:go  SpringData操作MongoDB