RabbitMQ之Topics 主题
示例参考网站 http://www.rabbitmq.com/tutorials/tutorial-five-java.html
发送到topic exchange的消息不能具有任意 routing_key - 它必须是由点
分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由密钥示例:"stock.usd.nyse"
, "nyse.vmw"
, "quick.orange.rabbit"
。路由密钥中可以包含任意数量的单词,最多可达255个字节。
绑定密钥也必须采用相同的形式。topic exchange背后的逻辑 类似于直接交换- 使用特定路由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是,绑定键有两个重要的特殊情况:
*(星号)可以替代一个单词。 #(hash)可以替换零个或多个单词。
如上图我们创建了三个绑定:Q1绑定了绑定键 ".orange.",Q2 绑定了"..rabbit" 和"lazy.#"
路由密钥设置为“ quick.orange.rabbit ”的消息将传递到两个队列。消息“ lazy.orange.elephant ”也将同时发送给他们。而“ quick.orange.fox ”只会进入第一个队列,“ lazy.brown.fox ”只会进入第二个队列。“ lazy.pink.rabbit ”将仅传递到第二个队列一次,即使它匹配两个绑定。“ quick.brown.fox ”与任何绑定都不匹配,因此它将被丢弃。
下面使用代码来实现下
生产者代码
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//There are a few exchange types available: direct, topic, headers and fanout
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String[] routingKeys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"};
for(String routingKey:routingKeys){
String message = "Hello World!--five" + "-->" + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "' --routingKey:" + routingKey);
}
channel.close();
connection.close();
}
}
消费者1代码
public class Recv1 {
private static final String EXCHANGE_NAME="test_exchange_topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//使用匿名队列,因为消息是发送到交换机上,由交换机进行选择发送到绑定的队列,所以此时队列的名字其实没有什么意义,并且匿名队列在断开连接后会自动删除
String queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机上
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [1-x] Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [1-x] Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);
}
}
消费者2代码
public class Recv2 {
private static final String EXCHANGE_NAME="test_exchange_topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//使用匿名队列,因为消息是发送到交换机上,由交换机进行选择发送到绑定的队列,所以此时队列的名字其实没有什么意义,并且匿名队列在断开连接后会自动删除
String queueName = channel.queueDeclare().getQueue();
//将队列绑定到交换机上
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [2-x] Received '" + message + "'");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [2-x] Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);
}
}
启动Recv1,Recv2,Send,查看日志
其实只要理解exchange的类型和routingKey的规则,就能在各种场景中灵活的使用rabbitmq