相关文章推荐
腼腆的手链  ·  【极米投影仪】怎么样 推荐 测评-什么值得买·  6 月前    · 
要出家的签字笔  ·  尚小云故居_百度百科·  12 月前    · 
月球上的小摩托  ·  《指环王》中最打动我的七件中土旧事-中青在线·  1 年前    · 
快乐的卤蛋  ·  江淮面包车 - 抖音·  1 年前    · 
聪明伶俐的煎鸡蛋  ·  前妻,别来无恙漫画免费 - ...·  1 年前    · 
Code  ›  Spring 源码分析衍生篇十三 :事务扩展机制 TransactionSynchronization
https://blog.csdn.net/qq_36882793/article/details/122101340
从容的鼠标
11 月前
  • 一、前言
  • 二、TransactionSynchronization
    • 1. TransactionSynchronization
      • 1.1 TransactionSynchronization 的定义
      • 1.2 TransactionSynchronizationManager
    • 2. TransactionSynchronization 原理简述
      • 2.1 TransactionSynchronization#beforeCommit
      • 2.2 TransactionSynchronization#beforeCompletion
      • 2.3 TransactionSynchronization#afterCommit
      • 2.4 TransactionSynchronization#afterCompletion
    • 3. 总结
  • 三、TransactionalEventListener
    • 1. TransactionalEventListener
    • 2. 执行流程
      • 2.1 EventListenerMethodProcessor
        • 2.1.1 EventListenerMethodProcessor 的注入
        • 2.1.2 EventListenerMethodProcessor 的调用
      • 2.2 EventListenerFactory
      • 2.3 TransactionalApplicationListenerMethodAdapter
        • 2.3.1 TransactionalApplicationListenerSynchronization
        • 2.3.2 ApplicationListenerMethodAdapter#processEvent

        本文是 Spring源码分析的衍生文章。主要是因为本人菜鸡,在分析源码的过程中还有一些其他的内容不理解,故开设衍生篇来完善内容以学习。

        全集目录: Spring源码分析:全集整理

        背景不知道写啥

        二、TransactionSynchronization

        1. TransactionSynchronization

        我们这里以一个 Demo 为例,如下:当调用 DemoService#testTransactionSynchronization 方法时会往sys_role 插入role_id = 1 和 role_id = 2 的两条记录。同时该方法通过 @Transactional(rollbackFor = Exception.class) 开启了事务。

        @Service
        public class DemoServiceImpl implements DemoService {
             * 节省空间 addRole 和 addPermission 的实现就不给出. 就是简单的往表里插入一条记录
             * @return
            @Transactional(rollbackFor = Exception.class)
            @Override
            public String testTransactionSynchronization() {
            	// step1 : 在 sys_role 表中插入一条 role_id = 1 的记录
                addRole(1);
                // step2 : 在 sys_role 表中插入一条 role_id = 2 的记录
                addRole(2);
                System.out.println("end");
                return "";
            //  在 sys_role 表中插入一条 role_id = id 的记录
            @Override
            public void addRole(int id) {
                ...
        	// step3 : 在 sys_rermission 表中插入一条 permission_id = id 的记录。节省空间不给出实现
        	@Override
        	public void addPermission(int id) {
        		...
        

        那么可以预想到,step1 和 step 2 要么同时成功,要么同时失败、现在更改一下需求 :增加 step3 在 DemoService#testTransactionSynchronization 方法的事务提交后才执行。

        实现方法有若干种,这里介绍本文的 TransactionSynchronization 的使用。借助 TransactionSynchronization,可以实现在事务挂起、恢复、提交前后、提交后等时机进行完成指定的操作。具体实现如下:

            @Transactional(rollbackFor = Exception.class)
            @Override
            public String testTransactionSynchronization() {
            	// step1 : 在 sys_role 表中插入一条 role_id = 1 的记录
                addRole(1);
                // step2 : 在 sys_role 表中插入一条 role_id = 2 的记录
                addRole(2);
                // 向事务同步管理器注册一个 TransactionSynchronization 匿名实现类,作用是当前事务提交后,执行 addPermission 方法。
                TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                	// 当前事务提交后触发该方法
                	// step3 : 在 sys_rermission 表中插入一条 permission_id = id 的记录。
                    @Override
                    public void afterCommit() {
                        addPermission(1);
                });
                System.out.println("end");
                return "";
        

        上面的实际即是:当 testTransactionSynchronization 的事务提交后,才会执行 TransactionSynchronizationManager#registerSynchronization 注册的TransactionSynchronization。这里我们实现了 TransactionSynchronization#afterCommit,即会在事务提交后执行。

        看到了上面的 Demo,下面我们来了解一下涉及到的 TransactionSynchronization 和 TransactionSynchronizationManager

        1.1 TransactionSynchronization 的定义

        TransactionSynchronization 作为事务同步回调的接口,可以实现 Ordered 接口来影响它们的执行顺序。 未实现 Ordered 接口的同步将附加到同步链的末尾。TransactionSynchronization 提供了很多方法,定义如下 :

        public interface TransactionSynchronization extends Flushable {
        	/** Completion status in case of proper commit. */
        	// 正确提交时的完成状态
        	int STATUS_COMMITTED = 0;
        	/** Completion status in case of proper rollback. */
        	// 在正确回滚的情况下完成状态
        	int STATUS_ROLLED_BACK = 1;
        	/** Completion status in case of heuristic mixed completion or system errors. */
        	// 在启发式混合完成或系统错误的情况下的完成状态
        	int STATUS_UNKNOWN = 2;
        	// 事务挂起时调用。 如果管理任何资源,应该从 TransactionSynchronizationManager 取消绑定资源。
        	default void suspend() {
        	// 事务恢复时调用。如果管理任何资源,应该将资源重新绑定到 TransactionSynchronizationManager
        	default void resume() {
        	// 将底层会话刷新到数据存储区(如果适用):例如,Hibernate/JPA 会话。
        	@Override
        	default void flush() {
        	// 事务提交前调用。此处若发生异常,会导致回滚。
        	default void beforeCommit(boolean readOnly) {
        	// 事务提交前, 在 beforeCommit 后调用。此处若发生异常,不会导致回滚。
        	default void beforeCompletion() {
        	// 事务提交后调用
        	default void afterCommit() {
        	// 事务提交 或回滚后执行。
        	default void afterCompletion(int status) {
        

        1.2 TransactionSynchronizationManager

        TransactionSynchronizationManager 事务同步管理器,保存的是各个线程中的事务信息。Spring 在事务过程中通过此类来管理事务。部分定义如下:

        public abstract class TransactionSynchronizationManager {
             //线程上下文中保存着【线程池对象:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。
            private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
            //事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。
            private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
            // 事务的名称  
            private static
        
        
        
        
            
         final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");
            // 事务是否是只读  
            private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
            // 事务的隔离级别
            private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
            // 事务是否开启   actual:真实的
            private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
        	....
        

        其中 TransactionSynchronizationManager#registerSynchronization 实现如下:

        	public static void registerSynchronization(TransactionSynchronization synchronization)
        			throws IllegalStateException {
        		Assert.notNull(synchronization, "TransactionSynchronization must not be null");
        		// 获取当前线程绑定的 TransactionSynchronization 集合。
        		Set<TransactionSynchronization> synchs = synchronizations.get();
        		if (synchs == null) {
        			throw new IllegalStateException("Transaction synchronization is not active");
        		// 将当前 新增的  TransactionSynchronization 添加到集合中
        		synchs.add(synchronization);
        

        可以看到,在 TransactionSynchronizationManager#registerSynchronization中会将注册的TransactionSynchronization 添加到 TransactionSynchronizationManager#synchronizations集合中,当事务执行到指定阶段后进行触发,我们下面来详细看一看。

        2. TransactionSynchronization 原理简述

        我们这里不关注 事务的挂起恢复等场景(太复杂,写一半放弃了 ),只关注 beforeCommit、beforeCompletion、afterCommit、afterCompletion 四个常用的方法 的调用时机 。常规的调用顺序为 :

        业务代码  -> beforeCommit -> beforeCompletion -> afterCommit -> afterCompletion 
        

        在 Spring 中事务提交会调用 AbstractPlatformTransactionManager#processCommit 方法,上述四个方法都在该方法中有调用,其实现如下:

        	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        		try {
        			boolean beforeCompletionInvoked = false;
        			try {
        				boolean unexpectedRollback = false;
        				// 预留操作
        				prepareForCommit(status);
        				// 调用自定义触发器的方法
        				// 1. 触发 TransactionSynchronization#beforeCommit
        				triggerBeforeCommit(status);
        				// 2.1 触发 TransactionSynchronization#beforeCompletion
        				triggerBeforeCompletion(status);
        				beforeCompletionInvoked = true;
        				// 如果设置了保存点信息
        				if (status.hasSavepoint()) {
        					// 清除保存点信息
        					unexpectedRollback = status.isGlobalRollbackOnly();
        					status.releaseHeldSavepoint();
        				// 如果是新事物
        				else if (status.isNewTransaction()) {
        					unexpectedRollback = status.isGlobalRollbackOnly();
        					// 如果是独立事务则直接提交
        					doCommit(status);
        				// 不是新事物并不会直接提交,而是等最外围事务进行提交。
        				else if (isFailEarlyOnGlobalRollbackOnly()) {
        					unexpectedRollback = status.isGlobalRollbackOnly();
        				// Throw UnexpectedRollbackException if we have a global rollback-only
        				// marker but still didn't get a corresponding exception from commit.
        				if (unexpectedRollback) {
        					throw new UnexpectedRollbackException(
        							"Transaction silently rolled back because it has been marked as rollback-only");
        			catch (UnexpectedRollbackException ex) {
        				// can only be caused by doCommit
        				// 4.1 触发 TransactionSynchronization#afterCompletion
        				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
        				throw ex;
        			catch (TransactionException ex) {
        				// can only be caused by doCommit
        				if (isRollbackOnCommitFailure()) {
        					// 2.2 触发 TransactionSynchronization#beforeCompletion
        					doRollbackOnCommitException(status, ex);
        				else {
        					// 4.2 触发 TransactionSynchronization#afterCompletion
        					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
        				throw ex;
        			catch (RuntimeException | Error ex) {
        				if (!beforeCompletionInvoked) {
        					triggerBeforeCompletion(status);
        				// 提交过程中会出现异常则回滚
        				doRollbackOnCommitException(status, ex);
        				throw ex;
        			// Trigger afterCommit callbacks, with an exception thrown there
        			// propagated to callers but the transaction still considered as committed.
        			try {
        				// 3. 触发 TransactionSynchronization#afterCommit
        				triggerAfterCommit(status);
        			finally {
        				// 4.3 触发 TransactionSynchronization#afterCompletion
        				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        		finally {
        			// 清理事务信息
        			cleanupAfterCompletion(status);
        

        上面的代码中我们标注了每个方法的调用,下面我们具体来看

        2.1 TransactionSynchronization#beforeCommit

        TransactionSynchronization#beforeCommit 在事务提交前触发,在当前方法抛出异常会导致整个事务回滚。

        上面代码注释标明在 AbstractPlatformTransactionManager#triggerBeforeCommit 方法中调用了该方法,其实现如下:

        	protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
        		if (status.isNewSynchronization())
        
        
        
        
            
         {
        			// 触发 TransactionSynchronization#beforeCommit 方法。入参是当前事务是否只读
        			TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
        

        TransactionSynchronizationUtils#triggerBeforeCommit 实现如下:

        	public static void triggerBeforeCommit(boolean readOnly) {
        		// 从 TransactionSynchronizationManager 中获取所有注册的 TransactionSynchronization 触发 beforeCommit 方法
        		for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
        			synchronization.beforeCommit(readOnly);
        

        2.2 TransactionSynchronization#beforeCompletion

        TransactionSynchronization#beforeCompletion 在事务提交前,在 TransactionSynchronization#beforeCommit 后调用。在AbstractPlatformTransactionManager#triggerBeforeCompletion 方法调用时自身捕获了异常打印了 debug 级别日志,所以如果该方法抛出异常并不会导致事务回滚。

        AbstractPlatformTransactionManager#triggerBeforeCompletion 实现如下:

        	protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
        		if (status.isNewSynchronization()) {
        			if (status.isDebug()) {
        				logger.trace("Triggering beforeCompletion synchronization");
        			TransactionSynchronizationUtils.triggerBeforeCompletion();
        

        TransactionSynchronizationUtils#triggerBeforeCompletion 实现如下

        	public static void triggerBeforeCompletion() {
        		for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
        			try {
        				synchronization.beforeCompletion();
        			catch (Throwable ex) {
        				// 捕获beforeCompletion抛出的异常,打印 debug 日志、
        				logger.debug("TransactionSynchronization.beforeCompletion threw exception", ex);
        

        这里注意 beforeCompletion 的调用场景有两处。

        • 代码注释 2.1 处是常规流程的执行
        • 代码注释 2.2 处则是执行出现异常时,即当代码执行了 代码注释1处后抛出异常时执行。

        2.3 TransactionSynchronization#afterCommit

        TransactionSynchronization#afterCommit 在事务提交后调用。该方法抛出异常也不会导致事务回滚但是会触发 TransactionSynchronization#afterCompletion 方法。

        AbstractPlatformTransactionManager#triggerAfterCommit 实现如下:

        	private void triggerAfterCommit(DefaultTransactionStatus status) {
        		if (status.isNewSynchronization()) {
        			TransactionSynchronizationUtils.triggerAfterCommit();
        

        TransactionSynchronizationUtils#triggerAfterCommit 实现如下:

        	public static void triggerAfterCommit() {
        		invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
        	public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
        		if (synchronizations != null) {
        			for (TransactionSynchronization synchronization : synchronizations) {
        				synchronization.afterCommit();
        

        2.4 TransactionSynchronization#afterCompletion

        TransactionSynchronization#afterCompletion 在 TransactionSynchronization#afterCommit 执行之后调用。无论事务正常提交还是异常回滚,都会触发该方法。

        需要注意的是 该方法在 AbstractPlatformTransactionManager#processRollback 中也有调用。AbstractPlatformTransactionManager#processRollback 方法是事务回滚时执行的方法,其内部也是通过 AbstractPlatformTransactionManager#triggerAfterCompletion 来触发 afterCompletion 方法,因此在这里就不再展开。

        AbstractPlatformTransactionManager#triggerAfterCompletion

        	private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
        		if (status.isNewSynchronization()) {
        			List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
        			TransactionSynchronizationManager.clearSynchronization();
        			if (!status.hasTransaction() || status.isNewTransaction()) {
        				// No transaction or new transaction for the current scope ->
        				// invoke the afterCompletion callbacks immediately
        				// 当前作用域没有事务或有新事务, 触发 TransactionSynchronization#afterCompletion  方法
        				invokeAfterCompletion(synchronizations, completionStatus);
        			else if (!synchronizations.isEmpty()) {
        				// Existing transaction that we participate in, controlled outside
        				// of the scope of this Spring transaction manager -> try to register
        				// an afterCompletion callback with the existing (JTA) transaction.
        				// 我们参与的现有事务,在此 Spring 事务管理器范围之外控制 -> 尝试使用现有(JTA)事务注册 afterCompletion 回调。
        				registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
        	protected void registerAfterCompletionWithExistingTransaction(
        			Object transaction, List<TransactionSynchronization> synchronizations) throws TransactionException {
        		// 状态置为未知
        		invokeAfterCompletion(synchronizations, TransactionSynchronization.STATUS_UNKNOWN);
        	protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
        		TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
        

        3. 总结

        总体来说 TransactionSynchronization 的逻辑还是比较简单的,如下:

        1. TransactionSynchronizationManager#registerSynchronization 将 TransactionSynchronization 注册到 TransactionSynchronizationManager#synchronizations 中。
        2. 当当前线程准备提交或回滚时会通过TransactionSynchronizationManager#getSynchronizations 获取到 TransactionSynchronizationManager#synchronizations 中的事务同步类。随后执行相应的方法。

        三、TransactionalEventListener

        在 Spring framework 4.2 之后还可以使用@TransactionalEventListener处理数据库事务提交成功后再执行操作,这种方式比 TransactionSynchronization 更加优雅。我们仍以上面的Demo为例加以改造。

        如下: 同样的功能,事务内在 sys_role 表中插入一条 role_id = 10 和 20 两条记录。事务提交后执行DemoServiceImpl#onApplicationEvent 方法调用 addPermission,在 sys_rermission 表中插入一条记录。需要注意,这里不需要再实现 ApplicationListener 接口,否则会调用两次。

        @Service
        public class DemoServiceImpl implements
        
        
        
        
            
         DemoService, ApplicationContextAware {
            @Autowired
            private SysPermissionDao sysPermissionDao;
            @Autowired
            private SysRoleDao sysRoleDao;
            private ApplicationContext applicationContext;
        	@Transactional(rollbackFor = Exception.class)
            @Override
            public String testTransactionalEventListener() {
            	// step1 : 在 sys_role 表中插入一条 role_id = 10 的记录
                addRole(10);
                // step2 : 在 sys_role 表中插入一条 role_id = 20 的记录
                addRole(20);
                this.applicationContext.publishEvent(new DemoApplicationEvent(10));
                System.out.println("end");
                return "";
             * 在 sys_role 表中插入一条记录
            @Override
            public void addRole(int id) {
            	...
             * 在 sys_rermission 表中插入一条记录
            @Override
            public void addPermission(int id) {
        		...
            @Override
            public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
                this.applicationContext = applicationContext;
            public static class DemoApplicationEvent extends ApplicationEvent {
                @Getter
                private Integer id;
                public DemoApplicationEvent(Integer source) {
                    super(source);
                    this.id = source;
        	// phase 指定了执行阶段为 事务提交后。
            @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
            public void onApplicationEvent(DemoApplicationEvent demoApplicationEvent) {
            	// step3 : 在 sys_rermission 表中插入一条记录
                addPermission(demoApplicationEvent.getId());
        

        TransactionalEventListener 通过Spring监听器的方式实现了 TransactionSynchronization 的功能,下面我们来看一看具体原理。

        1. TransactionalEventListener

        首先来看一下 TransactionalEventListener 的定义,如下:

        @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
        @Retention(RetentionPolicy.RUNTIME)
        @Documented
        @EventListener
        public @interface TransactionalEventListener {
        	// 执行执行阶段,默认事务提交后、除此之外还有 BEFORE_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION
        	TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
        	// 监听器的唯一id,可为空
        	String id() default "";
        	// 如果没有事务正在运行,是否应处理事件
        	boolean fallbackExecution() default false;
        	 * EventListener classes 属性的别名, 指定监听事件类型
        	 * 此侦听器处理的事件类。如果使用单个值指定此属性,则带注释的方法可以选择接受单个参数。 但是,如果此属性指定了多个值,则注释方法不得声明任何参数。
        	@AliasFor(annotation = EventListener.class, attribute = "classes")
        	Class<?>[] value() default {};
        	 * EventListener classes 属性的别名, 指定监听事件类型
        	 * 此侦听器处理的事件类。如果使用单个值指定此属性,则带注释的方法可以选择接受单个参数。 但是,如果此属性指定了多个值,则注释方法不得声明任何参数。
        	@AliasFor(annotation = EventListener.class, attribute = "classes")
        	Class<?>[] classes() default {};
        	 * 用于使事件处理有条件的 Spring 表达式语言 (SpEL) 属性。
        	 * 默认值为"" ,表示始终处理事件。
        	String condition() default "";
        

        2. 执行流程

        下面我们来看看 TransactionalEventListener 注解是如何实现的。

        2.1 EventListenerMethodProcessor

        首先我们需要知道,一个被 @TransactionalEventListener 注解修饰的普通方法,如何具有监听器的效果?原因就在于 EventListenerMethodProcessor 类。

        EventListenerMethodProcessor 的结构如下:
        在这里插入图片描述

        可以看到 EventListenerMethodProcessor 实现了三个关键接口 :

        • SmartInitializingSingleton :afterSingletonsInstantiated 方法会在容器初始化所有非惰性Bean之后调用。EventListenerMethodProcessor 在 afterSingletonsInstantiated 方法中为被 @TransactionalEventListener 注解修饰的方法动态生成了 ApplicationListener 并添加到容器中。当执行的事件发送时,动态生成的 ApplicationListener 监听器则会触发,通过反射调用 注解方法。
        • ApplicationContextAware :EventListenerMethodProcessor 通过 setApplicationContext 方法 获取到 ApplicationContext 实例。
        • BeanFactoryPostProcessor : EventListenerMethodProcessor 通过 postProcessBeanFactory 方法获取到了Spring容器中的所有 EventListenerFactory 实例对象,保存到 EventListenerFactory#eventListenerFactories 中。

        下面我们来看看 EventListenerMethodProcessor 的执行过程,如下:

        2.1.1 EventListenerMethodProcessor 的注入

        Spring 容器启动时会在通过 SpringApplication#createApplicationContext 创建应用上下文,其中调用链如下:

        SpringApplication#createApplicationContext -> 
        AnnotationConfigServletWebServerApplicationContext构造函数 -> 
        AnnotatedBeanDefinitionReader构造函数 -> 
        AnnotationConfigUtils#registerAnnotationConfigProcessors
        

        在 AnnotationConfigUtils#registerAnnotationConfigProcessors 中会向容器中注册 EventListenerMethodProcessor 的 BeanDefinition,如下图:
        在这里插入图片描述

        2.1.2 EventListenerMethodProcessor 的调用

        Spring 容器启动时会在 AbstractApplicationContext#finishBeanFactoryInitialization 方法中完成非惰性加载 Bean的 的初始化。初始化结束后会调用SmartInitializingSingleton#afterSingletonsInstantiated 方法。EventListenerMethodProcessor#afterSingletonsInstantiated 的实现如下:

        	@Override
        	public void afterSingletonsInstantiated() {
        		ConfigurableListableBeanFactory beanFactory = this.beanFactory;
        		Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
        		// 获取容器中的所有 beanName 
        		String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
        		for (String beanName : beanNames) {
        			if (!ScopedProxyUtils.isScopedTarget(beanName)) {
        				Class<?> type = null;
        				try {
        					// 获取 bean的 类型
        					type = AutoProxyUtils.determineTargetClass
        
        
        
        
            
        (beanFactory, beanName);
        				catch (Throwable ex) {
        					... 日志打印
        				if (type != null) {
        					// 如果 bean 是 ScopedObject类型,进一步获取真实类型
        					if (ScopedObject.class.isAssignableFrom(type)) {
        						try {
        							Class<?> targetClass = AutoProxyUtils.determineTargetClass(
        									beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
        							if (targetClass != null) {
        								type = targetClass;
        						catch (Throwable ex) {
        							... 日志打印
        					try {
        						// 处理bean
        						processBean(beanName, type);
        					catch (Throwable ex) {
        						... 抛出异常
        

        可以看到关键逻辑在 EventListenerMethodProcessor#processBean 中,其实现如下:

        	private void processBean(final String beanName, final Class<?> targetType) {
        		// nonAnnotatedClasses 缓存未命中当前类 && 给定的类是可携带指定注释的候选者 && 不是 org.springframework. 包下的类或被@Component 注解修饰
        		if (!this.nonAnnotatedClasses.contains(targetType) &&
        				AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
        				!isSpringContainerClass(targetType)) {
        			Map<Method, EventListener> annotatedMethods = null;
        			try {
        				// 寻找被 @EventListener 注解修饰的方法
        				annotatedMethods = MethodIntrospector.selectMethods(targetType,
        						(MethodIntrospector.MetadataLookup<EventListener>) method ->
        								AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
        			catch (Throwable ex) {
        				... 日志打印
        			if (CollectionUtils.isEmpty(annotatedMethods)) {
        				// 缓存没有被 @EventListener 注解修饰的类
        				this.nonAnnotatedClasses.add(targetType);	
        				... 日志打印
        			else {
        				// Non-empty set of methods
        				// 到这里则说明当前类一定有方法被  @EventListener 注解修饰
        				ConfigurableApplicationContext context = this.applicationContext;
        				Assert.state(context != null, "No ApplicationContext set");
        				// 在 postProcessBeanFactory 方法中获取到的容器中的 EventListenerFactory 实现类
        				List<EventListenerFactory> factories = this.eventListenerFactories;
        				Assert.state(factories != null, "EventListenerFactory List not initialized");
        				// 遍历 @EventListener 注解修饰的方法
        				for (Method method : annotatedMethods.keySet()) {
        					// 遍历所有的 EventListenerFactory 
        					for (EventListenerFactory factory : factories) {
        						// 如果 EventListenerFactory 支持当前方法
        						if (factory.supportsMethod(method)) {
        							// 获取可调用的方法:context.getType(beanName) 获取到的可能是代理类,这里会找到真正要调用的诶类的方法
        							Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
        							// 通过 EventListenerFactory#createApplicationListener 方法为找到的方法创建一个 ApplicationListener实现类
        							ApplicationListener<?> applicationListener =
        									factory.createApplicationListener(beanName, targetType, methodToUse);
        							// ApplicationListenerMethodAdapter类型则调用 init 完成初始化
        							if (applicationListener instanceof ApplicationListenerMethodAdapter) {
        								((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
        							// applicationListener 添加到 Spring 容器中
        							context.addApplicationListener(applicationListener);
        							break;
        				... 日志打印
        

        总结如下:

        1. EventListenerMethodProcessor 在 EventListenerMethodProcessor#postProcessBeanFactory 中获取到了所有 EventListenerFactory 类型的 bean。(在ConfigurationClassPostProcessor 中将所有bean 的BeanDefinition 扫描生成。而 ConfigurationClassPostProcessor 的优先级在EventListenerMethodProcessor 之前 )。
        2. 随后在EventListenerMethodProcessor#afterSingletonsInstantiated 方法中获取所有bean,筛选出被 @EventListener 注解修饰的类或方法的类。
        3. 随后找到 EventListenerFactory,通过 EventListenerFactory#supportsMethod 判断是可以处理当前类,如果可以则通过 EventListenerFactory#createApplicationListener 为当前类生成一个 ApplicationListener (ApplicationListenerMethodAdapter 或 TransactionalApplicationListenerMethodAdapter)类并添加到 容器中。
        4. 当有指定事件触发后触发容器中的 ApplicationListenerMethodAdapter 或 TransactionalApplicationListenerMethodAdapter 的 onApplicationEvent 方法。在这些方法中会通过反射调用被 @EventListener 修饰的方法。

        2.2 EventListenerFactory

        EventListenerFactory 接口定义如下

        public interface EventListenerFactory {
        	// 是否支持处理当前指定方法
        	boolean supportsMethod(Method method);
        	// 为当前指定类的指定方法创建一个 ApplicationListener
        	ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method);
        

        默认情况下 EventListenerFactory 有 DefaultEventListenerFactory 和 TransactionalEventListenerFactory 两个实现类。我们这里看的是 @TransactionalEventListener 注解,所以来看一下 TransactionalEventListenerFactory 的实现,如下:

        public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
        	...
        	@Override
        	public boolean supportsMethod(Method method)
        
        
        
        
            
         {
        		// 判断当前方式是否被 TransactionalEventListener 注解修饰
        		return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
        	@Override
        	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        		// 创建一个 TransactionalApplicationListenerMethodAdapter 实现类
        		return new TransactionalApplicationListenerMethodAdapter(beanName, type, method);
        

        这里我们可以了解到 TransactionalEventListenerFactory 会判断:如果方法被TransactionalEventListener 注解修饰,则会为其创建一个 TransactionalApplicationListenerMethodAdapter类作为 ApplicationListener。

        2.3 TransactionalApplicationListenerMethodAdapter

        TransactionalApplicationListenerMethodAdapter 结构如下:
        在这里插入图片描述

        这里我们简单看一下 TransactionalApplicationListenerMethodAdapter#onApplicationEvent 的实现

        	@Override
        	public void onApplicationEvent(ApplicationEvent event) {
        		// 如果当前存在事务. 则通过 TransactionSynchronizationManager 注册TransactionalApplicationListenerSynchronization
        		if (TransactionSynchronizationManager.isSynchronizationActive() &&
        				TransactionSynchronizationManager.isActualTransactionActive()) {
        			TransactionSynchronizationManager.registerSynchronization(
        					new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
        		// 没有事务,判断是否执行事件逻辑
        		else if (this.annotation.fallbackExecution()) { 
        			// ...
        			processEvent(event);
        		else {
        			... 日志打印
        

        这里我们看到

        1. 如果当前线程存在活跃事务, 则通过 TransactionSynchronizationManager 注册一个 TransactionalApplicationListenerSynchronization 实例。当事务到达指定阶段后会触发TransactionalApplicationListenerSynchronization 中的阶段方法,而这些方法中则还会调用 TransactionalApplicationListenerMethodAdapter#processEvent 方法。
        2. 如果当前线程不存在活跃事务,则根据 TransactionalEventListener#fallbackExecution 判断,如果为 true,则执行 TransactionalApplicationListenerMethodAdapter#processEvent。其中 processEvent 的实现在其父类 ApplicationListenerMethodAdapter中。

        下面我们来详细看一看

        2.3.1 TransactionalApplicationListenerSynchronization

        TransactionalApplicationListenerSynchronization 的实现如下,其中主要实现了 beforeCommit 和 afterCompletion 两个方法。

        class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>
        		implements TransactionSynchronization {
        	...
        	// 事务提交前判断
        	@Override
        	public void beforeCommit(boolean readOnly) {
        		// 如果指定阶段为事务提交前,则执行 processEventWithCallbacks
        		if (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {
        			processEventWithCallbacks();
        	// 事务提交后判断
        	@Override
        	public void afterCompletion(int status) {
        		TransactionPhase phase = this.listener.getTransactionPhase();
        		// 如果执行阶段为事务提交后 && 事务正常提交
        		if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
        			processEventWithCallbacks();
        		// 如果执行阶段为事务回滚后 && 事务发生回滚
        		else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
        			processEventWithCallbacks();
        		// 如果执行阶段为事务结束 && 事务发生回滚
        		else if (phase == TransactionPhase.AFTER_COMPLETION) {
        			processEventWithCallbacks();
        	private void processEventWithCallbacks() {
        		// 执行回调类的回调预处理方法
        		this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));
        		try {
        			// 执行监听事件,这里的 listener 即为上层的 TransactionalApplicationListenerMethodAdapter 实例对象
        			this.listener.processEvent(this.event);
        		catch (RuntimeException | Error ex) {
        			// 执行回调类的回调后置处理方法
        			this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, ex));
        			throw ex;
        		// 执行回调类的回调后置处理方法
        		this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));
        
        2.3.2 ApplicationListenerMethodAdapter#processEvent

        ApplicationListenerMethodAdapter#processEvent 实现如下,逻辑比较简单,这里不再赘述。

        	public void processEvent(ApplicationEvent event) {
        		// 解析 method 的参数。method 指的是被 TransactionalEventListener  注解修饰的方法
        		Object[] args = resolveArguments(event);
        		// 判断是否可以处理。根据注解 TransactionalEventListener#condition 属性判断
        		if (shouldHandle(event, args)) {
        			// 反射调用 指定的方法,以 args 为入参
        			Object result = doInvoke(args);
        			if (result != null) {
        				// 如果监听方法有返回值,则对返回值处理。会将返回值作为 event 再次发送
        				handleResult(result);
        			else {
        				logger.trace("No result object given - no result to handle");
        

        以上:如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正

        一、前言二、TransactionSynchronizationManagerTransactionSynchronizationManager 事务同步管理器。Spring 在事务过程中通过此类来管理事务。TransactionSynchronizationManager 中保存的是各个线程中的事务信息。public abstract class TransactionSynchronizationManager { //线程上下文中保存着【线程池对象:ConnectionHolde
        TransactionSynchronization是一个回调接口,用于在事务执行过程中的重要时间点执行额外的逻辑,AbstractPlatformTransactionManager中通过TransactionSynchronizationManager间接调用绑定到当前事务所在线程的TransactionSynchronization集合的对应的回调方法; TransactionSynchro...
        public abstract class TransactionSynchronizationManager { TransactionSynchronizationManager是管理需要同步的事务资源,也可以管理线程,可以把资源绑定到当前线程当中。 下面介绍一下TransactionSynchronizationManager类的一些关键方法: public static boo...
        通过spring控制事务——使用TransactionSynchronizationManager 在开发spring应用时,有时我们需要根据业务需要控制事务,以此去满足特定业务。比如创建或更新某个数据后,然后启动一个同步任务执行关联的处理操作等等。这时如果将这些逻辑写在一个事务中时,事务还没提交,数据库里找不到对应数据,也就无法启动对应处理任务了。 有没有办法在确保事务提交后,再去发送这个消息呢?一般有以下几个方式: 1、把启动任务的代码写到事务外面; 2、编程式事务; 3、使用TransactionSy
        @Aync是通过AsyncAnnotationBeanPostProcessor来处理。其在setBeanFactory会设置AsyncAnnotationAdvisor,其代码如下 public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.exe
        Spring 源码分析是一个相对复杂和庞大的话题,无法在一次对话中详细讨论。不过,我可以给你提供一些关于 Spring 源码分析的一般指导和建议。 1. 了解 Spring 的核心模块:Spring 框架是模块化的,每个模块都有不同的功能和职责。在开始源码分析之前,你需要先了解 Spring 的核心模块,如 Spring Core、Spring MVC、Spring Data 等。 2. 阅读官方文档和源码注释:Spring 框架的官方文档和源码注释是你学习和理解源码的重要资源。官方文档提供了对 Spring 各个模块的详细说明,源码注释则解释了代码的作用和实现细节。 3. 调试和跟踪代码:在进行源码分析时,调试器是你的好帮手。通过设置断点、单步跟踪以及观察变量的值,你可以深入理解代码的执行流程和逻辑。 4. 理解设计模式和原理:Spring 框架采用了许多设计模式和原理来实现其功能。在分析源码时,你需要熟悉这些设计模式和原理,例如依赖注入、AOP、工厂模式等。 5. 参考开源社区和博客:Spring 框架是一个非常活跃的开源社区,许多开发者在博客和论坛上分享了他们的源码分析和理解。阅读这些文章可以帮助你更好地理解 Spring 框架的实现细节。 请记住,深入分析 Spring 源码需要耐心和时间投入,同时也需要有一定的 Java 和设计模式的基础。希望这些指导对你有所帮助!如果你有具体的问题或者需要更详细的信息,欢迎继续提问。
 
推荐文章
腼腆的手链  ·  【极米投影仪】怎么样 推荐 测评-什么值得买
6 月前
要出家的签字笔  ·  尚小云故居_百度百科
12 月前
月球上的小摩托  ·  《指环王》中最打动我的七件中土旧事-中青在线
1 年前
快乐的卤蛋  ·  江淮面包车 - 抖音
1 年前
聪明伶俐的煎鸡蛋  ·  前妻,别来无恙漫画免费 - 前妻,别来无恙漫画 - 漫画在线全集免费阅读 - 腾讯动漫
1 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号