RabbitMQ-Java版本生產與消費
---------Maven依賴---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
---------消息生產---------
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class TestSend
{
//隊列名稱:testtransient testdurable
//server IP地址:116.62.207.36
private final static String QUEUE_NAME = "testdurable";
private final static String QUEUE_IP = "139.224.34.164";
private final static int QUEUE_PORT = 5672;
private final static String QUEUE_USER = "testuser2";
private final static String QUEUE_PWD = "123456";
public static void main(String[] argv) throws java.io.IOException, TimeoutException
{
/**
* 創建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//設置MabbitMQ所在主機ip或者主機名
factory.setHost(QUEUE_IP);
factory.setPort(QUEUE_PORT);// MQ端口
factory.setUsername(QUEUE_USER);// MQ用戶名
factory.setPassword(QUEUE_PWD);// MQ密碼
//創建一個連接
Connection connection = factory.newConnection();
//創建一個頻道
Channel channel = connection.createChannel();
//指定一個隊列
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//如QUEUE_NAME是一個transient的queue,第二個參數必須是false;重啟rabbit後QUEUE_NAME會被刪除掉
//如QUEUE_NAME是一個durability的queue,第二個參數必須是true;重啟rabbit後QUEUE_NAME不會被刪除掉
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//發送的消息
String message = "hello world!";
//往隊列中發出一條消息
int j=0;
Long start = System.currentTimeMillis();
for(int i=j;i<j+10000;i++)
{
//將消息保存起來,重啟rabbit後待消費的消息不會被刪除
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
//不保存消息,重啟rabbit後待消費的消息都將丟失
//channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
}
System.out.println("發送完成:"+(System.currentTimeMillis() - start));
//關閉頻道和連接
channel.close();
connection.close();
}
}
---------消息消費---------
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class TestRead
{
//隊列名稱:testtransient testdurable
//server IP地址:116.62.207.36
private final static String QUEUE_NAME = "testdurable";
private final static String QUEUE_IP = "139.224.34.164";
private final static int QUEUE_PORT = 5672;
private final static String QUEUE_USER = "testuser2";
private final static String QUEUE_PWD = "123456";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException
{
//打開連接和創建頻道,與發送端一樣
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(QUEUE_IP);
factory.setPort(QUEUE_PORT);// MQ端口
factory.setUsername(QUEUE_USER);// MQ用戶名
factory.setPassword(QUEUE_PWD);// MQ密碼
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Waiting for messages. To exit press CTRL+C");
//創建隊列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
Long start = System.currentTimeMillis();
while (true)
{
//nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "' "+(System.currentTimeMillis() - start));
}
}
}
最後更新:2017-08-20 11:32:18