之前已经介绍了一篇关于spring-boot集成kafka的案例.
我的博客系统采用的是rabbitmq,故这里在简单介绍下rabbitmq的集成,关键是想介绍下动态的绑定队列和监听.
Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发
参考网站 https://docs.spring.io/spring-boot/docs/1.5.18.BUILD-SNAPSHOT/reference/htmlsingle/
简单了解之后开始动手搭建下
进入网站 https://start.spring.io/ 自动生成相关代码
开始编写消息生产者代码
开始编写消息消费者代码
分别运行消费者,生产者代码,可以发现消息已经被成功消费掉.
到此可以看到spring-boot确实非常的强.集成任何框架都很简洁快速.
从上面的代码也可以看到队列必须在使用之前创建好,但是spring-boot在启动的时候怎么才能做到动态的创建队列,并实现消息的发送呢 ?
package cn.majingjing.common.spring;
import cn.majingjing.common.EnumDict;
import cn.majingjing.common.IConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.annotation.Configuration;
/**
* Created by JingjingMa on 2018/10/15 10:42
* bean动态注册
*/
@Configuration
public class AutoBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {
private static Logger log = LoggerFactory.getLogger(AutoBeanDefinitionRegistryPostProcessor.class);
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
log.debug("AutoBeanDefinitionRegistryPostProcessor.postProcessBeanDefinitionRegistry---1");
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory factory) throws BeansException {
log.debug("AutoBeanDefinitionRegistryPostProcessor.postProcessBeanFactory---2");
//动态创建消息队列的bean
EnumDict.MQType[] mqTypes = EnumDict.MQType.values();
for (int i = 0; i < mqTypes.length; i++) {
String queueName = IConstants.MQ_QUEUE +mqTypes[i].name();
Queue queue = new Queue(queueName);
registry.registerSingleton(queueName,queue);
}
}
}
在发送消息的时候就可以简单的进行封装下
package cn.majingjing.common;
import cn.majingjing.core.tool.Tools;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
public class MqSender {
private static Logger log = LoggerFactory.getLogger(MqSender.class);
public MqSender(AmqpTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
}
private AmqpTemplate rabbitTemplate;
public void send(EnumDict.MQType mqType, Object obj) {
MqRequest mqRequest = new MqRequest();
mqRequest.setMqid(Tools.guid());
mqRequest.setName(mqType.name());
mqRequest.setData(JSONObject.toJSONString(obj));
String mqData = JSONObject.toJSONString(mqRequest);
log.debug("MqSender.send:{}", mqData);
this.rabbitTemplate.convertAndSend(IConstants.MQ_QUEUE+mqType.name(), mqData);
}
}
在监听消息的时候也可以简单的进行封装下
package cn.majingjing.tm.blog.admin.service;
import cn.majingjing.common.EnumDict;
import cn.majingjing.common.IConstants;
import cn.majingjing.common.MqRequest;
import cn.majingjing.common.spring.SpringContext;
import cn.majingjing.tm.blog.admin.service.mq.IMqMessageHandler;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListeners;
import org.springframework.stereotype.Component;
/**
* 说明
* @RabbitListeners 注解可以做到多线程同时处理的作用
* @RabbitListener 注解可以同时监控多个队列,但是是单线程阻塞操作
*
* 此处使用 @RabbitListeners 是为了各个队列单独操作互不影响
*/
@Component
@RabbitListeners({
@RabbitListener(queues = IConstants.MQ_QUEUE+"aaa"),
@RabbitListener(queues = IConstants.MQ_QUEUE+"bbb"),
@RabbitListener(queues = IConstants.MQ_QUEUE+"ccc")
})
public class MqListener {
private static Logger log = LoggerFactory.getLogger(MqListener.class);
@RabbitHandler
public void process(String mqReqJson) {
log.debug("mqRabbit-receive-start:{}", mqReqJson);
MqRequest mqRequest = JSONObject.parseObject(mqReqJson, MqRequest.class);
// try {
// log.debug("mqRabbit-receive-start-name:{}", mqRequest.getName());
// Thread.sleep(1000*60);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
if (EnumDict.MQType.containsMqType(mqRequest.getName())) {
IMqMessageHandler mqMessageHandler = SpringContext.getBean(mqRequest.getName()+IConstants.MQ_HANDLER_BEANNAME);
mqMessageHandler.handler(mqRequest);
} else {
log.warn("没有处理该队列的handler, reqId:{}", mqRequest.getRid());
}
log.debug("mqRabbit-receive-end:{}", mqRequest.getRid());
}
// @RabbitListeners({
// @RabbitListener(queues = IConstants.MQ_QUEUE+"aaa"),
// @RabbitListener(queues = IConstants.MQ_QUEUE+"bbb"),
// @RabbitListener(queues = IConstants.MQ_QUEUE+"ccc")
// })
// public void RabbitListeners(String mqReqJson) {
// process(mqReqJson);
// }
}
特别说明下
@ RabbitListeners
注解可以做到多线程同时处理的作用
@ RabbitListener
注解可以同时监控多个队列,但是是单线程阻塞操作
@ RabbitListener
可以直接放在方法上来进行监听
@ RabbitListener
直接放在类上,方法上需要加 @ RabbitHandler
来进行监听