);
: run : 1
: onMessageEvent1 :
: run : 2
: onMessageEvent1 :
非主线程(非阻塞式):
final String text = "长江长江我是黄河";
new Thread(new Runnable() {
@Override
public void run() {
Log.d(TAG, "run : 1");
EventBus.getDefault().post(text);//发送一个事件
Log.d(TAG, "run : 2");
EventBus.getDefault().post(text);//发送一个事件
}).start();
日志输出:
run : 1
run : 2
onMessageEvent1 :
onMessageEvent1 :
MAIN_ORDERED
和MAIN差不多,主线程调用,和 MAIN 不同的是他保证了 post 是非阻塞式的(默认走 MAIN 的非主线程的逻辑,所以可以做到非阻塞)
BACKGROUND
在子线程调用,如果发布在子线程那么直接在发布线程调用,如果发布在主线程那么将开启一个子线程来调用,这个子线程是阻塞式的,按顺序交付所有事件,所以也不适合做耗时任务,因为多个事件共用这一个后台线程
ASYNC
在子线程调用,总是开启一个新的线程来调用,适用于做耗时任务,比如数据库操作,网络请求等,不适合做计算任务,会导致开启大量线程
6、原理分析:
这里并不打算分析具体的源码逻辑,而是个人在看了源码之后的笔记。帮助自己更好的理解 eventbus 的实现原理,梳理清楚每一条逻辑。
想看源码分析可以参考这篇文章:
/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
return defaultInstance;
这里在生成单例的时候使用了双重检验,避免多线程过程中重复创建。其次这里使用到了,类缩而非对象锁。
对象锁是用来控制实例方法之间的同步,而类锁是用来控制静态方法(或者静态变量互斥体)之间的同步的。
类锁只是一个概念上的东西,并不是真实存在的,他只是用来帮助我们理解锁定实例方法和静态方法的区别的。
java 类可能会有很多对象,但是只有一个 Class (字节码)对象,也就是说类的不同实例之间共享该类的 Class 对象。Class 对象其实也仅仅是 1 个 java 对象,只不过有点特殊而已。
由于每个 java 对象都有1个互斥锁,而类的静态方法是需要 Class 对象。所以所谓的类锁,只不过是 Class 对象的锁而已。
6.2 以 class 为 key 来存储方法信息
例如一个 activity 里面注册了一个 eventbus。我们每次进入activity 的时候,都会把 this 传进去,然后走一遍注册逻辑,所以你觉得内部是如何存储注册对象的呢?是按照 this 来的?
其实内部是通过 class 来存储的。
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
//subscriberMethods返回的是subscriber这个类中所有的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);//分类保存后面有分析
通过上面代码,我们可以看到会去获取当前对象的类名,然后在通过反射的形式获取该类的所有方法,从中找到订阅方法,方便以后发布消息。
如果采用对象保存,每次进入,都是一个不同的对象,然后通过对象再去获取方法信息,这样做太费力,也太耗内存了。通过类名的方式,只是第一次比较耗时,后面就方便了。
添加新方法,或者新的事件的时候,会重新编译,重新获取一遍新的数据的。
PS : 注册本身还是挂在对象上的,当对象销毁的时候,也会进行注销。
6.3 如何保存订阅同一事件的不同类
根据事件类型,将注册同一个事件类型的 class 放在一起。
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;//订阅函数参数类型
//这一步很简单就是在构造函数中记录下订阅者和订阅方法
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//CopyOnWriteArrayList是java.util包下的,他使用了写时复制的方法来实现,其效率并不高,但可以保证在多线程环境下最终(强调是最终)数据的一致性
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);//subscriptionsByEventType可以根据参数类型来获取到订阅事件
//在操作第一个订阅事件时肯定是==null的
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);//已经注册的事件不允许再次注册
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//根据优先级来添加
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
//typesBySubscriber可以根据订阅者来获取到所有的订阅方法参数类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {//粘性事件的处理逻辑在最后再分析,因为其内容包含了post流程
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
对 subscriptionsByEventType typesBySubscriber 完成数据初始化,subscriptionsByEventType 根据参数类型存储订阅者和订阅方法,typesBySubscriber 根据订阅者存储了所有的参数类型,subscriptionsByEventType 主要是 post 时使用,因为其存储了订阅者和订阅事件这两个参数在反射时要用到,typesBySubscriber 在反注册时可以根据订阅者获取到存储的事件类型就可以从 subscriptionsByEventType 中获取到对应的订阅者和订阅方法释放资源,还可以用来判断是否注册。
6.4 如何找到订阅方法
从缓存中获取订阅方法列表,如果缓存中不存在则通过反射获取到订阅者所有的函数,遍历再通过权限修饰符,参数长度(只允许一个参数),注解(@Subscribe) 来判断是否是具备成为订阅函数的前提,具备则构建一个 SubscriberMethod (订阅方法,其相当于一个数据实体类,包含方法,threadmode,参数类型,优先级,是否粘性事件这些参数),循环结束订阅函数列表构建完成添加进入缓存
6.5 如何在子线程发布消息后在主线程处理
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();//采用独立队列,与backgroundPoster一致
可以看到,HandlerPoster 自身携带一个 looper,主要传入 mainLooper,就可以处理主线程的事物了。
6.6 是如何调用订阅方法的
通过反射的形式调用。
void invokeSubscriber(Subscription subscription, Object event) {
try {
//这里最后说明一下subscription中包含了订阅者和订阅方法 event是Post的参数 这里通过反射直接调用订阅者的订阅方法 完成本次通信
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
6.7 如何确定优先级
每次添加的时候,就会根据优先级来添加,优先级越高的,添加在最前面。
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//CopyOnWriteArrayList是java.util包下的,他使用了写时复制的方法来实现,其效率并不高,但可以保证在多线程环境下最终(强调是最终)数据的一致性
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);//subscriptionsByEventType可以根据参数类型来获取到订阅事件
//在操作第一个订阅事件时肯定是==null的
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);//已经注册的事件不允许再次注册
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//根据优先级来添加
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
6.8 既然存在多线程,是如何保存数据的?
public void post(Object event) {
//currentPostingThreadState是ThreadLocal,ThreadLocal可以解决多线程的并发访问问题,他会为每一个线程提供一个独立的变量副本,可以隔离多个线程对数据的访问冲突
PostingThreadState postingState = currentPostingThreadState.get();
......
ThreadLocal 的是一个本地线程副本变量工具类。主要用于将私有线程和该线程存放的副本对象做一个映射,各个线程之间的变量互不干扰,在高并发场景下,可以实现无状态的调用,特别适用于各个线程依赖不通的变量值完成操作的场景。
final static class PostingThreadState {
// 通过post方法参数传入的事件集合
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting; // 是否正在执行postSingleEvent()方法
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
this.subscriber = subscriber;
this.subscriberMethod = subscriberMethod;
active = true;
订阅方法的信息:
public SubscriberMethod(String methodName, Class<?> eventType, ThreadMode threadMode,
int priority, boolean sticky) {
this.methodName = methodName;
this.threadMode = threadMode;
this.eventType = eventType;
this.priority = priority;
this.sticky = sticky;
可以发现,基本上所有的信息都被包含在 PostingThreadState 中了,这样在 post 的方法中就不要额外依赖其他数据了。
6.9 发送消息逻辑过程是怎样的
post () 发送消息,首先得获取当前线程的一个发送队列。从队列里面依次取出 event ,根据 event.getClass()来获取保存的订阅者。
synchronized (this) {
//这里根据我们注册的时候总结 这个容器中装的是订阅者和订阅方法,现在根据发送事件的类型来获取到对应的订阅者和订阅方法这些参数是反射必须要用到的
subscriptions = subscriptionsByEventType.get(eventClass);
找到订阅者以后,依次循环,对每个订阅者进行处理:
//这里根据是否在主线程和threadmode来判断
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);//post在什么线程就直接调用 不需要切换线程
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);//如果Post在主线程直接调用,反之通过handler来切换到主线程再调用反射
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {//默认走这里的逻辑和MAIN一致 事件排队等待调用,非阻塞式
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);//如果在主线程则开启一条线程 事件将排队在同一条线程执行
} else {//如果post在子线程直接在Post线程调用
invokeSubscriber(subscription, event);
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);//总是开启线程来调用
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
调用 enqueue 后,用于切换线程来处理事件,最后还是会通过反射的形式进行调用。
6.10 黏性事件如何保存和发送
主要使用场景是:当订阅者尚未创建,先调用 EventBus.getDefault().postSticky() 方法发送一个 sticky 事件,该事件会被 stickyEvents 缓存起来,当订阅该事件的类调用 register() 方法时,最终会将保存的事件全部发给新注册的订阅者一份,因此,新的订阅者同样可以收到该事。
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取subsrciberMethod传递的自定义EventType参数的运行时的类
Class eventType = subscriberMethod.eventType;
//Subscription用于绑定subscriber和sucriberMethod,一个订阅者可以有多个subscriberMethod
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//根据EventType的运行时类取到该类所有的subscriptioins,subscriptionsByEventType是HashMap中的key
CopyOnWriteArrayList subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
//若根据EventType找不到subscriptions,则eventType作key,subscriptions作value添加到subscriptionByEventType中。
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
//已经存在newSubscription,抛出异常该订阅者已经注册,不可重复注册同一个subscriber
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//循环subscriptions,根据标记优先级的priority从高到低,将新的subscription插入到subscriptions中
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
//typesBySubscriber是一个HashMap,根据subscriber做key,获取该subscriber对应的所有的订阅事件的类型
List> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
//该订阅者之前的订阅事件类型列表为空,则将当前订阅类型添加到typesBySubscriber中
typesBySubscriber.put(subscriber, subscribedEvents);
subscribedEvents.add(eventType);
//如果该方法被标识为sticky事件
if (subscriberMethod.sticky) {
if (eventInheritance) { eventInheritance标识是否考虑EventType的类层次结构
//循环所有的sticky黏性事件
Set, Object>> entries = stickyEvents.entrySet();
for (Map.Entry, Object> entry : entries) {
Class candidateEventType = entry.getKey();
//如果当前事件是其他事件的同类型的或者是他们的父类
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
heckPostStickyEventToSubscription(newSubscription, stickyEvent);
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
从上面我们可以知道,最后都会调用一个方法:
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
最后,也会调用到所有事件不管是不是黏性都会走的一个方法:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据@subscriber中threadMode进行区分,POSTING为当前线程执行,
//MAIN为主线程,BACKGROUND为子进程,ASYNC为异步执行。
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
最后调用的逻辑还是一样的。
最后,附上一张 eventbus 的思维导图,帮助你们更好的去理解 eventbus。