rabbitmq的工作队列
示例参考网站 http://www.rabbitmq.com/tutorials/tutorial-two-java.html
在上一篇中,我们编写了程序来发送和接收来自命名队列的消息。在这个中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排任务稍后完成。我们将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作程序时,它们之间将共享任务。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
此示例代码在(rabbitmq之hello-world基础上进行改造)
模式一(轮询分发)
生产者代码
public class Send {
private static final String QUEUE_NAME = "queue_two";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 20; i++) {
String message = "Hello World!--" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(10 * i);
}
channel.close();
connection.close();
}
}
消费者1代码
public class Recv1 {
private static final String QUEUE_NAME = "queue_two";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [1-x] Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者2代码
public class Recv2 {
private static final String QUEUE_NAME = "queue_two";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [2-x] Received '" + message + "'");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck=true;
channel.basicConsume(QUEUE_NAME,autoAck , consumer);
}
}
循环调度
使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作积压,我们可以添加更多工人,这样就可以轻松扩展。
启动Recv1,Recv2,Send,然后查看控制台日志
从日志结果我们可以看出,两个消费者在消费消息时分别处理1000毫秒和300毫秒,但是消费的消息数量是相同的都是10条。
这种方式就叫做 轮训分发
模式二(公平分发) 您可能已经注意到调度仍然无法完全按照我们的意愿运行。例如,在有两个工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一个工人将经常忙碌而另一个工作人员几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。
为了打败我们可以使用basicQos方法和 prefetchCount = 1设置。这告诉RabbitMQ不要一次向一个worker发送一条消息。或者,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。
备注:这里需要了解下两个概念 消息确认,消息持久性
- 消息确认
channel.basicQos(1); //一次只接受一条未包含的消息(见下文)
...
channel.basicAck(envelope.getDeliveryTag(),false);
...
boolean autoAck = false ;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);
- 消息持久性
boolean durable = true ;
channel.queueDeclare(“hello”,durable,false,false,null);
备注:RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue:
生产者代码
public class Send {
private static final String QUEUE_NAME = "queue_two_fail";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 20; i++) {
String message = "Hello World!--" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(10 * i);
}
channel.close();
connection.close();
}
}
消费者1代码
public class Recv1 {
private static final String QUEUE_NAME = "queue_two_fail";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [1-x] Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [1-x] Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者2代码
public class Recv2 {
private static final String QUEUE_NAME = "queue_two_fail";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [2-x] Received '" + message + "'");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [2-x] Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
启动Recv1,Recv2,Send,然后查看控制台日志
从日志可以看出消费者2(业务处理时间短,消费能力强)消费的消息数量明显多于消费者1.
工作队列的简单示例已经搭建完成。具体细节请参照官网示例及说明进行深入了解。