class Temp {
    suspend fun fetchData(argument: String): Boolean {
        val result = netRequest(argument)
        return result == 0
    // 模拟网络请求
    suspend fun netRequest(argument: String): Int {
        delay(1000)
        return argument.length

这两个方法都使用了suspend关键字修饰,我们将这个文件的字节码反编译为等同效果的Java代码:

public final class Temp {
   @Nullable
   public final Object fetchData(@NotNull String argument, @NotNull Continuation var2) {
      Object $continuation;
      label25: {
         if (var2 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var2;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label25;
         $continuation = new ContinuationImpl(var2) {
            // $FF: synthetic field
            Object result;
            int label;
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return Temp.this.fetchData((String)null, this);
      Object $result = ((<undefinedtype>)$continuation).result;
      Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      Object var10000;
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).label = 1;
         var10000 = this.netRequest(argument, (Continuation)$continuation);
         if (var10000 == var6) {
            return var6;
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         var10000 = $result;
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      int result = ((Number)var10000).intValue();
      return Boxing.boxBoolean(result == 0);
   @Nullable
   public final Object netRequest(@NotNull String argument, @NotNull Continuation var2) {
      Object $continuation;
      label20: {
         if (var2 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var2;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
         $continuation = new ContinuationImpl(var2) {
            // $FF: synthetic field
            Object result;
            int label;
            Object L$0;
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return Temp.this.netRequest((String)null, this);
      Object $result = ((<undefinedtype>)$continuation).result;
      Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).L$0 = argument;
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
            return var5;
         break;
      case 1:
         argument = (String)((<undefinedtype>)$continuation).L$0;
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      return Boxing.boxInt(argument.length());

几行协程相关的代码,竟然对应了这么多的Java代码,可见kotlin编译器为我们做了很多事情

上面代码的可读性不高,例如有<undefinedtype>这种未定义的类型,我使用jd-guiTemp.class文件再进行了一次反编译,获取到了更多信息,我将上面的反编译的一大串代码和jd-gui反编译获取的信息进行整合,并且对一些类和变量进行适当的重命名,得出信息更完整且可读性更高的「Temp.class反编译后对应的Java代码」,首先是fetchData相关的:

public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
    Object $continuation;
    label25:
        if (completion instanceof FetchDataStateMachine) {
            $continuation = (FetchDataStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label25;
        $continuation = new FetchDataStateMachine(completion);
    Object $result = $continuation.result;
    Object resultTemp;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            $continuation.label = 1;
            resultTemp = this.netRequest(argument, (Continuation) $continuation);
            if (resultTemp == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            break;
        case 1:
            ResultKt.throwOnFailure($result);
            resultTemp = $result;
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    int result = ((Number) resultTemp).intValue();
    return Boxing.boxBoolean(result == 0);
static final class FetchDataStateMachine extends ContinuationImpl {
    Object result;
    int label;
    FetchDataStateMachine(Continuation $completion) {
        super($completion);
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);

netRequest相关的代码,与fetchData相关的代码,在结构和形式上类似:

public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {
    Object $continuation;
    label20:
        if (completion instanceof NetRequestStateMachine) {
            $continuation = (NetRequestStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label20;
        $continuation = new NetRequestStateMachine(completion);
    Object $result = $continuation.result;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            $continuation.functionParameter = argument;
            $continuation.label = 1;
            if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            break;
        case 1:
            argument = (String) ($continuation.functionParameter);
            ResultKt.throwOnFailure($result);
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    return Boxing.boxInt(argument.length());
static final class NetRequestStateMachine extends ContinuationImpl {
    Object result;
    int label;
    Object functionParameter;
    NetRequestStateMachine(Continuation $completion) {
        super($completion);
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.netRequest(null, (Continuation<? super Integer>) this);

可以发现,反编译后的Java代码中,fetchDatanetRequest方法都多了一个Continuation completion参数,这是Kotlin Compiler帮我们做的,对于suspend修饰的函数,编译的时候Kotlin Compiler会帮我们在该函数中传入一个Continuation参数,使用Continuation参数代替了suspend修饰符,这个参数有什么含义呢?

续体是理解协程工作原理的一个关键。

先看传统的网络请求:

data class User(val id: Long, val name: String)
interface Callback {
    fun success(user: User)
    fun failure(t: Throwable)
class Model {
    fun getUserInfo(callback: Callback) {
        Thread.sleep(1000) // 模拟网络请求
        callback.success(User(1, "giagor"))
class Business {
    val model = Model()
    fun getUserInfo() {
        model.getUserInfo(object : Callback {
            override fun success(user: User) {
                showMsg(user.toString())
            override fun failure(t: Throwable) {
                showMsg(t.message ?: "")
    fun showMsg(msg: String) {
        // ...

在使用Model进行网络请求的时候,使用Callback接收网络请求的结果,我们这时候可以将Callback看作一个续体,即网络请求的续体,用于接收网络请求的结果。

在协程中使用Continuation接口表示一个续体,它代表一个挂起点之后的延续,即 挂起点之后的剩余应执行的代码

public interface Continuation<in T> {
    // 与该续体对应的协程的上下文
    public val context: CoroutineContext
    // 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值
    public fun resumeWith(result: Result<T>)

Kotlin 1.3,也有可以方便地调用resumeWith的扩展函数:

public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

正如前面所说,对于suspend修饰的函数,Kotlin Compiler会帮我们在该函数中传入一个Continuation参数,使用Continuation参数代替了suspend修饰符,通过Continuation参数,Kotlin Compiler可以将我们的协程代码转化为等价的回调代码,也就是说,Kt编译器帮我们写好了那些回调的代码,至于怎么帮我们写的后面会分析,这种通过传递Continuation来控制异步调用流程被称作CPS变换(Continuation-Passing-Style Transformation

fetchData函数编译时会生成下面的一个静态内部类(续体):

static final class FetchDataStateMachine extends ContinuationImpl {
    Object result;
    int label;
    FetchDataStateMachine(Continuation $completion) {
        super($completion);
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.fetchData(null, (Continuation<? super Boolean>) this);

FetchDataStateMachine的继承关系如下:

FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation

FetchDataStateMachine接收一个名称为$completionContinuation参数,$completion被保存在父类BaseContinuationImpl中:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {...}

通过$completion可以将fetchData函数的执行结果传递回给调用fetchData的函数,有了$completion,才有能力去实现回调

状态机FetchDataStateMachine声明了resultlabel两个变量

  • result表示上一个Continuation的结果,比如有函数AB,函数内部分别声明了ContinuationAContinuationBA调用B并且将ContinuationA传入B中保存。在后续回调的过程中,ContinuationA可以从result变量中拿到ContinuationB::invokeSuspend的执行结果
  • labelKotlin Compiler可以识别函数内部哪个地方会挂起,每一个挂起点(suspension point)被表示为状态机的一个状态(state),这些状态通过switch case语句表示出来。label表示当前应该执行状态机的哪一个状态,具体来说就是要进入哪一个case,通过label变量就记录下了状态机当前的状态
  • 再看下fetchData的前半部分代码:

    public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
        Object $continuation;
        label25:
            if (completion instanceof FetchDataStateMachine) {
                $continuation = (FetchDataStateMachine) completion;
                if (($continuation.label & Integer.MIN_VALUE) != 0) {
                    $continuation.label -= Integer.MIN_VALUE;
                    break label25;
            $continuation = new FetchDataStateMachine(completion);
    

    它会判断传入的completion是否为FetchDataStateMachine类型,若是则对它的label变量做些操作,若不是则直接创建一个FetchDataStateMachine并且传入completioncompletion会被保存下来)。

    再看下fetchData的后半部分代码:

    public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
        Object $continuation;
        Object $result = $continuation.result;
        Object resultTemp;
        switch ($continuation.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                $continuation.label = 1;
                resultTemp = this.netRequest(argument, (Continuation) $continuation);
                if (resultTemp == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                resultTemp = $result;
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        int result = ((Number) resultTemp).intValue();
        return Boxing.boxBoolean(result == 0);
    

    fetchData方法原先的代码语句会被划分为switch下的多个case语句,在这里就是

    FetchDataStateMachine中的label变量就是控制当前要执行哪个case分支。

    可见,函数与续体构成了一个有限状态机(FSM,即 Finite-State Machine),来控制协程代码的执行

    何为「非阻塞式挂起」?

    netRequest方法中,调用了delay(1000)挂起了当前的协程,简单看下delay方法反编译后的代码:

    public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
        if (timeMillis <= 0L) {
            return Unit.INSTANCE;
        } else {
            // 实现类
            CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
            cancellableContinuationImpl.initCancellability();
            // 向上转型
            CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
            if (timeMillis < Long.MAX_VALUE) {
                // 延时操作
                getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
    		// 获取执行结果
            Object result = cancellableContinuationImpl.getResult();
            if (result == COROUTINE_SUSPENDED) {
                DebugProbesKt.probeCoroutineSuspended($completion);
    		// 返回结果
            return result;
    

    在该方法里会执行延时操作,如果需要挂起,就会返回COROUTINE_SUSPENDED值给调用者。

    结合fetchDatanetRequestdelay反编译的代码,我们可以得出下面的这个调用图:

    图中红色的线表示函数返回COROUTINE_SUSPENDED,需要挂起。当delay方法需要挂起的时候,它返回COROUTINE_SUSPENDED,接着netRequest方法返回COROUTINE_SUSPENDED,接着fetchData方法返回COROUTINE_SUSPENDED,重复这个过程直到调用栈的最上层。

    通过这种「结束方法调用」的方式,让协程暂时不在这个线程上面执行,让线程可以去处理其它的任务(包括执行其它的协程),这也就是为什么协程的挂起不会阻塞当前的线程,这也是「非阻塞式挂起」的由来

    如何恢复?

    既然协程挂起了,那就有相应的协程的恢复。先说结论:协程恢复的实质是对续体进行回调

    暂时还没有研究delay函数的具体实现,但是delay函数会在某个子线程执行等待操作,等延时时间到达之后,就会调用传给delay函数的$completionresumeWith方法,也就是调用NetRequestStateMachineresumeWith方法NetRequestStateMachine的继承关系、父类如下:

    NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
    

    BaseContinuationImpl目前是我们分析的一个重点,它主要做了下面的几件事情:

  • 保存completion:它保存了fetchData方法的FetchDataStateMachine实例,使得可以一级一级地向上回调续体。
  • 重写resumeWith方法:BaseContinuationImpl重写了Continuation接口的resumeWith方法,该方法用于恢复协程,也是协程恢复的核心逻辑。
  • 我们查看BaseContinuationImpl类的定义:

    internal abstract class BaseContinuationImpl(
        public val completion: Continuation<Any?>?
    ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
        // This implementation is final. This fact is used to unroll resumeWith recursion.
        public final override fun resumeWith(result: Result<Any?>) {
            // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
            var current = this
            var param = result
            while (true) {
                // 在每个恢复的continuation进行调试探测,使得调试库可以精确跟踪挂起的调用栈中哪些部分
                // 已经恢复了。
                probeCoroutineResumed(current)
                with(current) {
                    val completion = completion!! // fail fast when trying to resume continuation without completion
                    val outcome: Result<Any?> =
                        try {
                            val outcome = invokeSuspend(param)
                            if (outcome === COROUTINE_SUSPENDED) return
                            Result.success(outcome)
                        } catch (exception: Throwable) {
                            Result.failure(exception)
                    releaseIntercepted() // this state machine instance is terminating
                    if (completion is BaseContinuationImpl) {
                        // unrolling recursion via loop
                        current = completion
                        param = outcome
                    } else {
                        // top-level completion reached -- invoke and return
                        completion.resumeWith(outcome)
                        return
        protected abstract fun invokeSuspend(result: Result<Any?>): Any?
        protected open fun releaseIntercepted() {
            // does nothing here, overridden in ContinuationImpl
    

    重点是resumeWith方法的实现,它在一个while(true)循环下面执行回调的逻辑。我们结合前面给出的fetchDatanetRequest反编译后的代码,看看delay函数的延时时间到达时调用NetRequestStateMachineresumeWith方法,后续的执行流程是怎样的:

  • 执行NetRequestStateMachine父类BaseContinuationImplresumeWith方法。
  • 执行当前续体也就是NetRequestStateMachineinvokeSuspend方法(NetRequestStateMachine有实现该方法,忘记了的话可以回头看看之前的反编译代码)。
  • NetRequestStateMachineinvokeSuspend方法调用了netRequest方法,并且将续体自身作为参数传入。
  • netRequest方法中,由于completion的类型就是NetRequestStateMachine因此可以直接使用该续体,不用像之前第一次进入netRequest方法那样需要创建一个新的续体。此时续体的label值为1,于是进入netRequestcase 1语句分支。
  • 实际上这个过程有对续体的label进行一些运算转化的操作,但是最终label的值都是1,做的运算转化操作不影响我们的分析,因此并不是重点

  • 从续体中取出一开始传入netRequest方法的参数,也就是argument,返回argument.length。为了方便后面阐述,这里将该返回值argument.length记为netRequest-Return
  • 接着netRequest方法结束,NetRequestStateMachine::invokeSuspend方法也执行结束,netRequest-Return也作为invokeSuspend方法的返回值,该返回值会传递到BaseContinuationImplresumeWith方法中,在resumeWith方法中,将netRequest-Return包装为Result保存到outcome变量中。
  • 判断NetRequestStateMachine持有的completion是否为BaseContinuationImpl类型,我们知道它持有的实例其实就是FetchDataStateMachine,因此肯定是BaseContinuationImpl,于是进行了变量的更新
  •     // 把current更新为FetchDataStateMachine实例
    	current = completion
    	// 把param更新为outcome(包装了netRequest-Return的Result)
        param = outcome
    

    通过这种方式,其实就可以实现回调,我们继续往后看。

  • 继续进行下一轮while循环,在with块中会执行FetchDataStateMachine::invokeSuspend,在invokeSuspend里,将传入的参数param保存到result变量里(其实这和传统的回调类似,传统的回调中也是要将下层的执行结果回调给上层),接着调用了fetchData方法。
  • fetchData方法中,由于传入的completion已是FetchDataStateMachine类型,因此无需再去创建新的续体。由于此时续体label的值为1,所以会进入case 1语句,并且将netRequest方法的执行结果保存在resultTemp变量中,最终fetchData方法结束并返回结果result == 0,为了方便阐述,将fetchData方法的执行结果记为fetchData-Return
  • FetchDataStateMachine::invokeSuspend方法也会结束并返回fetchData-Return,然后在BaseContinuationImplresumeWith方法中将fetchData-Return包装为Result。然后会判断FetchDataStateMachine持有的completion是否为BaseContinuationImpl类型。
  • 代码的后续走向,我们目前是不清楚的,我们得知道在协程中调用fetchData方法的时候会做些什么,才能清楚后续的代码走向
  • 从上面的流程分析中,我们对协程的恢复有了一个基本的认识,下面给出流程图进行总结:

    再看看上面续体的调用过程,其实就是层层往上地调用续体的invokeSuspend方法,从过程来看有点像递归调用,但是BaseContinuationImpl::resumeWith的实现却和递归不太一样,它的实现是在while(true)循环中,对续体调用一次invokeSuspend方法,然后记录它的返回结果,将这个返回结果作为下一个续体invokeSuspend的方法参数。

    简单来讲,就是在调用一个续体的invokeSuspend方法,待这个方法执行结束后,再调用下一个续体的invokeSuspend方法。这样做的一个原因是避免调用栈过深,在BaseContinuationImpl::resumeWith也有相关的注释说明:

    This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    

    我们在一个协程中去调用fetchData方法:

    class Temp2 {
        fun execute() {
            GlobalScope.launch(Dispatchers.Main) {
                Temp().fetchData("argument")
    

    通过launch方法可以启动一个协程,其源码如下:

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    

    协程中的代码会被包装为一个block,默认情况下会创建一个StandaloneCoroutine,然后调用它的start方法并返回StandaloneCoroutine

    StandaloneCoroutine间接的实现了Job接口和Continuation<T>接口,如下:

    private open class StandaloneCoroutine(
        parentContext: CoroutineContext,
        active: Boolean
    ) : AbstractCoroutine<Unit>(parentContext, active) {
        override fun handleJobException(exception: Throwable): Boolean {
            handleCoroutineException(context, exception)
            return true
    public abstract class AbstractCoroutine<in T>(
         * The context of the parent coroutine.
        @JvmField
        protected val parentContext: CoroutineContext,
        active: Boolean = true
    ) : JobSupport(active), Job, Continuation<T>, CoroutineScope {...}
    

    可以看出StandaloneCoroutine身兼多职,实现了Job, Continuation<T>, CoroutineScope接口。后面代码跟踪可以得出一个结论,最顶层的续体实现是协程自身,也就是协程恢复的时候续体会一层层地往上回调,最顶层的续体就是协程coroutine自身,即StandaloneCoroutine(这里以StandaloneCoroutine为例)

    另外还要注意一点,launch方法中传入的 block块类型:

     block: suspend CoroutineScope.() -> Unit
    

    它等价于下面的这种函数类型:

    // CoroutineScope:扩展函数转化而来
    // Continuation:suspend关键字转化而来,Continuation参数由编译器传入
    block : (CoroutineScope,Continuation) -> Unit
    // 或者通过Function2的形式表示
    block : Function2<CoroutineScope,Continuation,Unit>
    

    接着跟踪下启动协程的调用过程。在launch方法中,调用了AbstractCoroutine::start方法:

        public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
            initParentJob()
            // 语法糖,实际是调用CoroutineStart.invoke方法
            start(block, receiver, this)
    

    CoroutineStart::invoke方法:

        public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
            when (this) {
                DEFAULT -> block.startCoroutineCancellable(receiver, completion)
                ATOMIC -> block.startCoroutine(receiver, completion)
                UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
                LAZY -> Unit // will start lazily
    

    launch方法可以知道CoroutineStart的默认值是CoroutineStart.DEFAULT,因此会调用到blockstartCoroutineCancellable方法:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
        receiver: R, completion: Continuation<T>,
        onCancellation: ((cause: Throwable) -> Unit)? = null
        runSafely(completion) {
            createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    

    我在AS跟踪createCoroutineUnintercepted的代码调用时,发现会跳转到IntrinsicsKt.class文件,这个文件里面找不到方法的源代码,最后找到了IntrinsicsJvm.kt文件,找到createCoroutineUnintercepted方法源码,如下:

    # R:CoroutineScope
    # T:Unit
    @SinceKotlin("1.3")
    public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
        receiver: R,
        completion: Continuation<T>
    ): Continuation<Unit> {
        // probeCoroutineCreated方法直接返回completion
        val probeCompletion = probeCoroutineCreated(completion)
        return if (this is BaseContinuationImpl)
            create(receiver, probeCompletion)
        else {
            createCoroutineFromSuspendFunction(probeCompletion) {
                (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
    

    这里会判断this的类型是否为BaseContinuationImplthis就是我们之前在launch中传入的lambda块,那么这个lambda代码块是什么类型的呢?想要知道这个答案,我们得对这一节刚开始给出的代码进行反编译

    kotlin代码:

    class Temp2 {
        fun execute() {
            GlobalScope.launch(Dispatchers.Main) {
                Temp().fetchData("argument")
    

    对反编译后的java代码进行适当的重命名和调整,得出:

    public final class Temp2 {
        static final class LaunchLambda extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
            int label;
            LaunchLambda(Continuation $completion) {
                super(2, $completion);
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
                switch (this.label) {
                    case 0:
                        ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                        this.label = 1;
                        if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
                            return COROUTINE_SUSPENDED;
                        (new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
                        return Unit.INSTANCE;
                    case 1:
                        ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                        return Unit.INSTANCE;
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            @NotNull
            public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super LaunchLambda> $completion) {
                return (Continuation<Unit>) new LaunchLambda($completion);
            @Nullable
            public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {
                return ((LaunchLambda) create(p1, p2)).invokeSuspend(Unit.INSTANCE);
    

    可以看出在Temp2里面会自动生成一个静态内部类LaunchLambda,它对应着launch方法中传入的lambda块。LaunchLambda的继承关系(由上到下,子类到父类的顺序):

    LaunchLambda
    -> SuspendLambda // 用suspend修饰的lambda块都会继承至这个类
    -> ContinuationImpl
    -> BaseContinuationImpl // 重写了resumeWith函数
    -> Continuation    
    

    OK,回到createCoroutineUnintercepted方法中,现在可以回答刚刚提出的问题了,lambda传入的lambda块是不是BaseContinuationImpl类型呢?根据上面的继承关系得出,当然是!那么它就会调用LaunchLambdacreate方法,注意第二个参数传入的是completion(代码中写的是probeCompletion),它最终会被保存在父类BaseContinuationImplcompletion变量中,这个completion参数就是launch方法中创建的StandaloneCoroutine,即协程本身,它作为协程恢复时的最顶层续体

    通过调用create方法获取到一个LaunchLambda实例,createCoroutineUnintercepted方法执行结束并返回LaunchLambda实例,接着代码执行又回到startCoroutineCancellable中,回顾下该方法:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
        receiver: R, completion: Continuation<T>,
        onCancellation: ((cause: Throwable) -> Unit)? = null
        runSafely(completion) {
            createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    

    这里有两部分调用,先是调用intercepted方法,然后再调用resumeCancellableWith方法。intercepted方法与续体拦截机制有关,后面会介绍,这里先忽略,这里直接认为调用了LaunchLambda实例的resumeCancellableWith方法即可,该方法如下:

    public fun <T> Continuation<T>.resumeCancellableWith(
        result: Result<T>,
        onCancellation: ((cause: Throwable) -> Unit)? = null
    ): Unit = when (this) {
        is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
        else -> resumeWith(result)
    

    那么会走到resumeWith方法,前面提到过该方法在父类BaseContinuationImpl实现,在该方法里面会调用invokeSuspend方法,invokeSuspend方法在LaunchLambda中实现了,如下:

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                this.label = 1;
                if ((new Temp()).fetchData("argument", (Continuation<? super Boolean>) this) == COROUTINE_SUSPENDED)
                    return COROUTINE_SUSPENDED;
                (new Temp()).fetchData("argument", (Continuation<? super Boolean>) this);
                return Unit.INSTANCE;
            case 1:
                ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                return Unit.INSTANCE;
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    

    一开始label的值为0,所以会进入case 0语句分支,在该语句分支里面,会设置label的值为1,然后创建一个Temp对象并且调用它的fetchData方法,并把LaunchLambda自身作为参数传入,也就是LaunchLambda实例会被保存在fetchData方法创建的续体的completion变量里,方便协程恢复的时候进行回调。

    现在续体的持有图:

    到了这里,从启动一个协程到协程最终是如何挂起的,我们已经可以串联起来了。在「如何恢复?」一节中,协程恢复的最后几个步骤我们还没有分析,这里把它分析完,然后整个协程恢复的流程也可以串起来了。

    协程恢复的后续流程:

  • FetchDataStateMachine::invokeSuspend执行完后,会在BaseContinuationImplresumeWith方法中判断FetchDataStateMachine所持有的completion(即LaunchLambda)是否为BaseContinuationImpl类型,由LaunchLambda的继承关系,容易得出答案为「是」,所以会进入下一轮while循环,调用LaunchLambdainvokeSuspend方法。
  • 由于label = 1所以会进入case 1语句,里面直接return Unit。接着判断LaunchLambda持有的completion(即StandaloneCoroutine)是否为BaseContinuationImpl类型,根据StandaloneCoroutine的继承关系容易得出答案为「不是」,所以会调用StandaloneCoroutineresumeWith方法。
  • StandaloneCoroutineresumeWith方法在父类AbstractCoroutine中实现:
  •     public final override fun resumeWith(result: Result<T>) {
            val state = makeCompletingOnce(result.toState())
            // 如果在等子协程完成,则返回
            if (state === COMPLETING_WAITING_CHILDREN) return
            // 应该是做一些后续处理
            afterResume(state)
    

    此时最顶层的续体(协程自身)也恢复了。

  • BaseContinuationImpl::resumeWith方法执行结束,整个协程的恢复也完成了。
  • 在之前流程图的基础上进行补充完善:

    一、协程至上而下调用的流程图(协程挂起)

    其中蓝色的文本和线条表示新增的,红色的文本和线条表示挂起的过程。

    二、协程至下而上恢复的流程图(协程恢复)

    其中蓝色的文本和线条表示新增的,橙色的文本和线条表示方法调用的结束。

    协程上下文

    协程上下文CoroutineContext定义了协程的行为,它记录了当前协程所持有的信息,是协程运行中一个重要的数据对象。CoroutineContext是一个接口:

    public interface CoroutineContext {...}
    

    在续体中就有CoroutineContext的相关信息:

    public interface Continuation<in T> {
        // 与该续体对应的协程的上下文
        public val context: CoroutineContext
        // 恢复对应协程的执行,并且传递一个表示成功或失败的result作为最后一个挂起点的返回值
        public fun resumeWith(result: Result<T>)
    

    下面几种元素都是「协程上下文」的元素:

  • Job:控制协程的生命周期。
  • CoroutineDispatcher:将工作分派到适当的线程。
  • CoroutineName:协程的名称,可用于调试。
  • CoroutineExceptionHandler:处理未捕获的异常。
  • CoroutineContext可以看做是CoroutineContext.Element的一个集合,集合中的每个元素都可以使用CoroutineContext.Key进行定位,且每个元素的Key都是不同的。

    CoroutineContext.Element的定义:

        public interface Element : CoroutineContext {...}
    

    可以看到Element本身也实现了CoroutineContext接口,这很奇怪,看上去好像是Int实现了List<Int>接口一样,为什么元素本身也是集合了呢?其实这主要是为了方便API的设计,这样的话,一个元素比如Job也可以直接作为一个CoroutineContext,而不需要创建一个只包含一个元素的List,多个元素之间也可以通过「+」进行拼接,如:

    scope.launch(CoroutineName("coroutine") + Dispatchers.Main) {...}
    

    这里的「+」其实是操作符重载,对应CoroutineContext声明的plus方法:

        public operator fun plus(context: CoroutineContext): CoroutineContext = ...
    

    「协程上下文」存储元素的方式比较巧妙,它内部并不是创建一个集合,集合的每个位置都存放一个元素。它借助了一个CombinedContext结构来实现数据的存取,CombinedContext的定义及get方法:

    internal class CombinedContext(
        private val left: CoroutineContext,
        private val element: Element
    ) : CoroutineContext, Serializable {
        override fun <E : Element> get(key: Key<E>): E? {
            var cur = this
            while (true) {
                cur.element[key]?.let { return it }
                val next = cur.left
                if (next is CombinedContext) {
                    cur = next
                } else {
                    return next[key]
    

    从构造函数中可以看出它包含两部分内容:leftelement。也就是说一个CombinedContext内部可能包含多个元素。

  • left:可能是普通的上下文元素(CoroutineContext.Element),也可能又是一个CombinedContext(又包含多个上下文元素)。
  • element:一个协程上下文元素。
  • CombinedContextget方法中,有一个while(true)循环,执行过程如下:

  • 它会先判断当前element元素与传入的key是否相符,是的话直接返回该元素,否则获取到left部分。
  • leftCombinedContext部分,则对left变量重复步骤1。
  • left不是CombinedContext部分,则直接调用它的get方法获取元素(获取不到则返回null)。
  • 另外,也可以看出element先于left被访问,所以越靠右边的上下文元素,其优先级越高

    Key用于标识协程上下文元素,看看它的定义:

    public interface CoroutineContext {
        public interface Key<E : Element>
        public interface Element : CoroutineContext {
            // 用于标识元素的Key
        	public val key: Key<*>
    

    CoroutineContext.Element有个抽象类实现,可以让我们更方便地实现上下文元素:

    public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
    

    CoroutineName为例,分析如何实现一个协程上下文元素:

    public data class CoroutineName(
        val name: String
        /* CoroutineName.Key可以简写为CoroutineName */
    ) : AbstractCoroutineContextElement(CoroutineName) {
        public companion object Key : CoroutineContext.Key<CoroutineName>
    

    首先声明一点,传入父类AbstractCoroutineContextElement的参数是CoroutineName.Key,只是它可以简写为CoroutineName。其实这也很好理解,在Kotlin中,我们调用伴生对象方法的时候,是可以省去伴生对象的类名的,这里也是同样的道理。

    CoroutineName内部声明了一个继承至CoroutineContext.Key的伴生对象Key,并将其作为构造参数传入父类AbstractCoroutineContextElement中,以此作为该协程上下文元素的Key

    上面是实现协程上下文元素的一种普遍做法,即在协程上下文元素里面定义一个伴生对象,以伴生对象为Key,标识该上下文元素

    最后再看一下CoroutineContext的完整定义:

    public interface CoroutineContext {
        // 根据key获取元素
        public operator fun <E : Element> get(key: Key<E>): E?
        // 翻译为"折叠",它与上下文元素的累加有关
        public fun <R> fold(initial: R, operation: (R, Element) -> R): R
        // 协程上下文元素的累加
        public operator fun plus(context: CoroutineContext): CoroutineContext = ...
    	// 当前CoroutineContext中,去掉key标识的元素后,剩下的上下文元素(以CoroutineContext形式返回)
        public fun minusKey(key: Key<*>): CoroutineContext
        public interface Key<E : Element>
        public interface Element : CoroutineContext {
            // 标识上下文元素的Key
            public val key: Key<*>
            // key相同则返回元素自身,否则返回null
            public override operator fun <E : Element> get(key: Key<E>): E? =
                @Suppress("UNCHECKED_CAST")
                if (this.key == key) this as E else null
            // 执行传入的operation函数
            public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
                operation(initial, this)
            public override fun minusKey(key: Key<*>): CoroutineContext =
                if (this.key == key) EmptyCoroutineContext else this
    

    CoroutineContextplus方法:

    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
    

    为了方便后面阐述,记调用形式为A + B,假设A是含有多个元素的协程上下文,B是单个上下文元素。该方法的大致执行流程如下:

  • 若元素B是空的,则返回原来的上下文A。
  • 在fold的lambda块中,可以认为acc为A,element为B。
  • 若A中减去element.key元素后(记为C),C为空上下文,则返回B(相当于元素B替换了上下文A)。
  • 查看C中是否有ContinuationInterceptor元素,没有则将C和B拼接后返回。
  • C中剔除ContinuationInterceptor,记为D,若D是空的,则将B和ContinuationInterceptor拼接然后返回。
  • D不是空的,则将D和B和ContinuationInterceptor拼接然后返回。
  • 简单来说,这里就是要将「传入的协程上下文元素」与「原来的协程上下文元素」进行拼接,若传入的元素与原来集合中的元素的key有冲突,则用传入的元素替换掉原来集合中key冲突的元素。在上下文元素拼接的时候,若有ContinuationInterceptor元素则要确保它在「协程上下文元素集合」的最右边,这样它的优先级最高,从协程上下文获取该元素的时候可以更快地获取到(至于为什么元素在右边,元素的优先级就高、获取快,在前面介绍CombinedContext中已经说明过了)。

    plus方法的执行流程很难用文字叙述清楚,如果想要知道它的实现流程,可以代入几个例子试试。但是它具体的执行流程并不是要分析的重点,有个大概的印象即可。

    续体拦截机制

    这里算是协程实现原理解析的最后一环了。我们在使用协程的时候,会使用到一些调度器如Dispatchers.MainDispatchers.IO等调度器来调度线程,在前面的分析中并没有提到协程是如何进行线程调度的。

    线程的调度与续体拦截器ContinuationInterceptor有关,它也是一种「协程上下文元素」:

    public interface ContinuationInterceptor : CoroutineContext.Element {
        // 续体拦截器对应的Key
    	companion object Key : CoroutineContext.Key<ContinuationInterceptor>
        // 返回一个续体,该续体对原始的续体进行包装(原始的续体作为方法参数传入)。
        // 如果该方法不想拦截传入的续体,也可以直接返回原来的续体。
        // 当原始续体完成时,如果该续体之前被拦截了,协程框架会调用releaseInterceptedContinuation
        // 方法,传入的参数就是「续体的包装类」。
        public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
        // 该函数只有在interceptContinuation成功拦截的情况下,才会被调用。
        // 若原始续体成功被拦截,当原始续体完成且不再被使用时,该方法会被调用,传入的参数是「续体的包装类」。
        public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
            /* do nothing by default */
    

    续体拦截器可以用于拦截一个续体,最常见的续体拦截器就是协程调度器CoroutineDispatcher,可以通过单例类Dispatchers获取到相应的协程调度器。查看CoroutineDispatcher的实现:

    public abstract class CoroutineDispatcher :
        AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
        @ExperimentalStdlibApi
        public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
            ContinuationInterceptor,
            { it as? CoroutineDispatcher })    
        public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
        public abstract fun dispatch(context: CoroutineContext, block: Runnable)
        public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
        public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            DispatchedContinuation(this, continuation)   
        @InternalCoroutinesApi
        public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
            (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    
  • 拦截器:CoroutineDispatcher继承至ContinuationInterceptor,所以它也是一种续体拦截器。
  • 上下文元素的标识:CoroutineDispatcher继承至AbstractCoroutineContextElement,并传入ContinuationInterceptor.Key构造参数,以此来标识自身。
  • isDispatchNeeded:若需要使用dispatch方法进行调度则返回true,否则返回false。该方法默认返回true。协程调度器可以重写该方法,提供一个性能优化以避免不必要的dispatch,例如主线程调度器Dispatchers.Main会判断当前协程是否已经在UI线程中,如果是的话该方法就会返回false,没有必要再去执行dispatch方法进行不必要的线程调度。
  • dispatch:在给定的上下文和线程中,去执行block块。
  • 假设使用的协程调度器是主线程调度器Dispatchers.Main

        public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    

    查看MainDispatcherLoader.dispatcher

        @JvmField
        val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
        private fun loadMainDispatcher(): MainCoroutineDispatcher {
            return try {
                val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                    FastServiceLoader.loadMainDispatcherFactory()
                } else {
                    // We are explicitly using the
                    // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                    // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                    ServiceLoader.load(
                            MainDispatcherFactory::class.java,
                            MainDispatcherFactory::class.java.classLoader
                    ).iterator().asSequence().toList()
                @Suppress("ConstantConditionIf")
                factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                    ?: createMissingDispatcher()
            } catch (e: Throwable) {
                // Service loader can throw an exception as well
                createMissingDispatcher(e)
    

    调用了tryCreateDispatcher

    public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
        try {
            createDispatcher(factories)
        } catch (cause: Throwable) {
            createMissingDispatcher(cause, hintOnError())
    

    继续跟踪,发现createDispatcherMainDispatcherFactory接口的一个方法,其中的一个实现在AndroidDispatcherFactory中:

    internal class AndroidDispatcherFactory : MainDispatcherFactory {
        override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
            HandlerContext(Looper.getMainLooper().asHandler(async = true))
    

    HandlerContext其实就是调度器Dispatchers.Main的最终实现:

    # handler:主线程的Handler
    internal class HandlerContext private constructor(
        private val handler: Handler,
        private val name: String?,
        private val invokeImmediately: Boolean
    ) : HandlerDispatcher(), Delay {
        public constructor(
            handler: Handler,
            name: String? = null
        ) : this(handler, name, false)
        override fun isDispatchNeeded(context: CoroutineContext): Boolean {
            return !invokeImmediately || Looper.myLooper() != handler.looper
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            handler.post(block)
    

    isDispatchNeeded:通过looper判断协程当前是否在主线程上,是的话返回false,表示不需要再进行线程调度,否则返回true表示需要进行线程调度。

    dispatch:使用主线程的handler对传入的block块进行post操作。

    对「续体拦截器」「协程调度器」有了一定的了解之后,我们再回过头看一下协程调度器是如何发挥作用的。我们前面分析过Cancellable文件的startCoroutineCancellable方法:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
        receiver: R, completion: Continuation<T>,
        onCancellation: ((cause: Throwable) -> Unit)? = null
        runSafely(completion) {
            createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    

    createCoroutineUnintercepted方法中返回了LaunchLambda实例,在之前的分析中,我们忽略了intercepted方法,直接分析为LaunchLambda会调用resumeCancellableWith方法,若没有为协程设定续体拦截器,那么确实是LaunchLambda会直接调用到resumeCancellableWith方法。我们看看,如果为协程设定了续体拦截器,会发生什么?

    查看LaunchLambda调用的intercepted方法,它在IntrinsicsJVM文件中:

    public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
        (this as? ContinuationImpl)?.intercepted() ?: this
    

    LaunchLambdaContinuationImpl类型,因此会调用到父类ContinuationImpl::intercepted

    internal abstract class ContinuationImpl(
        completion: Continuation<Any?>?,
        private val _context: CoroutineContext?
    ) : BaseContinuationImpl(completion) {
        constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
        @Transient
        private var intercepted: Continuation<Any?>? = null    
        public fun intercepted(): Continuation<Any?> =
            intercepted
                ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                    .also { intercepted = it }
    

    刚开始interceptednull,所以会判断协程上下文中是否有ContinuationInterceptor元素,若没有则会返回this(即LaunchLambda自身,并将intercepted变量设置为LaunchLambda),有的话则会调用interceptContinuation方法,假设使用的续体拦截器是Dispatchers.Main,那么就是调用到CoroutineDispatcherinterceptContinuation方法,该方法会返回一个DispatchedContinuation(并将DispatchedContinuation设置到intercepted变量中)。

    查看CoroutineDispatcher::interceptContinuation

        public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            DispatchedContinuation(this, continuation)
    

    DispatchedContinuation类:

    internal class DispatchedContinuation<in T>(
        @JvmField val dispatcher: CoroutineDispatcher,
        @JvmField val continuation: Continuation<T>
    ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {...}
    

    在这里的例子中,dispatcher就是Dispatchers.Maincontinuation就是LaunchLambda

    再回到Cancellable文件的startCoroutineCancellable方法:

    internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
        receiver: R, completion: Continuation<T>,
        onCancellation: ((cause: Throwable) -> Unit)? = null
        runSafely(completion) {
            createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    

    在有续体拦截器(Dispatchers.Main)的情况下,intercepted方法会返回DispatchedContinuation,接着调用它的resumeCancellableWith方法:

    public fun <T> Continuation<T>.resumeCancellableWith(
        result: Result<T>,
        onCancellation: ((cause: Throwable) -> Unit)? = null
    ): Unit = when (this) {
        is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
        else -> resumeWith(result)
    

    调用到另外一个resumeCancellableWith方法,这个方法就是在DispatchedContinuation中实现的了:

        inline fun resumeCancellableWith(
            result: Result<T>,
            noinline onCancellation: ((cause: Throwable) -> Unit)?
            val state = result.toState(onCancellation)
            if (dispatcher.isDispatchNeeded(context)) { // 需要线程调度
                _state = state
                resumeMode = MODE_CANCELLABLE
                // 线程调度,将自身以Runnable块形式传入
                dispatcher.dispatch(context, this)
            } else { // 不需要线程调度
                executeUnconfined(state, MODE_CANCELLABLE) {
                    if (!resumeCancelled(state)) {
                        // 最终会调用continuation.resumeWith,即LaunchLambda.resumeWith
                        resumeUndispatchedWith(result)
    

    可以看到,它调用了dispatcher.isDispatchNeeded来判断是否需要进行线程调度,以Dispatchers.Main为例,就是判断当前协程是否在主线程中运行,是的话则不需要调度,否则需要将协程调度到主线程中运行。

  • 不需线程调度:最终会调用到LaunchLambda.resumeWith,它后续的执行流程之前已经分析过了。
  • 需要线程调度:(以主线程的协程调度器为例)最终会将传入的Runnable在主线程中执行。
  • Runnablerun方法在哪实现的呢?在DispatchedContinuation的父类DispatchedTask中有run方法的实现:

        public final override fun run() {
            try {
                // 获取到的delegate其实就是DispatchedContinuation
                val delegate = delegate as DispatchedContinuation<T>
                // 获取到的continuation其实就是LaunchLambda
                val continuation = delegate.continuation
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                withCoroutineContext(context, delegate.countOrElement) {
                    val exception = getExceptionalResult(state)
                     * Check whether continuation was originally resumed with an exception.
                     * If so, it dominates cancellation, otherwise the original exception
                     * will be silently lost.
                    val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                    if (job != null && !job.isActive) {
                        val cause = job.getCancellationException()
                        cancelCompletedResult(state, cause)
                        continuation.resumeWithStackTrace(cause)
                    } else {
                        if (exception != null) {
                            continuation.resumeWithException(exception)
                        } else {
                            // 正常情况下,会执行到这里,调用LaunchLambda的resume方法
                            continuation.resume(getSuccessfulResult(state))
            } catch (e: Throwable) {
            } finally {
    

    run方法中,最终会调用到LaunchLambdaresume方法(内部又会调用到resumeWith方法)。所以这里做的线程调度,其实就是通过主线程的handler,将代码post到主线程中去运行,从而完成线程的调度工作。

    另外,还有几个未研究的地方与自己的猜想:

    一、releaseIntercepted方法:在BaseContinuationImpl::resumeWith中,每执行完一个续体的invokeSuspend方法,就会调用该续体的releaseIntercepted方法

        protected override fun releaseIntercepted() {
            val intercepted = intercepted
            // intercepted不为null且不为自身(即之前成功拦截续体),就进入If块
            if (intercepted != null && intercepted !== this) {
                // 调用续体拦截器的releaseInterceptedContinuation方法,并传入续体包装类
     		 	context[ContinuationInterceptor]!!.releaseInterceptedContinuation(
                	 intercepted)
            // 将intercepted变量设置为CompletedContinuation
            this.intercepted = CompletedContinuation // just in case
    

    续体拦截器的releaseInterceptedContinuation方法应该是做一些资源清理的工作。

    二、像withContext这样的函数:

    scope.launch(Dispatchers.Main) {
        withContext(Dispatchers.IO) {}
    
    public suspend fun <T> withContext(
        context: CoroutineContext,
        block: suspend CoroutineScope.() -> T
    ): T {...}
    

    block块执行完后,会将线程自动切回「启动协程时的协程调度器所指定」的线程,那么它是如何切回来的呢?个人猜测,在协程至上而下调用的时候,协程上下文会一层一层地向下传递,withContextblock块执行的时候,协程上下文会被保存在某个地方,等到block块执行结束的时候,会从之前保存的协程上下文中取出协程调度器,将剩余的代码(协程恢复)调度到相应的线程中去执行,从而实现了 block块执行完后,线程会自动切回「启动协程时的协程调度器所指定」的线程。

    协程咖啡厅 - 构造魔法 - 探索 Kotlin 协程实现原理 - M.D

    Suspend functions - Kotlin Vocabulary - YouTube