一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

​作者:京东科技 刘子洋

背景

最近项目出现同一消息发送多次的现象,对下游业务方造成困扰,经过排查发现使用EventBus方式不正确。也借此机会学习了下EventBus并进行分享。以下为分享内容,本文主要分为五个部分,篇幅较长,望大家耐心阅读。

  • 1、简述:简单介绍EventBus及其组成部分。
  • 2、原理解析:主要对listener注册流程及Event发布流程进行解析。
  • 3、使用指导:EventBus简单的使用指导。
  • 4、注意事项:在使用EventBus中需要注意的一些隐藏逻辑。
  • 5、分享时提问的问题
  • 6、项目中遇到的问题:上述问题进行详细描述并复现场景。

1、简述

1.1、概念

下文摘自EventBus源码注释,从注释中可以直观了解到他的 功能、特性、注意事项

【源码注释】

Dispatches events to listeners, and provides ways for listeners to register themselves.
The EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). It is designed exclusively to replace traditional Java in-process event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.
Receiving Events
To receive events, an object should:

Expose a public method, known as the event subscriber, which accepts a single argument of the type of event desired;

Mark it with a Subscribe annotation;

Pass itself to an EventBus instance's register(Object) method.

Posting Events
To post an event, simply provide the event object to the post(Object) method. The EventBus instance will determine the type of event and route it to all registered listeners.
Events are routed based on their type — an event will be delivered to any subscriber for any type to which the event is assignable. This includes implemented interfaces, all superclasses, and all interfaces implemented by superclasses.
When post is called, all registered subscribers for an event are run in sequence, so subscribers should be reasonably quick. If an event may trigger an extended process (such as a database load), spawn a thread or queue it for later. (For a convenient way to do this, use an AsyncEventBus.)
Subscriber Methods
Event subscriber methods must accept only one argument: the event.
Subscribers should not, in general, throw. If they do, the EventBus will catch and log the exception. This is rarely the right solution for error handling and should not be relied upon; it is intended solely to help find problems during development.
The EventBus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the AllowConcurrentEvents annotation. If this annotation is not present, subscriber methods need not worry about being reentrant, unless also called from outside the EventBus.
Dead Events
If an event is posted, but no registered subscribers can accept it, it is considered "dead." To give the system a second chance to handle dead events, they are wrapped in an instance of DeadEvent and reposted.
If a subscriber for a supertype of all events (such as Object) is registered, no event will ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadEvent extends Object, a subscriber registered to receive any Object will never receive a DeadEvent.
This class is safe for concurrent use.
See the Guava User Guide article on EventBus.
Since:
10.0
Author:
Cliff Biffle

1.2、系统流程



1.3、组成部分


1.3.1、调度器

EventBus、AsyncEventBus都是一个调度的角色,区别是一个同步一个异步。

  • EventBus
源码注释:
> Dispatches events to listeners, and provides ways for listeners to register themselves.
意思是说EventBus分发事件(Event)给listeners处理,并且提供listeners注册自己的方法。从这里我们可以看出EventBus主要是一个调度的角色。
EventBus总结
- 1.同步执行,事件发送方在发出事件之后,会等待所有的事件消费方执行完毕后,才会回来继续执行自己后面的代码。
- 2.事件发送方和事件消费方会在同一个线程中执行,消费方的执行线程取决于发送方。
- 3.同一个事件的多个订阅者,在接收到事件的顺序上面有不同。谁先注册到EventBus的,谁先执行,如果是在同一个类中的两个订阅者一起被注册到EventBus的情况,收到事件的顺序跟方法名有关。
 


  • AsyncEventBus
 源码注释:
> An {@link EventBus} that takes the Executor of your choice and uses it to dispatch events, allowing dispatch to occur asynchronously.
意思是说AsyncEventBus就是EventBus,只不过AsyncEventBus使用你指定的线程池(不指定使用默认线程池)去分发事件(Event),并且是异步进行的。
**AsyncEventBus总结**
- 1.异步执行,事件发送方异步发出事件,不会等待事件消费方是否收到,直接执行自己后面的代码。
- 2.在定义AsyncEventBus时,构造函数中会传入一个线程池。事件消费方收到异步事件时,消费方会从线程池中获取一个新的线程来执行自己的任务。
- 3.同一个事件的多个订阅者,它们的注册顺序跟接收到事件的顺序上没有任何联系,都会同时收到事件,并且都是在新的线程中,**异步并发**的执行自己的任务。
 


1.3.2、事件承载器

  • Event

事件主体,用于承载消息。

  • DeadEvent
 源码注释:
>Wraps an event that was posted, but which had no subscribers and thus could not be delivered, Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system's event distribution.
意思是说DeadEvent就是一个被包装的event,只不过是一个没有订阅者无法被分发的event。我们可以在开发时注册一个DeadEvent,因为它可以检测系统事件分布中的错误配置。
 


1.3.3、事件注册中心

 源码注释:
>  Registry of subscribers to a single event bus.
意思是说SubscriberRegistry是单个事件总线(EventBus)的订阅者注册表。

1.3.4、事件分发器

Dispatcher

源码注释:
>Handler for dispatching events to subscribers, providing different event ordering guarantees that make sense for different situations.
>Note: The dispatcher is orthogonal to the subscriber's Executor. The dispatcher controls the order in which events are dispatched, while the executor controls how (i.e. on which thread) the subscriber is actually called when an event is dispatched to it.
意思是说Dispatcher主要任务是将事件分发到订阅者,并且可以不同的情况,按不同的顺序分发。


Dispatcher有三个子类,用以满足不同的分发情况

1.PerThreadQueuedDispatcher

 源码注释:
> Returns a dispatcher that queues events that are posted reentrantly on a thread that is already dispatching an event, guaranteeing that all events posted on a single thread are dispatched to all subscribers in the order they are posted.
> When all subscribers are dispatched to using a direct executor (which dispatches on the same thread that posts the event), this yields a breadth-first dispatch order on each thread. That is, all subscribers to a single event A will be called before any subscribers to any events B and C that are posted to the event bus by the subscribers to A.
意思是说一个线程在处理事件过程中又发布了一个事件,PerThreadQueuedDispatcher会将后面这个事件放到最后,从而保证在单个线程上发布的所有事件都按其发布顺序分发给订阅者。**注意,每个线程都要自己存储事件的队列。**
第二段是说PerThreadQueuedDispatcher按**广度优先**分发事件。并给了一个例子:
代码中发布了事件A,订阅者收到后,在执行过程中又发布了事件B和事件C,PerThreadQueuedDispatcher会确保事件A分发给所有订阅者后,再分发B、C事件。
 


2.LegacyAsyncDispatcher

源码注释:
> Returns a dispatcher that queues events that are posted in a single global queue. This behavior matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful. For async dispatch, an immediate dispatcher should generally be preferable.
意思是说LegacyAsyncDispatcher有一个全局队列用于存放所有事件,LegacyAsyncDispatcher特性与AsyncEventBus特性完全相符,除此之外没有其他什么特性。如果异步分发的话,最好用immediate dispatcher。

3.ImmediateDispatcher

源码注释:
> Returns a dispatcher that dispatches events to subscribers immediately as they're posted without using an intermediate queue to change the dispatch order. This is effectively a depth-first dispatch order, vs. breadth-first when using a queue.
意思是说ImmediateDispatcher在发布事件时立即将事件分发给订阅者,而不使用中间队列更改分发顺序。这实际上是**深度优先**的调度顺序,而不是使用队列时的**广度优先**。

1.3.4、订阅者

  • Subscriber
源码注释:
> A subscriber method on a specific object, plus the executor that should be used for dispatching events to it.
Two subscribers are equivalent when they refer to the same method on the same object (not class). This property is used to ensure that no subscriber method is registered more than once.