閱讀1003 返回首頁    go 阿裏雲 go 技術社區[雲棲]


RabbitMQ:四種ExChange用法

RabbitMQ發送消息時,都是先把消息發送給ExChange(交換機),然後再分發給有相應RoutingKey(路由)關係的Queue(隊列)。
ExChange和Queue之前是多對多的關係。
RabbitMQ 3.0之後創建ExChange時,有四種類型可選“fanout、direct、topic、headers”。

消費者

----------------消息生產者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主機
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
String message = "hello world! ";

for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());
}

System.out.println("Sent msg finish");

channel.close();
connection.close();

----------------消息消費者----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主機
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
//聲明隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//綁定路由和隊列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null);

System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,  AMQP.BasicProperties properties, byte[] body) {
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}

System.out.println("Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

二、direct
擁有相應RoutingKey的隊列,每個隊列接收到的消息是一樣的;

----------------消息生產者----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主機
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
String message = "hello world! ";

for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes());
}

System.out.println("Sent msg is '" + message + "'");

channel.close();
connection.close();

----------------消息消費者----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主機
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//聲明隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//綁定路由和隊列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}

System.out.println("1 Received msg='" + message + "'");
}
};

channel.basicConsume(QUEUE_NAME, true, consumer);

三、topic
當向一個topic發送一個消息時擁有相應RoutingKey的隊列,每個隊列接收到的消息是一樣的;
消費者

----------------消息生產者----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主機
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
String message = "hello world! ";

// int i=101;
for (int i = 0; i < 100; i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes());
}

System.out.println("Sent msg is '" + message + "'");

channel.close();
connection.close();

----------------消息消費者----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主機
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
//聲明隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//綁定路由和隊列// 把隊列綁定到路由上並指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);

System.out.println("1 Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}

System.out.println("1 Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

四、headers
當向一個headers發送一個消息時擁有相應RoutingKey或者headers的隊列,每個隊列接收到的消息是一樣的;
消費者

----------------消息生產者----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主機
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null);

// 設置消息頭鍵值對信息
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("name", "jack");
headers.put("age", 31);
Builder builder = new Builder();
builder.headers(headers);

String message = "hello world! ";

// int i=101;
for (int i = 0; i < 100; i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", builder.build(), (message + i).getBytes());
}

System.out.println("Sent msg is '" + message + "'");

channel.close();
connection.close();

----------------消息消費者----------------
ConnectionFactory factory = new ConnectionFactory();

factory.setHost(S_RabbitMQ.QUEUE_IP);
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用戶名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密碼

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 聲明路由名字和類型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null);
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 設置消息頭鍵值對信息
Map<String, Object> headers = new Hashtable<String, Object>();
// 這裏x-match有兩種類型
// all:表示所有的鍵值對都匹配才能接受到消息
// any:表示隻要有鍵值對匹配就能接受到消息
headers.put("x-match", "all");
headers.put("name", "jack");
headers.put("age", 30);

// 把隊列綁定到路由上並指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", headers);

System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{

System.out.println("Received start --------------");

for (Entry<String, Object> entry : properties.getHeaders().entrySet())
{
System.out.println(entry.getKey() + "=" + entry.getValue());
}

String message = new String(body, "UTF-8");

System.out.println("msg='" + message + "'");
System.out.println("Received end --------------");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

最後更新:2017-08-21 18:02:18

  上一篇:go  阿裏雲最新優惠:領取阿裏雲幸運券,抽取阿裏雲代金券
  下一篇:go  PostgreSQL服務器管理:管理數據庫