RabbitBootstrapConfiguration 配置类的作用,就是定义了RabbitListenerAnnotationBeanPostProcessor 和RabbitListenerEndpointRegistry 两个bean。
看到RabbitListenerAnnotationBeanPostProcessor 这个类名,就可以猜到,该类的实例bean就是用来扫描加了@RabbitListener 的类,并做一些加工。
(“RabbitListenerAnnotationBean”——针对添加了@RabbitListener注解的bean; “PostProcessor”——后置加工)
// 实现了BeanPostProcessor、Ordered、BeanFactoryAware、BeanClassLoaderAware、EnvironmentAware和SmartInitializingSingleton 6个接口
public class RabbitListenerAnnotationBeanPostProcessor
implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
SmartInitializingSingleton {
.......
// 完成初始化bean之后,调用该方法
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
TypeMetadata metadata = this.typeCache.get(targetClass);
if (metadata == null) {
metadata = buildMetadata(targetClass);
this.typeCache.putIfAbsent(targetClass, metadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
return bean;
// 根据Class,获取元数据
private TypeMetadata buildMetadata(Class<?> targetClass) {
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<ListenerMethod> methods = new ArrayList<ListenerMethod>();
final List<Method> multiMethods = new ArrayList<Method>();
ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
@Override
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
methods.add(new ListenerMethod(method,
listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
if (hasClassLevelListeners) {
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}, ReflectionUtils.USER_DECLARED_METHODS);
if (methods.isEmpty() && multiMethods.isEmpty()) {
return TypeMetadata.EMPTY;
return new TypeMetadata(
methods.toArray(new ListenerMethod[methods.size()]),
multiMethods.toArray(new Method[multiMethods.size()]),
classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
// 检查一下是否使用jdk代理,使用jdk代理方式必须实现了接口
// new一个MethodRabbitListenerEndpoint对象,交由processListener方法进行处理
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
// 前面大半代码都是对MethodRabbitListenerEndpoint对象的属性设置:处理消息的bean、消息处理方法的工厂类、监听的队列名。。。。
// 通过beanFactory获取RabbitListenerContainerFactory类的bean
// 调用RabbitListenerEndpointRegistar的registerEndpoint方法注册mq消息消费端点
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object adminTarget, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener));
String group = rabbitListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
endpoint.setExclusive(rabbitListener.exclusive());
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
try {
endpoint.setPriority(Integer.valueOf(priority));
catch (NumberFormatException ex) {
throw new BeanInitializationException("Invalid priority value for " +
rabbitListener + " (must be an integer)", ex);
String rabbitAdmin = resolve(rabbitListener.admin());
if (StringUtils.hasText(rabbitAdmin)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
try {
endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
rabbitAdmin + "' was found in the application context", ex);
RabbitListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "] for bean " + beanName + ", no " +
RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
containerFactoryBeanName + "' was found in the application context", ex);
this.registrar.registerEndpoint(endpoint, factory);
........
这个类的代码比较长,只贴部分比较主要的部分,其他的,可以自己查看源码进行了解。
RabbitListenerAnnotationBeanPostProcessor实现了BeanPostProcessor(bean初始化后的后置处理)、Ordered(后置处理的排序)、BeanFactoryAware(注入BeanFactory)、BeanClassLoaderAware(注入BeanClassLoader)、EnvironmentAware(注入spring环境)和SmartInitializingSingleton(单例bean初始化后的回调) 6个接口。
我们需要关注的是BeanPostProcessor接口定义的方法,看postProcessAfterInitialization方法的代码,大致流程为:
1、通过AopUtils得到bean代理的对象的class
2、判断缓存中是否有该class的类型元数据,如果没有则调用buildMetadata方法生成类型元数据并放入缓存
3、遍历加了@RabbitListener注解的方法,调用processAmqpListener方法进行处理
4、调用processMultiMethodListeners方法对加了@RabbitHandler的方法进行处理
关于buildMetadata方法:
代码不复杂,就是利用反射,拿到class中,添加了@RabbitListener和@RabbitHandler注解的方法。另外,从代码中也可以看出,@RabbitHandler注解要生效,必须在class上增加@RabbitListener注解
关于processAmqpListener方法:
没有什么实际内容,就干两个事情:
1、检查一下是否使用jdk代理,使用jdk代理方式必须实现了接口
2、new一个MethodRabbitListenerEndpoint对象,交由processListener方法进行处理
关于processListener方法:
1、前面大半代码都是对MethodRabbitListenerEndpoint对象的属性设置:处理消息的bean、消息处理方法的工厂类、监听的队列名。。。。
其中要关注一下setMessageHandlerMethodFactory方法,查看MessageHandlerMethodFactory接口的源码
public interface MessageHandlerMethodFactory {
InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method);
从入参和返回值可以看出来,这个工厂的作用就是将spring的bean对象和方法包装成一个InvocableHandlerMethod对象,也就是我们上面提到的(对象+方法)。
2、通过beanFactory获取RabbitListenerContainerFactory类的bean。
3、调用RabbitListenerEndpointRegistar的registerEndpoint方法注册mq消息消费端点。
继续往下追,看一下RabbitListenerEndpointRegistar的代码:
public class RabbitListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
// 将整个endpointDescriptors数组进行注册
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
this.startImmediately = true; // trigger immediate startup
// 解析得到RabbitListenerContainerFactory
// 如果AmqpListenerEndpointDescriptor 的containerFactory属性不为空,直接返回containerFactory
// 如果为空,尝试从beanFactory获取
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
if (descriptor.containerFactory != null) {
return descriptor.containerFactory;
else if (this.containerFactory != null) {
return this.containerFactory;
else if (this.containerFactoryBeanName != null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
this.containerFactory = this.beanFactory.getBean(
this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
return this.containerFactory; // Consider changing this if live change of the factory is required
else {
throw new IllegalStateException("Could not resolve the " +
RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
descriptor.endpoint + "] no factory was given and no default is set.");
// new一个AmqpListenerEndpointDescriptor对象
// 如果立即启动,则调用RabbitListenerEndpointRegistry注册器来注册消息监听
// 如果不是立即启动,则添加到endpointDescriptors列表中,后面通过registerAllEndpoints方法统一启动
public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
else {
this.endpointDescriptors.add(descriptor);
从上面的代码可以看出,我们关心的内容,应该是在RabbitListenerEndpointRegistry类的registerListenerContainer方法!!
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
ApplicationListener<ContextRefreshedEvent> {
// 检查是否被注册过,注册过就不能注册第二次
// 调用createListenerContainer创建消息监听
// 关于分组消费的,我们不关心
// 是否立即启动,是的话,同步调用startIfNecessary方法
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
containerGroup.add(container);
if (startImmediately) {
startIfNecessary(container);
// 其实就是调用了RabbitListenerContainerFactory的createListenerContainer生成了一个MessageListenerContainer对象
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
int containerPhase = listenerContainer.getPhase();
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
this.phase + " vs " + containerPhase);
this.phase = listenerContainer.getPhase();
return listenerContainer;
createListenerContainer方法调用了RabbitListenerContainerFactory接口的createListenerContainer方法创建一个MessageListenerContainer对象。
在这里,如果是通过RabbitAutoConfiguration自动配置的,那么RabbitListenerContainerFactory接口的具体实现类是SimpleRabbitListenerContainerFactory,MessageListenerContainer接口的具体实现类是SimpleMessageListenerContainer。有兴趣的话,可以去看下rabbitMq自动配置的几个类。
RabbitListenerContainerFactory接口的createListenerContainer方法是由AbstractRabbitListenerContainerFactory抽象类实现,代码如下:
@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
C instance = createContainerInstance();
if (this.connectionFactory != null) {
instance.setConnectionFactory(this.connectionFactory);
if (this.errorHandler != null) {
instance.setErrorHandler(this.errorHandler);
if (this.messageConverter != null) {
instance.setMessageConverter(this.messageConverter);
if (this.acknowledgeMode != null) {
instance.setAcknowledgeMode(this.acknowledgeMode);
if (this.channelTransacted != null) {
instance.setChannelTransacted(this.channelTransacted);
if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
if (this.phase != null) {
instance.setPhase(this.phase);
instance.setListenerId(endpoint.getId());
// 最重要的一行!!!
endpoint.setupListenerContainer(instance);
initializeContainer(instance);
return instance;
乍一看,都是对MessageListenerContainer实例的初始化,实际上有一行,相当重要“ endpoint.setupListenerContainer(instance); ”,这一行最终是走到
AbstractRabbitListenerEndpoint.setupListenerContainer
public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEndpoint, BeanFactoryAware {
......
// 设置MessageListenerContainer,最重要的就是设置监听的队列名称!!!
@Override
public void setupListenerContainer(MessageListenerContainer listenerContainer) {
SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) listenerContainer;
boolean queuesEmpty = getQueues().isEmpty();
boolean queueNamesEmpty = getQueueNames().isEmpty();
if (!queuesEmpty && !queueNamesEmpty) {
throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
if (queuesEmpty) {
Collection<String> names = getQueueNames();
container.setQueueNames(names.toArray(new String[names.size()]));
else {
Collection<Queue> instances = getQueues();
container.setQueues(instances.toArray(new Queue[instances.size()]));
container.setExclusive(isExclusive());
if (getPriority() != null) {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", getPriority());
container.setConsumerArguments(args);
if (getAdmin() != null) {
container.setRabbitAdmin(getAdmin());
setupMessageListener(listenerContainer);
// 创建MessageListener
protected abstract MessageListener createMessageListener(MessageListenerContainer container);
// 创建MessageListener,设置到MessageListenerContainer 里
private void setupMessageListener(MessageListenerContainer container) {
MessageListener messageListener = createMessageListener(container);
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
container.setupMessageListener(messageListener);
......
用@RabbitLinstener注解的方法,使用的endpoint是MethodRabbitListenerEndpoint继承自AbstractRabbitListenerEndpoint,所以看看AbstractRabbitListenerEndpoint的createMessageListener方法
public class MethodRabbitListenerEndpoint extends AbstractRabbitListenerEndpoint {
......
@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
String replyToAddress = getDefaultReplyToAddress();
if (replyToAddress != null) {
messageListener.setResponseAddress(replyToAddress);
MessageConverter messageConverter = container.getMessageConverter();
if (messageConverter != null) {
messageListener.setMessageConverter(messageConverter);
if (getBeanResolver() != null) {
messageListener.setBeanResolver(getBeanResolver());
return messageListener;
protected MessagingMessageListenerAdapter createMessageListenerInstance() {
return new MessagingMessageListenerAdapter(this.bean, this.method);
......
从上面代码可以看出,createMessageListener方法返回了一个MessagingMessageListenerAdapter实例,MessagingMessageListenerAdapter实现了MessageListener接口
到这里,我们就能得出一些结论:
1、有@RabbitListener注解的方法,会生成MethodRabbitListenerEndpoint对象
2、通过MethodRabbitListenerEndpoint对象和SimpleRabbitListenerContainerFactory工厂bean,生成SimpleMessageListenerContainer对象
3、SimpleMessageListenerContainer对象保存了要监听的队列名,创建了用于处理消息的MessagingMessageListenerAdapter实例
4、MessagingMessageListenerAdapter持有@RabbitListener注解的对象和方法,起到一个适配器的作用
SimpleMessageListenerContainer是相当重要的一个类,,包装了整个mq消息消费需要的信息:
1、保存了要监听的队列名,启动的时候,根据队列名创建从服务器拉取消息的consumer——BlockingQueueConsumer
2、创建了一个MessagingMessageListenerAdapter对象,当consumer从服务器拿到消息后,由MessagingMessageListenerAdapter进行处理
3、谁来做数据转换?
是MessagingMessageListenerAdapter,有兴趣的,可以看看MessagingMessageListenerAdapter转换参数的源码~~
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
您可能感兴趣的文章:
Springboot的yml配置文件用法 2023-03-03
Maven聚合开发实例详解 2023-03-03
springcloud整合openfeign使用实例详解 2023-03-03
解读@RabbitListener起作用的原理 2023-03-03
Springboot多数据源配置之整合dynamic-datasource方 2023-03-03
解读动态数据源dynamic-datasource-spring-boot- 2023-03-03
Windows 10卸载JDK1.8超详细图文教程 2023-03-03
美国设下计谋,用娘炮文化重塑日本,已影响至中国 2021-11-19 时空伴随者是什么意思?时空伴随者介绍 2021-11-09 工信部称网盘企业免费用户最低速率应满足基本下载需求,天翼云盘回应:坚决支持,始终 2021-11-05 2022年放假安排出炉:五一连休5天 2022年所有节日一览表 2021-10-26
电脑版 - 返回首页
2006-2023 脚本之家 JB51.Net , All Rights Reserved. 苏ICP备14036222号