RabbitMQ之Work-Queues

分类: RabbitMQ
阅读:431
作者:majingjing
发布:2018-08-18 23:29:42

rabbitmq的工作队列

rabbitmq-4-1.png

示例参考网站 http://www.rabbitmq.com/tutorials/tutorial-two-java.html

在上一篇中,我们编写了程序来发送和接收来自命名队列的消息。在这个中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。

工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排任务稍后完成。我们将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作程序时,它们之间将共享任务。

这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。

此示例代码在(rabbitmq之hello-world基础上进行改造)

模式一(轮询分发)

生产者代码

public class Send {

    private static final String QUEUE_NAME = "queue_two";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 20; i++) {
            String message = "Hello World!--" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(10 * i);
        }

        channel.close();
        connection.close();
    }

}

消费者1代码

public class Recv1 {
    private static final String QUEUE_NAME = "queue_two";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [1-x] Received '" + message + "'");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

消费者2代码

public class Recv2 {
    private static final String QUEUE_NAME = "queue_two";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [2-x] Received '" + message + "'");

                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        boolean autoAck=true;
        channel.basicConsume(QUEUE_NAME,autoAck , consumer);
    }


}

循环调度

使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作积压,我们可以添加更多工人,这样就可以轻松扩展。

启动Recv1,Recv2,Send,然后查看控制台日志

rabbitmq-4-2.png

rabbitmq-4-3.png

rabbitmq-4-4.png

从日志结果我们可以看出,两个消费者在消费消息时分别处理1000毫秒和300毫秒,但是消费的消息数量是相同的都是10条。

这种方式就叫做 轮训分发

模式二(公平分发) 您可能已经注意到调度仍然无法完全按照我们的意愿运行。例如,在有两个工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一个工人将经常忙碌而另一个工作人员几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。

发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。

rabbitmq-4-5.png

为了打败我们可以使用basicQos方法和 prefetchCount = 1设置。这告诉RabbitMQ不要一次向一个worker发送一条消息。或者,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。

备注:这里需要了解下两个概念 消息确认,消息持久性

  • 消息确认
channel.basicQos(1); //一次只接受一条未包含的消息(见下文)
...
channel.basicAck(envelope.getDeliveryTag(),false);
...
boolean autoAck = false ;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);
  • 消息持久性
boolean durable = true ;
channel.queueDeclare(“hello”,durable,false,false,null);

备注:RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue:

生产者代码

public class Send {

    private static final String QUEUE_NAME = "queue_two_fail";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 20; i++) {
            String message = "Hello World!--" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(10 * i);
        }

        channel.close();
        connection.close();
    }

}

消费者1代码

public class Recv1 {
    private static final String QUEUE_NAME = "queue_two_fail";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [1-x] Received '" + message + "'");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [1-x] Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }

            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

消费者2代码

public class Recv2 {
    private static final String QUEUE_NAME = "queue_two_fail";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [2-x] Received '" + message + "'");

                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [2-x] Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }

            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }


}

启动Recv1,Recv2,Send,然后查看控制台日志

rabbitmq-4-6.png

rabbitmq-4-7.png

rabbitmq-4-8.png

从日志可以看出消费者2(业务处理时间短,消费能力强)消费的消息数量明显多于消费者1.

工作队列的简单示例已经搭建完成。具体细节请参照官网示例及说明进行深入了解。