spring-boot(二十一)集成rabbitmq

分类: spring-boot
阅读:813
作者:majingjing
发布:2018-10-17 12:02:40

之前已经介绍了一篇关于spring-boot集成kafka的案例.

我的博客系统采用的是rabbitmq,故这里在简单介绍下rabbitmq的集成,关键是想介绍下动态的绑定队列和监听.

Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发

参考网站 https://docs.spring.io/spring-boot/docs/1.5.18.BUILD-SNAPSHOT/reference/htmlsingle/

1.png

简单了解之后开始动手搭建下

进入网站 https://start.spring.io/ 自动生成相关代码

2.png

开始编写消息生产者代码

3.png

开始编写消息消费者代码

4.png

分别运行消费者,生产者代码,可以发现消息已经被成功消费掉.

5.png

6.png

到此可以看到spring-boot确实非常的强.集成任何框架都很简洁快速.


从上面的代码也可以看到队列必须在使用之前创建好,但是spring-boot在启动的时候怎么才能做到动态的创建队列,并实现消息的发送呢 ?

package cn.majingjing.common.spring;

import cn.majingjing.common.EnumDict;
import cn.majingjing.common.IConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.annotation.Configuration;

/**
 * Created by JingjingMa on 2018/10/15 10:42
 * bean动态注册
 */
@Configuration
public class AutoBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {

    private static Logger log = LoggerFactory.getLogger(AutoBeanDefinitionRegistryPostProcessor.class);

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        log.debug("AutoBeanDefinitionRegistryPostProcessor.postProcessBeanDefinitionRegistry---1");
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory factory) throws BeansException {
        log.debug("AutoBeanDefinitionRegistryPostProcessor.postProcessBeanFactory---2");

        //动态创建消息队列的bean
        EnumDict.MQType[] mqTypes = EnumDict.MQType.values();
        for (int i = 0; i < mqTypes.length; i++) {
            String queueName = IConstants.MQ_QUEUE +mqTypes[i].name();
            Queue queue = new Queue(queueName);
            registry.registerSingleton(queueName,queue);
        }
    }
}

在发送消息的时候就可以简单的进行封装下

package cn.majingjing.common;

import cn.majingjing.core.tool.Tools;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;

public class MqSender {

    private static Logger log = LoggerFactory.getLogger(MqSender.class);

    public MqSender(AmqpTemplate rabbitTemplate){
        this.rabbitTemplate = rabbitTemplate;
    }

    private AmqpTemplate rabbitTemplate;

    public void send(EnumDict.MQType mqType, Object obj) {
        MqRequest mqRequest = new MqRequest();
        mqRequest.setMqid(Tools.guid());
        mqRequest.setName(mqType.name());
        mqRequest.setData(JSONObject.toJSONString(obj));
        String mqData = JSONObject.toJSONString(mqRequest);
        log.debug("MqSender.send:{}", mqData);
        this.rabbitTemplate.convertAndSend(IConstants.MQ_QUEUE+mqType.name(), mqData);
    }


}

在监听消息的时候也可以简单的进行封装下

package cn.majingjing.tm.blog.admin.service;

import cn.majingjing.common.EnumDict;
import cn.majingjing.common.IConstants;
import cn.majingjing.common.MqRequest;
import cn.majingjing.common.spring.SpringContext;
import cn.majingjing.tm.blog.admin.service.mq.IMqMessageHandler;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListeners;
import org.springframework.stereotype.Component;

/**
 * 说明
 * @RabbitListeners 注解可以做到多线程同时处理的作用
 * @RabbitListener 注解可以同时监控多个队列,但是是单线程阻塞操作
 *
 * 此处使用 @RabbitListeners 是为了各个队列单独操作互不影响
 */
@Component
@RabbitListeners({
        @RabbitListener(queues = IConstants.MQ_QUEUE+"aaa"),
        @RabbitListener(queues = IConstants.MQ_QUEUE+"bbb"),
        @RabbitListener(queues = IConstants.MQ_QUEUE+"ccc")
})
public class MqListener {

    private static Logger log = LoggerFactory.getLogger(MqListener.class);

    @RabbitHandler
    public void process(String mqReqJson) {
        log.debug("mqRabbit-receive-start:{}", mqReqJson);
        MqRequest mqRequest = JSONObject.parseObject(mqReqJson, MqRequest.class);
//        try {
//            log.debug("mqRabbit-receive-start-name:{}", mqRequest.getName());
//            Thread.sleep(1000*60);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        if (EnumDict.MQType.containsMqType(mqRequest.getName())) {
            IMqMessageHandler mqMessageHandler = SpringContext.getBean(mqRequest.getName()+IConstants.MQ_HANDLER_BEANNAME);
            mqMessageHandler.handler(mqRequest);
        } else {
            log.warn("没有处理该队列的handler, reqId:{}", mqRequest.getRid());
        }
        log.debug("mqRabbit-receive-end:{}", mqRequest.getRid());
    }

//    @RabbitListeners({
//            @RabbitListener(queues = IConstants.MQ_QUEUE+"aaa"),
//            @RabbitListener(queues = IConstants.MQ_QUEUE+"bbb"),
//            @RabbitListener(queues = IConstants.MQ_QUEUE+"ccc")
//    })
//    public void RabbitListeners(String mqReqJson) {
//       process(mqReqJson);
//    }

}

特别说明下

@ RabbitListeners 注解可以做到多线程同时处理的作用 @ RabbitListener 注解可以同时监控多个队列,但是是单线程阻塞操作

@ RabbitListener 可以直接放在方法上来进行监听 @ RabbitListener 直接放在类上,方法上需要加 @ RabbitHandler 来进行监听