EventBus源码研读(上)


EventBus 是一款针对Android优化的发布/订阅事件总线。主要功能是替代Intent, Handler, BroadCast 在 Fragment,Activity,Service,线程之间传递消息.优点是开销小,使用方便,可以很大程度上降低它们之间的耦合,使得我们的代码更加简洁,耦合性更低,提升我们的代码质量。
类似的库还有 Otto ,今天就带大家一起研读 EventBus 的源码.

在写这篇文章之前,我已经将本文相关的中文注释代码上传到了GitHub:https://github.com/kymjs/EventBus

第一篇 EventBus源码研读(上) http://kymjs.com/code/2015/12/12/01/
第二篇 EventBus源码研读(中) http://kymjs.com/code/2015/12/13/01/
第三篇 EventBus源码研读(下) http://kymjs.com/code/2015/12/16/01/

基础用法

在读代码之前,首先你得了解它的基本用法.如果你已经能够很熟练的使用EventBus等事件总线库了,那么你可以跳过本节.
首先引入依赖包,查看GitHub主页的说明:https://github.com/greenrobot/EventBus
在Gradle文件加入
compile 'de.greenrobot:eventbus:2.4.0'

用法与广播相同,且比广播更简单:

注册订阅者

首先你需要注册一个事件订阅者,为了方便理解你可以把他当成广播的广播接收者 你可以在任何一个类中使用如下代码注册以及解除注册

 
//把当前类注册为订阅者(接收者)
EventBus.getDefault().register(this);

//解除注册当前类(同广播一样,一定要调用,否则会内存泄露)
EventBus.getDefault().unregister(this);

注册了订阅者以后,我们需要创建一个回调方法onEvent,当我们订阅的事件发送的时候就会回调它

 
//其实命名不一定必须是onEvent(),但那属于高级用法了,这里我们只说最简单的
public void onEvent(Object event) {}

事件发送

当有了订阅者以后,我们的代码已经可以工作了.但是此时的代码是没有意义的,我们订阅的事件还没有发生. 就像广播需要一个sendBroadcast(),EventBus需要post(event)
你可以在任何一个类中使用如下代码发送事件:

 
/**
 * 这里的event类型必须和上面我们onEvent()方法的参数类型一致
 * (子父类关系也不行,必须是相同类型,原因我们下面看源码)
 */
EventBus.getDefault().post(event);

至此,EventBus就可以正常工作了.

进入源码世界

入口类EventBus类

我们从使用的流程来,首先看EventBus#getDefault()

 
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

只是简单的维护单例,调用构造方法,再看构造方法,调用重载的构造方法,重载的构造方法又需要一个EventBusBuilder对象

 
public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
}

EventBusBuilder类

看名字就知道,这个类是用来创建EventBus对象的.
开源实验室:图1

Builder类提供了这么多个可选的配置属性,这里变量含义大家直接看我的注释,就不多作解释了
我们主要来看最终的建造方法

 
/**
 * 根据参数创建对象,并赋值给EventBus.defaultInstance, 必须在默认的eventbus对象使用以前调用
 *
 * @throws EventBusException if there's already a default EventBus instance in place
 */
public EventBus installDefaultEventBus() {
    synchronized (EventBus.class) {
        if (EventBus.defaultInstance != null) {
            throw new EventBusException("Default instance already exists." +
                    " It may be only set once before it's used the first time to ensure " +
                    "consistent behavior.");
        }
        EventBus.defaultInstance = build();
        return EventBus.defaultInstance;
    }
}

/**
 * 根据参数创建对象
 */
public EventBus build() {
    return new EventBus(this);
}

EventBusBuilder类提供了两种建造方法,还记得之前的getDefault()方法吗,维护了一个单例对象,installDefaultEventBus() 方法建造的EventBus对象最终会赋值给那个单例对象,但是有一个前提就是我们之前并没有创建过那个单例对象.
这里大家思考一下,为什么如果EventBus.defaultInstance不为null以后程序要抛出异常?咱们之后说答案。
第二个方法就是默认的建造者方法了.

再回到我们的EventBus构造方法,根据提供的建造者初始化了一大堆属性
图3

我们继续看这些初始化的字段.

三个Poster类

先是一大堆Map,看不懂,跳过去,我们先来看这三个Poster,需要说明的一点就是:Poster只负责处理粘滞事件,原因我们之后看代码。

 
private final HandlerPoster mainThreadPoster; //前台发送者
private final BackgroundPoster backgroundPoster; //后台发送者
private final AsyncPoster asyncPoster;   //后台发送者(只让队列第一个待订阅者去响应)

其实从类名我们就能看出个大概了,就是三个发送事件的方法。
我们来看看他们的内部实现.
这几个Poster的设计可以说是整个EventBus的一个经典部分,越看越想继续多看几遍.

每个Poster中都有一个发送任务队列,PendingPostQueue queue;

进到队列里面再看 定义了两个节点,从字面上理解就是队列的头节点和尾节点

 
private PendingPost head; //待发送对象队列头节点
private PendingPost tail;//待发送对象队列尾节点

再看这个PendingPost类的实现:

 
//单例池,复用对象
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event; //事件类型
Subscription subscription; //订阅者
PendingPost next; //队列下一个待发送对象

首先是提供了一个的设计,类似于我们的线程池,目的是为了减少对象创建的开销,当一个对象不用了,我们可以留着它,下次再需要的时候返回这个保留的而不是再去创建。
再看最后的变量,PendingPost next 非常典型的队列设计,队列中每个节点都有一个指向下一个节点的指针(sorry,数据结构用C学的)。

 
/**
 * 首先检查复用池中是否有可用,如果有则返回复用,否则返回一个新的
 *
 * @param subscription 订阅者
 * @param event        订阅事件
 * @return 待发送对象
 */
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    return new PendingPost(event, subscription);
}
 
/**
 * 回收一个待发送对象,并加入复用池
 *
 * @param pendingPost 待回收的待发送对象
 */
static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // 防止池无限增长
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}

obtainPendingPost(),对池复用的实现,每次新创建的节点尾指针都为 null 。
releasePendingPost(),回收pendingPost对象,既然有从池中取,当然需要有存。这里,原作非常细心的加了一次判断,if (pendingPostPool.size() < 10000) 其实我觉得10000都很大了,1000就够了,我们一次只可能创建一个pendingPost,如果ArrayList里面存了上千条都没有取走,那么肯定是使用出错了。

PendingPost的代码我们就看完了,再回到上一级,队列的设计:

接着是PendingPostQueue的入队方法

 
synchronized void enqueue(PendingPost pendingPost) {
	...
    if (tail != null) {
        tail.next = pendingPost;
        tail = pendingPost;
    } else if (head == null) {
        head = tail = pendingPost;
    } 
    ...
}

首先将当前节点的上一个节点(入队前整个队列的最后一个节点)的尾指针指向当期正在入队的节点(传入的参数pendingPost),并将队列的尾指针指向自己(自己变成队列的最后一个节点),这样就完成了入队。
如果是队列的第一个元素(队列之前是空的),那么直接将队列的头尾两个指针都指向自身就行了。
出队也是类似的队列指针操作

 
synchronized PendingPost poll() {
    PendingPost pendingPost = head;
    if (head != null) {
        head = head.next;
        if (head == null) {
            tail = null;
        }
    }
    return pendingPost;
}

首先将出队前的头节点保留一个临时变量(它就是要出队的节点),拿到这个将要出队的临时变量的下一个节点指针,将出队前的第二个元素(出队后的第一个元素)的赋值为现在队列的头节点,出队完成。
值得提一点的就是,PendingPostQueue的所有方法都声明了synchronized,这意味着在多线程下它依旧可以正常工作,细想想这也是必须的,对吗?

再回到上一级,接着是HandlerPoster的入队方法enqueue(),

 
/**
 * @param subscription 订阅者
 * @param event        订阅事件
 */
void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

入队方法会根据参数创建 待发送对象 pendingPost 并加入队列,如果此时 handleMessage() 没有在运行中,则发送一条空消息让 handleMessage 响应
接着是handleMessage()方法

 
@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // 双重校验,类似单例中的实现
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            //如果订阅者没有取消注册,则分发消息
            eventBus.invokeSubscriber(pendingPost);
            
            //如果在一定时间内仍然没有发完队列中所有的待发送者,则退出
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

handleMessage()不停的在待发送队列queue中去取消息。 需要说明的是在循环之外有个临时boolean变量rescheduled,最后是通过这个值去修改了handlerActive。而 handlerActive 是用来判断当前queue中是否有正在发送对象的任务,看到上面的入队方法enqueue(),如果已经有任务在跑着了,就不需要再去sendMessage()唤起我们的handleMessage()

最终通过eventBus对象的invokeSubscriber()最终发送出去,并回收这个pendingPost,让注册了的订阅者去响应(相当于回调),至于这个发送方法,我们之后再看。

看完了HandlePoster类,另外两个异步的发送者实现代码也差不多,唯一的区别就是另外两个是工作在异步,实现的Runnable接口,大家自己类比,这里就不帖代码了.

Poster工作原理

最后我们再来回顾一下PosterPendingPostQueuePendingPost这三个类,再看看下面这张图,是不是有种似曾相识的感觉。

开源实验室:图4

啊哈,那是HandleMessageLooper的工作原理,再看看Poster的
开源实验室:图5

至此,整个EventBus源码的发送接收核心部分已经分析完了。
还记得上面我们留下的那几个问题吗:
1、为什么如果EventBus.defaultInstance不为null以后程序要抛出异常?
2、Poster只对粘滞事件有效的说明代码在哪。
3、invokeSubscriber()最终的发送怎么实现的。
接下来我们继续分析它的注册流程以及粘滞事件的设计(那又是一个经典的地方)。

第二篇【EventBus源码研读(中)】已更新:http://kymjs.com/code/2015/12/13/01/



EventBus源码研读(上)


EventBus 是一款针对Android优化的发布/订阅事件总线。主要功能是替代Intent, Handler, BroadCast 在 Fragment,Activity,Service,线程之间传递消息.优点是开销小,使用方便,可以很大程度上降低它们之间的耦合,使得我们的代码更加简洁,耦合性更低,提升我们的代码质量。
类似的库还有 Otto ,今天就带大家一起研读 EventBus 的源码.

在写这篇文章之前,我已经将本文相关的中文注释代码上传到了GitHub:https://github.com/kymjs/EventBus

第一篇 EventBus源码研读(上) http://kymjs.com/code/2015/12/12/01/
第二篇 EventBus源码研读(中) http://kymjs.com/code/2015/12/13/01/
第三篇 EventBus源码研读(下) http://kymjs.com/code/2015/12/16/01/

基础用法

在读代码之前,首先你得了解它的基本用法.如果你已经能够很熟练的使用EventBus等事件总线库了,那么你可以跳过本节.
首先引入依赖包,查看GitHub主页的说明:https://github.com/greenrobot/EventBus
在Gradle文件加入
compile 'de.greenrobot:eventbus:2.4.0'

用法与广播相同,且比广播更简单:

注册订阅者

首先你需要注册一个事件订阅者,为了方便理解你可以把他当成广播的广播接收者 你可以在任何一个类中使用如下代码注册以及解除注册

 
//把当前类注册为订阅者(接收者)
EventBus.getDefault().register(this);

//解除注册当前类(同广播一样,一定要调用,否则会内存泄露)
EventBus.getDefault().unregister(this);

注册了订阅者以后,我们需要创建一个回调方法onEvent,当我们订阅的事件发送的时候就会回调它

 
//其实命名不一定必须是onEvent(),但那属于高级用法了,这里我们只说最简单的
public void onEvent(Object event) {}

事件发送

当有了订阅者以后,我们的代码已经可以工作了.但是此时的代码是没有意义的,我们订阅的事件还没有发生. 就像广播需要一个sendBroadcast(),EventBus需要post(event)
你可以在任何一个类中使用如下代码发送事件:

 
/**
 * 这里的event类型必须和上面我们onEvent()方法的参数类型一致
 * (子父类关系也不行,必须是相同类型,原因我们下面看源码)
 */
EventBus.getDefault().post(event);

至此,EventBus就可以正常工作了.

进入源码世界

入口类EventBus类

我们从使用的流程来,首先看EventBus#getDefault()

 
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

只是简单的维护单例,调用构造方法,再看构造方法,调用重载的构造方法,重载的构造方法又需要一个EventBusBuilder对象

 
public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
}

EventBusBuilder类

看名字就知道,这个类是用来创建EventBus对象的.
开源实验室:图1

Builder类提供了这么多个可选的配置属性,这里变量含义大家直接看我的注释,就不多作解释了
我们主要来看最终的建造方法

 
/**
 * 根据参数创建对象,并赋值给EventBus.defaultInstance, 必须在默认的eventbus对象使用以前调用
 *
 * @throws EventBusException if there's already a default EventBus instance in place
 */
public EventBus installDefaultEventBus() {
    synchronized (EventBus.class) {
        if (EventBus.defaultInstance != null) {
            throw new EventBusException("Default instance already exists." +
                    " It may be only set once before it's used the first time to ensure " +
                    "consistent behavior.");
        }
        EventBus.defaultInstance = build();
        return EventBus.defaultInstance;
    }
}

/**
 * 根据参数创建对象
 */
public EventBus build() {
    return new EventBus(this);
}

EventBusBuilder类提供了两种建造方法,还记得之前的getDefault()方法吗,维护了一个单例对象,installDefaultEventBus() 方法建造的EventBus对象最终会赋值给那个单例对象,但是有一个前提就是我们之前并没有创建过那个单例对象.
这里大家思考一下,为什么如果EventBus.defaultInstance不为null以后程序要抛出异常?咱们之后说答案。
第二个方法就是默认的建造者方法了.

再回到我们的EventBus构造方法,根据提供的建造者初始化了一大堆属性
图3

我们继续看这些初始化的字段.

三个Poster类

先是一大堆Map,看不懂,跳过去,我们先来看这三个Poster,需要说明的一点就是:Poster只负责处理粘滞事件,原因我们之后看代码。

 
private final HandlerPoster mainThreadPoster; //前台发送者
private final BackgroundPoster backgroundPoster; //后台发送者
private final AsyncPoster asyncPoster;   //后台发送者(只让队列第一个待订阅者去响应)

其实从类名我们就能看出个大概了,就是三个发送事件的方法。
我们来看看他们的内部实现.
这几个Poster的设计可以说是整个EventBus的一个经典部分,越看越想继续多看几遍.

每个Poster中都有一个发送任务队列,PendingPostQueue queue;

进到队列里面再看 定义了两个节点,从字面上理解就是队列的头节点和尾节点

 
private PendingPost head; //待发送对象队列头节点
private PendingPost tail;//待发送对象队列尾节点

再看这个PendingPost类的实现:

 
//单例池,复用对象
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event; //事件类型
Subscription subscription; //订阅者
PendingPost next; //队列下一个待发送对象

首先是提供了一个的设计,类似于我们的线程池,目的是为了减少对象创建的开销,当一个对象不用了,我们可以留着它,下次再需要的时候返回这个保留的而不是再去创建。
再看最后的变量,PendingPost next 非常典型的队列设计,队列中每个节点都有一个指向下一个节点的指针(sorry,数据结构用C学的)。

 
/**
 * 首先检查复用池中是否有可用,如果有则返回复用,否则返回一个新的
 *
 * @param subscription 订阅者
 * @param event        订阅事件
 * @return 待发送对象
 */
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    return new PendingPost(event, subscription);
}
 
/**
 * 回收一个待发送对象,并加入复用池
 *
 * @param pendingPost 待回收的待发送对象
 */
static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // 防止池无限增长
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}

obtainPendingPost(),对池复用的实现,每次新创建的节点尾指针都为 null 。
releasePendingPost(),回收pendingPost对象,既然有从池中取,当然需要有存。这里,原作非常细心的加了一次判断,if (pendingPostPool.size() < 10000) 其实我觉得10000都很大了,1000就够了,我们一次只可能创建一个pendingPost,如果ArrayList里面存了上千条都没有取走,那么肯定是使用出错了。

PendingPost的代码我们就看完了,再回到上一级,队列的设计:

接着是PendingPostQueue的入队方法

 
synchronized void enqueue(PendingPost pendingPost) {
	...
    if (tail != null) {
        tail.next = pendingPost;
        tail = pendingPost;
    } else if (head == null) {
        head = tail = pendingPost;
    } 
    ...
}

首先将当前节点的上一个节点(入队前整个队列的最后一个节点)的尾指针指向当期正在入队的节点(传入的参数pendingPost),并将队列的尾指针指向自己(自己变成队列的最后一个节点),这样就完成了入队。
如果是队列的第一个元素(队列之前是空的),那么直接将队列的头尾两个指针都指向自身就行了。
出队也是类似的队列指针操作

 
synchronized PendingPost poll() {
    PendingPost pendingPost = head;
    if (head != null) {
        head = head.next;
        if (head == null) {
            tail = null;
        }
    }
    return pendingPost;
}

首先将出队前的头节点保留一个临时变量(它就是要出队的节点),拿到这个将要出队的临时变量的下一个节点指针,将出队前的第二个元素(出队后的第一个元素)的赋值为现在队列的头节点,出队完成。
值得提一点的就是,PendingPostQueue的所有方法都声明了synchronized,这意味着在多线程下它依旧可以正常工作,细想想这也是必须的,对吗?

再回到上一级,接着是HandlerPoster的入队方法enqueue(),

 
/**
 * @param subscription 订阅者
 * @param event        订阅事件
 */
void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

入队方法会根据参数创建 待发送对象 pendingPost 并加入队列,如果此时 handleMessage() 没有在运行中,则发送一条空消息让 handleMessage 响应
接着是handleMessage()方法

 
@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // 双重校验,类似单例中的实现
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            //如果订阅者没有取消注册,则分发消息
            eventBus.invokeSubscriber(pendingPost);
            
            //如果在一定时间内仍然没有发完队列中所有的待发送者,则退出
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

handleMessage()不停的在待发送队列queue中去取消息。 需要说明的是在循环之外有个临时boolean变量rescheduled,最后是通过这个值去修改了handlerActive。而 handlerActive 是用来判断当前queue中是否有正在发送对象的任务,看到上面的入队方法enqueue(),如果已经有任务在跑着了,就不需要再去sendMessage()唤起我们的handleMessage()

最终通过eventBus对象的invokeSubscriber()最终发送出去,并回收这个pendingPost,让注册了的订阅者去响应(相当于回调),至于这个发送方法,我们之后再看。

看完了HandlePoster类,另外两个异步的发送者实现代码也差不多,唯一的区别就是另外两个是工作在异步,实现的Runnable接口,大家自己类比,这里就不帖代码了.

Poster工作原理

最后我们再来回顾一下PosterPendingPostQueuePendingPost这三个类,再看看下面这张图,是不是有种似曾相识的感觉。

开源实验室:图4

啊哈,那是HandleMessageLooper的工作原理,再看看Poster的
开源实验室:图5

至此,整个EventBus源码的发送接收核心部分已经分析完了。
还记得上面我们留下的那几个问题吗:
1、为什么如果EventBus.defaultInstance不为null以后程序要抛出异常?
2、Poster只对粘滞事件有效的说明代码在哪。
3、invokeSubscriber()最终的发送怎么实现的。
接下来我们继续分析它的注册流程以及粘滞事件的设计(那又是一个经典的地方)。

第二篇【EventBus源码研读(中)】已更新:http://kymjs.com/code/2015/12/13/01/