Kotlin – 协程基础及原理

作者 : 开心源码 本文共19754个字,预计阅读时间需要50分钟 发布时间: 2022-05-14 共190人阅读

初识协程

什么是协程

Kotlin 1.3 增加了协程 Coroutine 的概念,文档中详情协程是一种并发设计模式,可以在 Android 平台上使用它来简化异步执行的代码。

协程具备如下特点:

  • 异步代码同步化:使用编写同步代码的方式编写异步代码。

  • 轻量:您可以在单个线程上运行多个协程,由于协程支持挂起,不会使正在运行协程的线程阻塞。挂起比阻塞节省内存,且支持多个并行操作。

  • 内存泄漏更少:使用结构化并发机制在一个作用域内执行多项操作。

  • 内置取消支持:取消操作会自动在运行中的整个协程层次结构内传播。

  • Jetpack 集成:许多 Jetpack 库都包含提供全面协程支持的扩展。某些库还提供自己的协程作用域,可供您用于结构化并发。

协程的挂起和恢复

Kotlin 协程的挂起和恢复本质上是挂起函数的挂起和恢复。

suspend fun suspendFun() {}

挂起函数suspend 关键字修饰的普通函数。假如在协程体内调用了挂起函数,那么调用处就被称为 挂起点。挂起点假如出现 异步调用,那么当前协程就会被挂起,直到对应的 Continuation.resume() 函数被调用才会恢复执行。

挂起函数和普通函数的区别在于:

  • 挂起函数只能在协程体内或者其余挂起函数内调用;

  • 挂起函数可以调用任何函数,普通函数只能调用普通函数。

suspend 除用于修饰函数外还可用于修饰 lambda 表达式,在源码分析的章节会详细分析它们的区别。

基本用法

Gradle 引入

dependencies {// Kotlin Coroutinesimplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'// 使用 `Dispatchers.Main` 需要增加如下依赖implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2'}

启动协程

kotlin 协程框架为我们提供了两种便捷的方式启动协程:

  • GlobalScop.launch

  • GlobalScope.async

分别来使用两种方式输出 Hello World!

fun main() {GlobalScope.launch { // 使用 GlobalScope.launch 启动协程delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)println("World!") // 在推迟后打印输出}print("Hello ") // 协程已在等待时主线程还在继续Thread.sleep(2000L) // 阻塞主线程 2 秒钟来保证 JVM 存活}fun main() {GlobalScope.async { // 使用 GlobalScope.async 启动协程delay(1000L)println("World!")}print("Hello ")Thread.sleep(2000L)}

从上面的例子里看这两种方式如同并没有什么区别,其实区别在他们的返回值上

  • GlobalScop.launch:返回值 Job

  • GlobalScope.async:返回值 Deferred<T>

Deferred<T>Job 的子类,并且可以通过调用 await 函数获取协程的返回值。上面 GlobalScope.async 的例子改造一下:

GlobalScope.launch {val result = GlobalScope.async { // 使用 GlobalScope.async 启动协程delay(1000L)"World!"}println("Hello ${result.await()}")}Thread.sleep(2000L)//输出:Hello World!

上面的示例把 async 嵌套在了 launch 函数体内部,这是由于 await 是一个挂起函数,而挂起函数不同于普通函数的就是它必需在协程体或者其余挂起函数内部调用。

在协程体内 ({} 内) 可以隐藏 GlobalScope 直接使用 async、launch 启动协程,所以上面的示例可以修改如下:

GlobalScope.launch {val result = async { // 使用 GlobalScope.async 启动协程...}...// launch {}}...

协程操作

通过理解协程的两种启动方式,我们知道 GlobalScop.launch、GlobalScop.async 的返回值都是 Job 对象或者其子类对象。那 Job 是什么呢? 又有哪些功能。

Job 是一个可取消的后端任务,用于操作协程的执行并记录执行过程中协程的状态,所以一般来说 Job 实例也代表了协程。

Job 具备如下几种状态:

State[isActive][isCompleted][isCancelled]
New (可选初始状态)falsefalsefalse
Active (默认初始状态)truefalsefalse
Completing (瞬态)truefalsefalse
Cancelling (瞬态)falsefalsetrue
Cancelled (最终状态)falsetruetrue
Completed (最终状态)falsetruefalse

通常情况下,创立 Job 时会自动启动,状态默认为 _Active_,但是假如创立时增加参数 CoroutineStart.Lazy 则状态为 _NEW_,可以通过 start() 或者 join() 等函数激活。

Job 状态流程图:

wait children+-----+ start +--------+ complete +-------------+ finish +-----------+| New | -----> | Active | ---------> | Completing | -------> | Completed |+-----+ +--------+ +-------------+ +-----------+| cancel / fail || +----------------+| |V V+------------+ finish +-----------+| Cancelling | --------------------------------> | Cancelled |+------------+ +-----------+

Job 的可用方法:

  • cancel(CancellationException):取消 Job 对应的协程并发送协程取消错误 (CancellationException)。

  • invokeOnCompletion():注册当此 Job 状态升级为 Completed 时同步调用的解决程序。

  • join():挂起 Job 对应的协程,当协程完成时,外层协程恢复。

  • start():假如创立 Job 对象时使用的启动模式为 CoroutineStart.Lazy,通过它可以启动协程。

  • cancelAndJoin():取消 Job 并挂起当前协程,直到 Job 被取消。

当要取消正在运行的协程:

val job = launch {repeat(1000) { i ->println("job: I'm sleeping $i ...")delay(500L)}}delay(1300L) // 推迟一段时间println("main: I'm tired of waiting!")job.cancel() // 取消该作业job.join() // 等待作业执行结束println("main: Now I can quit.")// 输出job: I'm sleeping 0 ...job: I'm sleeping 1 ...job: I'm sleeping 2 ...main: I'm tired of waiting!main: Now I can quit.

上面示例中可以使用 cancelAndJoin 函数它合并了对 cancel 以及 join 函数的调用。

注意:假如在协程执行过程中没有挂起点,那么协程是不可被取消的。

val startTime = System.currentTimeMillis()val job = launch(Dispatchers.Default) {var nextPrintTime = startTimevar i = 0while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU// 每秒打印消息两次if (System.currentTimeMillis() >= nextPrintTime) {println("job: I'm sleeping ${i++} ...")nextPrintTime += 500L}}}delay(1300L) // 等待一段时间,并保证协程开始执行println("main: I'm tired of waiting!")job.cancelAndJoin() // 取消一个作业并且等待它结束println("main: Now I can quit.")// 输出job: I'm sleeping 0 ...job: I'm sleeping 1 ...job: I'm sleeping 2 ...main: I'm tired of waiting!job: I'm sleeping 3 ...job: I'm sleeping 4 ...main: Now I can quit.

简单来说,假如协程体内没有挂起点的话,已开始执行的协程是无法取消的。

下面来详情,协程启动时传参的含义及作用:

public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit): Job {...}

协程的启动模式

CoroutineStart:协程启动模式。协程内提供了四种启动模式:

  • DEFAULT:协程创立后,立即开始调度,在调度前假如协程被取消,其将直接进入取消相应的状态。

  • ATOMIC:协程创立后,立即开始调度,协程执行到第一个挂起点之前不响应取消。

  • LAZY:只有协程被需要时,包括主动调用协程的 start()、join()、await() 等函数时才会开始调度,假如调度前就被取消,那么该协程将直接进入异常结束状态。

  • UNDISPATCHED:协程创立后立即执行,直到遇到第一个真正挂起的点。

立即调度和立即执行的区别:立即调度表示协程的调度器会立即接收到调度指令,但具体执行的时机以及在那个线程上执行,还需要根据调度器的具体情况而定,也就是说立即调度到立即执行之间通常会有一段时间。因而,我们得出以下结论:

  • DEFAULT 尽管是立即调度,但也有可能在执行前被取消。

  • UNDISPATCHED 是立即执行,因而协程肯定会执行。

  • ATOMIC 尽管是立即调度,但其将调度和执行两个步骤合二为一了,就像它的名字一样,其保证调度和执行是原子操作,因而协程也肯定会执行。

  • UNDISPATCHEDATOMIC 尽管都会保证协程肯定执行,但在第一个挂起点之前,前者运行在协程创立时所在的线程,后者则会调度到指定的调度器所在的线程上执行。

协程上下文和调度器

CoroutineContext:协程上下文。用于控制协程的行为,上文提到的 Job 和准备详情的调度器都属于 CoroutineContext

协程默认提供了四种调度器:

  • Dispatchers.Default:默认调度器,假如没有指定协程调度器和其余任何阻拦器,那默认都使用它来构建协程。适合解决后端计算,其是一个 CPU 密集型任务调度器。

  • Dispatchers.IOIO 调度器,适合执行 IO 相关操作,其是一个 IO 密集型任务调度器。

  • Dispatchers.MainUI 调度器,会将协程调度到主线程中执行。

  • Dispatchers.Unconfined:非受限制调度器,不要求协程执行在特定线程上。协程的调度器假如是 Unconfined,那么它在挂起点恢复执行时会在恢复所在的线程上直接执行,当然,假如嵌套创立以它为调度器的协程,那么这些协程会在启动时被调度到协程框架内部的时间循环上,以避免出现 StackOverflow

  • Dispatchers.Unconfined:非受限调度器,会在调用它的线程启动协程,但它仅仅只是运行到第一个挂起点。挂起后,它恢复线程中的协程,而这完全由被调用的挂起函数来决定。

runBlocking {launch { // 运行在父协程的上下文中,即 runBlocking 主协程println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")}launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中println("Unconfined : I'm working in thread ${Thread.currentThread().name}")}launch(Dispatchers.Default) { // 将会获取默认调度器println("Default : I'm working in thread ${Thread.currentThread().name}")}}//输出结果Unconfined : I'm working in thread main @coroutine#3Default : I'm working in thread DefaultDispatcher-worker-1 @coroutine#4main runBlocking : I'm working in thread main @coroutine#2

withContext

除了可以在 GlobalScope.launch {}、GlobalScope.async {} 创立协程时设置协程调度器,

async {...}.await() 相比 withContext 的内存开销更低,因而对于使用 async 之后立即调用 await 的情况,应当优先使用 withContext

withTimeout

Kotlin 协程提供了 withTimeout 函数设置超时取消。假如运行超时,取消后会抛出 TimeoutCancellationException 异常。抛出异常的情况下回影响到其余协程,这时候可以使用 withTimeoutOrNull 函数,它会在超时的情况下返回 null 而不抛出异常。

runBlocking {val result = withContext(coroutineContext) {withTimeoutOrNull(500) {delay(1000)"hello"}}println(result)}// 输出结果hello

yield

假如想要处理上面示例中的问题可以使用 yield 函数。它的作用在于检查所在协程的状态,假如已经取消,则抛出取消异常予以响应。此外它还会尝试出让线程的执行权,给其余协程提供执行机会。

在上面示例中增加 yield 函数:

if (System.currentTimeMillis() >= nextPrintTime) {yield()println("job: I'm sleeping ${i++} ...")nextPrintTime += 500L}// 输出结果job: I'm sleeping 0 ...job: I'm sleeping 1 ...job: I'm sleeping 2 ...main: I'm tired of waiting!main: Now I can quit.

协程的作用域

协程作用域:协程作用域主要用于明确协程之间的父子关系,以及对于取消或者者异常解决等方面的传播行为。

协程作用域包括以下三种:

  • 顶级作用域:没有父协程的协程所在的作用域为顶级作用域。

  • 协同作用域:协程中启动新的协程,新协程为所在协程的子协程,这种情况下子协程所在的作用域默认为协同作用域。此时子协程抛出的未捕获异常将传递给父协程解决,父协程同时也会被取消。

  • 主从作用域:与协程作用域在协程的父子关系上一致,区别在于处于该作用域下的协程出现未捕获的异常时不会将异常向上传递给父协程。

父子协程间的关系:

  • 父协程被取消,则所有子协程均被取消。

  • 父协程需要等待子协程执行完毕之后才会最终进入完成状态,不论父协程自身的协程体能否已经执行完毕。

  • 子协程会继承父协程的协程上下文元素,假如自身有相同 key 的成员,则覆盖对应的 key,覆盖的效果仅限自身范围内有效。

公告顶级作用域:GlobalScope.launch {}runBlocking {}

公告协同作用域:coroutineScope {}

公告主从作用域:supervisorScope {}

coroutineScope {}supervisorScope {} 是挂起函数所以它们只能在协程作用域中或者挂起函数中调用。

coroutineScope {}supervisorScope {} 的区别在于 SupervisorCoroutine 重写了 childCancelled() 函数使异常不会向父协程传递。

协程并发

通过上文的详情可以理解到协程其实就是执行在线程上的代码片段,所以线程的并发解决都可以用在协程上,比方 synchorinzedCAS 等。而协程本身也提供了两种方式解决并发:

  • Mutex:互斥锁;

  • Semaphore:信号量。

Mutex

Mutex 相似于 synchorinzed,协程竞争时将协程包装为 LockWaiter 使用双向链表存储。Mutex 还提供了 withLock 扩展函数,以简化使用:

runBlocking<Unit> {val mutex = Mutex()var counter = 0repeat(10000) {GlobalScope.launch {mutex.withLock {counter ++}}}Thread.sleep(500) //暂停一会儿等待所有协程执行结束println("The final count is $counter")}
Semaphore

Semaphore 用以限制访问特定资源的协程数量。

runBlocking<Unit> {val semaphore = Semaphore(1)var counter = 0repeat(10000) {GlobalScope.launch {semaphore.withPermit {counter ++}}}Thread.sleep(500) //暂停一会儿等待所有协程执行结束println("The final count is $counter")}

注意:只有在 permits = 1 时才和 Mutex 功能相同。

源码分析

suspend

我们来看 suspend 修饰函数和修饰 lambda 的区别。

挂起函数:

suspend fun suspendFun() {}

编译成 java 代码如下:

@Nullablepublic final Object suspendFun(@NotNull Continuation $completion) {return Unit.INSTANCE;}

可以看到挂起函数其实隐藏着一个 Continuation 协程实例参数,而这个参数其实就来源于协程体或者者其余挂起函数,因而挂起函数只能在协程体内或者其余函数内调用了。

suspend 修饰 lambda 表达式:

suspend {}// 反编译结果如下Function1 var2 = (Function1)(new Function1((Continuation)null) {int label;@Nullablepublic final Object invokeSuspend(@NotNull Object $result) {switch(this.label) {case 0:return Unit.INSTANCE;default:}}@NotNullpublic final Continuation create(@NotNull Continuation completion) {Function1 var2 = new <anonymous constructor>(completion);return var2;}public final Object invoke(Object var1) {return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);}});

suspend lambda 实际会被编译成 SuspendLambda 的子类。suspendLambda 的继承关系如下图:

image

通过反编译的代码可以发现我们在协程体内编写的代码最终是在 invokeSuspend 函数内执行的。而在 BaseContinuationImpl 内实现了 Continuation 协程接口的 resumeWidth 函数,并在其内调用了 invokeSuspend 函数。

suspend 关键字的详情先到这里,接下来我们看协程是如何创立并运行的。

协程是如何被创立的

文件地址 kotlin.coroutines.Continuation.kt

Continuation.kt 文件基本属于协程的基础核心了,搞懂了它也就相当于搞懂了协程的基础原理。

  • 协程接口的定义;

  • 唤醒或者启动协程的函数;

  • 四种创立协程的函数;

  • 帮助获取协程内的协程实例对象的函数。

首先是协程的接口公告,非常简单:

/*** 协程接口,T 表示在最后一个挂起点恢复时的返回值类型*/public interface Continuation<in T> {/*** 协程上下文*/public val context: CoroutineContext/*** 这个函数的功能有很多,它可以启动协程,也可以恢复挂点,还可以作为最后一次挂起点恢复时输出协程的结果*/public fun resumeWith(result: Result<T>)}

协程接口公告之后 Continuation.kt 文件提供了两个调用 resumeWith 函数的函数:

public inline fun <T> Continuation<T>.resume(value: T): Unit =resumeWith(Result.success(value))public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =resumeWith(Result.failure(exception))

这两个函数除了传参一成功一失败,它们的功能是一模一样的,都是直接调用了 resumeWith 函数。相当于是 resumeWith 函数的封装。

再而后就是四种创立协程的方式了:

public fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> =SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)public fun <R, T> (suspend R.() -> T).createCoroutine(receiver: R,completion: Continuation<T>): Continuation<Unit> =SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {createCoroutineUnintercepted(completion).intercepted().resume(Unit)}public fun <R, T> (suspend R.() -> T).startCoroutine(receiver: R,completion: Continuation<T>) {createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)}

这四种方式可以说是类似度超高,createCoroutinestartCoroutine 最大的区别在于,通过 createCoroutine 创立的协程需要掉用 resume 函数启动,而 startCoroutine 函数内部已经默认调用了 resume 函数。那我们先用第一种方式创立一个协程:

// 创立协程val continuation = suspend {println("In Coroutine")}.createCoroutine(object : Continuation<Unit> {override fun resumeWith(result: Result<Unit>) {println(result)}override val context = EmptyCoroutineContext})// 启动协程continuation.resume(Unit)

调用 createCoroutine 函数创立协程时传入了 Continuation 协程的匿名类对象,诶?如同有点不对,为什么创立协程的时候要传一个协程实例进去,直接用不就成了。想知道为什么的话,那就需要看看 createCoroutine 究竟做了什么操作了。

SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

首先调用的是 createCoroutineUnintercepted 函数,它的源码可以在 kotlin.coroutines.intrinsics.IntrinsicsJvm.kt 内找到:

public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>): Continuation<Unit> {val probeCompletion = probeCoroutineCreated(completion)return if (this is BaseContinuationImpl)create(probeCompletion)elsecreateCoroutineFromSuspendFunction(probeCompletion) {(this as Function1<Continuation<T>, Any?>).invoke(it)}}

probeCoroutineCreated 函数内直接将参数返回了,并且通过断点的方式,它的返回值和 completion 传参是一样的,所以这里先忽略它。

通过断点会发现 (this is BaseContinuationImpl) 判断的返回值是 true 这也就间接证实了上文中 suspend lambdaBaseContinuationImpl 的继承关系。最后返回的是 create(Continuation) 函数的返回值,这里可以发现作为参数传入的 Continuation 变量被 suspend lambda 包裹了一层,而后返回,相当于 suspend lambda 成为了 Continuation 的代理商。

到这里 createCoroutineUnintercepted(completion) 的含义就搞明白了:

object : Continuation<Unit> {} 创立的协程实例传入 suspend lambda,由其代理商协程执行操作。

紧接着又调用了 intercepted 函数,intercepted 函数公告也在 IntrinsicsJvm.kt 文件内:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this

接着看 ContinuationImplintercepted 函数:

public fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }

其中 context[ContinuationInterceptor]?.interceptContinuation(this) 这句代码涉及到协程阻拦器的概念,下文会详细分析。这里可以先简单详情一下,协程阻拦器和协程其实也是代理商的关系。所以 intercepted() 可以了解为假如协程上下文中增加了协程阻拦器,那么就返回协程阻拦器,不然就返回 suspend lambda 实例本身,而它们都实现了 Continuation 接口。

先做一个小结,通过上文的详情基本就清楚了,createCoroutine、startCoroutine 函数其实不是用来创立协程的,协程实例就是它们的传参,它们是为协程增加代理商的。

createCoroutineUnintercepted(completion).intercepted()

通过上面的代码,为协程增加了代理商,分别是 suspend lambda 和协程阻拦器。这时候通过协程实例调用 resumeWith 函数时会先执行两层代理商内实现的 resumeWith 函数逻辑,最终才会执行到协程的 resumeWith 函数输出最终结果。

createCoroutine 函数内,在增加两层代理商之后又增加了一层代理商,SafeContinuationSafeContinuation 内部使用协程的三种状态,并配合 CAS 操作,保证当前返回的 SafeContinuation 实例对象仅能调用一次 resumeWith 函数,屡次调用会报错。

  • UNDECIDED:初始状态

  • COROUTINE_SUSPENDED:挂起状态

  • RESUMED:恢复状态

协程是如何被挂起又是如何被恢复的

那为什么协程要这么做,很麻烦不是?要弄清楚这个问题先来看 BaseContinuationImplresumeWith 函数实现吧。

public final override fun resumeWith(result: Result<Any?>) {var current = thisvar param = resultwhile (true) {probeCoroutineResumed(current)with(current) {val completion = completion!!val outcome: Result<Any?> =try {val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // this state machine instance is terminatingif (completion is BaseContinuationImpl) {current = completionparam = outcome} else {// top-level completion reached -- invoke and returncompletion.resumeWith(outcome)return}}}}

当调用 resume(Unit) 启动协程时,因为代理商的存在会调用到 BaseContinuationImplresumeWith() 函数,函数内会执行 invokeSuspend() 函数,也就说我们所说的协程体。

查看如下代码的 invokeSuspend 函数:

suspend {5}// 反编译后的 invokeSuspend 函数public final Object invokeSuspend(@NotNull Object $result) {Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch(this.label) {case 0:ResultKt.throwOnFailure($result);return Boxing.boxInt(5);default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}}

可以看到这里直接返回了最终的结果 5,接着在 ContinuationImpl.resumeWith 函数内最终调用

completion.resumeWith(outcome)

输出协程的最终结果。

这是协程执行同步代码的过程,可以看到在整个过程中,ContinuationImpl 如同并没有起到什么作用,那接着来看在协程体内执行异步代码:

suspend {suspendFunc()}suspend fun suspendFunc() = suspendCoroutine<Int> { continuation ->thread {Thread.sleep(1000)continuation.resume(5)}}// 反编译后public final Object invokeSuspend(@NotNull Object $result) {Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();Object var10000;switch(this.label) {case 0:ResultKt.throwOnFailure($result);this.label = 1;var10000 = DeepKotlin3Kt.suspendFunc(this);if (var10000 == var2) {return var2;}break;case 1:ResultKt.throwOnFailure($result);var10000 = $result;break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}return var10000;}public static final Object suspendFunc(@NotNull Continuation $completion) {boolean var1 = false;boolean var2 = false;boolean var3 = false;SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));Continuation continuation = (Continuation)var4;int var6 = false;ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new DeepKotlin3Kt$suspendFunc02$2$1(continuation)), 31, (Object)null);Object var10000 = var4.getOrThrow();if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {DebugProbesKt.probeCoroutineSuspended($completion);}return var10000;}

resume 函数启动协程,invokeSuspend 函数第一次执行时 this.label == 0 执行 case 0 代码,this.label 变量赋值为 1, 而后判断假如 if (var10000 == var2)true 那么 invokeSuspend 函数返回 var2,也就是 COROUTINE_SUSPENDED 标识,在 resumeWith 函数内,判断假如 invokeSuspend 函数的返回值为 COROUTINE_SUSPENDEDreture。这也就是协程的挂起过程。

当线程执行结束,调用 resume 函数恢复协程时再次执行到 invokeSuspend 函数,这时 this.label == 1,执行 case 1 代码,直接返回结果 5。那在 resumeWith 函数内,这时就不会执行 return 了,最终会调用协程的 resumeWith 函数输出最终的结果,这也就是协程的恢复过程。

通过理解协程运行流程可以发现 ContinuationImpl 其实是协程挂起和恢复逻辑的真正执行者。也正是由于协程挂起和恢复逻辑的存在,所以我们可以像编写同步代码一样调用异步代码:

suspend {println("Coroutine start")println("Coroutine: ${System.currentTimeMillis()}")val resultFun = suspendThreadFun()println("Coroutine: suspendThreadFun-$resultFun-${System.currentTimeMillis()}")val result = suspendNoThreadFun()println("Coroutine: suspendNoThreadFun-$result-${System.currentTimeMillis()}")}.startCoroutine(object : Continuation<Unit> {override val context = EmptyCoroutineContextoverride fun resumeWith(result: Result<Unit>) {println("Coroutine End: $result")}})suspend fun suspendThreadFun() = suspendCoroutine<Int> { continuation ->thread {Thread.sleep(1000)continuation.resumeWith(Result.success(5))}}suspend fun suspendNoThreadFun() = suspendCoroutine<Int> { continuation ->continuation.resume(5)}//输出:Coroutine startCoroutine: 1627014868152Coroutine: suspendThreadFun-5-1627014869182Coroutine: suspendNoThreadFun-5-1627014869186Coroutine End: Success(kotlin.Unit)

创立协程作用域

在通过 createCoroutine 创立协程时,你会发现还可为它传递 receiver 参数,这个参数的作用是用于扩展协程体,一般称其为 协程作用域

public fun <R, T> (suspend R.() -> T).createCoroutine(receiver: R,completion: Continuation<T>): Continuation<Unit> =SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)

可以看到 suspend lambda 表达式也出现了变化。我们知道 () -> TFunction0lambda 表达式,R.() -> T 相当于 R 类的 () -> T 扩展。假如理解扩展函数的话就知道扩展函数会将所扩展的类作为其参数,那么 R.() -> T 也就是 Function1lambda 表达式了。

当然因为 suspend 关键字的作用,又添加了 Continuation 参数,所以最终看到的就是 Function1Function2

由于扩展函数的作用,所以可以在协程体内通过 this (可隐藏)调用 receiver 的函数或者者属性。示例如下:

launchCoroutine(ProducerScope<Int>()) {produce(1000)}fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {block.startCoroutine(receiver, object : Continuation<T> {override val context = EmptyCoroutineContextoverride fun resumeWith(result: Result<T>) {println("Coroutine End: $result")}})}class ProducerScope<T> {fun produce(value: T) {println(value)}}

GlobalScope.launch 源码分析

理解上文创立协程的逻辑之后再来分析 GlobalScope.launch 就非常简单了。GlobalScope.launch 最终会执行到 CoroutineStart.invoke 函数:

AbstractCoroutine.kt

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {initParentJob()start(block, receiver, this)}

CoroutineStart.kt

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =when (this) {DEFAULT -> block.startCoroutineCancellable(receiver, completion)ATOMIC -> block.startCoroutine(receiver, completion)UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)LAZY -> Unit // will start lazily}

代码基本跟上文分析的一致。

自己设置协程上下文

协程上下文在协程中的作用非常大,有它在相当于协程有了装备卡槽一样。你可以将你想增加的上下文对象合并到 CoroutineContext 参数上,而后在其余地方使用。

CoroutineContext 的数据结构有如下特点:

  • 可以通过 [] 以相似 List 的方式访问任何一个协程上下文对象,[] 内是目标协程上下文。

  • 协程上下文可以通过 + 的方式依次累加,当然 += 也是可用的。

我们来自己设置一个协程上下文给协程增加一个名字:

public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {public companion object Key : CoroutineContext.Key<CoroutineName>override fun toString(): String = "CoroutineName($name)"}

应用到示例中:

var coroutineContext: CoroutineContext = EmptyCoroutineContextcoroutineContext += CoroutineName("c0-01")suspend {println("Run Coroutine")}.startCoroutine(object : Continuation<Unit> {override fun resumeWith(result: Result<Unit>) {println("${context[CoroutineName]?.name}")}override val context = coroutineContext})//输出:Run Coroutinec0-01

其实协程已经为我们提供了 CoroutineName 实现。

自己设置协程阻拦器

通过实现阻拦器接口 ContinuationInterceptor 来定义阻拦器,由于阻拦器也是协程上下文的一类实现,所以使用阻拦器时将其增加到对应的协程上下文中就可。

公告一个日志阻拦器:

class LogInterceptor : ContinuationInterceptor {override val key = ContinuationInterceptoroverride fun <T> interceptContinuation(continuation: Continuation<T>) = LogContinuation(continuation)}class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {override fun resumeWith(result: Result<T>) {println("before resumeWith: $result")continuation.resumeWith(result)println("after resumeWith")}}

阻拦器的关键阻拦函数是 interceptContinuation,可以根据需要返回一个新的 Continuation 实例。

在协程生命周期内每次恢复调用都会触发阻拦器。恢复调用有如下两种情况:

  • 协程启动时调用一次,通过恢复调用来开始执行协程体从开始到下一次挂起之间的逻辑。

  • 挂起点处假如异步挂起,则在恢复时再调用一次。

由此可知,恢复调用的次数为 n+1 次,其中 n 是协程体内真正挂起执行异步逻辑的挂起点的个数。

改写上面的例子:

// 异步挂起函数suspend fun suspendFunc02() = suspendCoroutine<Int> { continuation ->thread {continuation.resumeWith(Result.success(5))}}// 开启协程 - 未增加日志阻拦器suspend {suspendFunc02()suspendFunc02()}.startCoroutine(object : Continuation<Int> {override val context: CoroutineContext = EmptyCoroutineContextoverride fun resumeWith(result: Result<Int>) {...result.onSuccess {println("Coroutine End: ${context[CoroutineName]?.name}, $result")}}})// 输出如下Coroutine End: Success(5)// 开启协程 - 增加日志阻拦器suspend {suspendFunc02()suspendFunc02()}.startCoroutine(object : Continuation<Int> {override val context: CoroutineContext = LogInterceptor()override fun resumeWith(result: Result<Int>) {...result.onSuccess {println("Coroutine End: ${context[CoroutineName]?.name}, $result")}}})// 输出如下:before resumeWith: Success(kotlin.Unit)after resumeWithbefore resumeWith: Success(5)after resumeWithbefore resumeWith: Success(5)after resumeWith
说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Kotlin – 协程基础及原理

发表回复