Kotlin协程核心库分析-3 Job链
概述我们知道协程启动后可以在内部在启动一个子协程,当存在这样的结构化层级时父协程的取消会引起子协程立即跟随取消,当子协程存在未捕获的异常时同样会级联到父协程的取消。并且协程启动后返回的Job对象时可以监听完成和取消事件。这一切背后的都靠背后一个链表保存所有的子协程和监听器。...
概述
我们知道协程启动后可以在内部在启动一个子协程,当存在这样的结构化层级时父协程的取消会引起子协程立即跟随取消,当子协程存在未捕获的异常时同样会级联到父协程的取消。并且协程启动后返回的Job对象时可以监听完成和取消事件。这一切背后依赖于一个Job内部的链表保存所有的子协程和监听器。
所以我们来看看下面的列子:
fun main() {
//创建一个Job
val job = GlobalScope.launch {
Thread.sleep(100)
println("协程完成")
}
//第一个监听
val disposableOne = job.invokeOnCompletion {
println("disposableOne 完成")
}
//第二个监听
val disposableTwo = job.invokeOnCompletion {
println("disposableTwo 完成")
}
//第三个监听
val disposableThree = job.invokeOnCompletion {
println("disposableThree 完成")
}
TimeUnit.SECONDS.sleep(1)
println("结束")
}
输出:
协程完成
disposableOne 完成
disposableTwo 完成
disposableThree 完成
结束
我们文本讲探讨job.invokeOnCompletion背后的原理。
首先我们先看返回的job的类继承图:
StandaloneCoroutine是启动协程后返回的一个Job实现类,而真正的实现位于JobSupport类上。
我们方法的实现:
//JobSupport.kt
//Job内部的状态对象
internal val state: Any? get() {
_state.loop { state ->
if (state !is OpDescriptor) return state
state.perform(this)
}
}
//轮询状态,注意这里是一个内联函数block函数必须显示显示return 才可结束
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
while (true) {
block(state)
}
}
//调用重载函数 onCancelling 取消的时候是否回调,invokeImmediately 已经结束的job是否回调
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
public final override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
var nodeCache: JobNode<*>? = null
//这里只有return才会返回
loopOnState { state ->
when (state) {
//省略
}
}
}
我们可以把重点放在loopOnState传入的闭包函数中,
public final override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
var nodeCache: JobNode<*>? = null
loopOnState { state ->
when (state) {
is Empty -> {
if (state.isActive) {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is Incomplete -> {
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode<*>)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
if (onCancelling && state is Finishing) {
synchronized(state) {
rootCause = state.rootCause
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
if (rootCause == null) return node
handle = node
}
}
}
if (rootCause != null) {
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
} else {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, list, node)) return node
}
}
}
else -> {
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
return NonDisposableHandle
}
}
}
}
JobSupport默认状态state是一个Empty对象,所以我们第一次调用的时候逻辑如下
//循环的时候缓存
var nodeCache: JobNode<*>? = null
loopOnState { state ->
when (state) {
//第一次默认为Empty状态
is Empty -> {
//当前协程是存活的所以是true
if (state.isActive) {
//创建一个Node 然后把当前状态设置成Node
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
//CAS设置当前状态为新建的Node
if (_state.compareAndSet(state, node)) return node
} else{
}
}
下列函数用于创建一个监听节点,第一个参数的是回调函数,第二个参数是判断是否监听取消事件makeNode(handler, onCancelling)
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
return if (onCancelling)
(handler as? JobCancellingNode<*>)?.also { assert { it.job === this } }
?: InvokeOnCancelling(this, handler)
else
//这里我们只监听完成的所以创建InvokeOnCompletion
(handler as? JobNode<*>)?.also { assert { it.job === this && it !is JobCancellingNode } }
?: InvokeOnCompletion(this, handler)
}
我们简单的看下InvokeOnCompletion继承图:
我们只需要注意两个地方实现了Job接口,和一个LockFreeLinkedListNode类。Job不解释了(返回的新的Job又可以在其上添加监听接口哦,这是父子协程核心),而LockFreeLinkedListNode是一个线程安全链表结构。
LockFreeLinkedListNode简单讲解:
我们不过多讲解如何线程安全(自旋+CAS),我们只大致说明结构.
public actual open class LockFreeLinkedListNode {
//下一个节点
private val _next = atomic<Any>(this)
//上一个节点
private val _prev = atomic(this)
}
这是最核心的链表结构,内部有两个指针分别指向上一个Node和下一个Node.
比较核心的方法:
//LockFreeLinkedListNode.kt
/**
* 插入一个节点在当前节点_prev上
*/
public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
when (prev.tryCondAddNext(node, this, condAdd)) {
SUCCESS -> return true
FAILURE -> return false
}
}
}
举个例子吧:
调用之前的:
NodeTwo调用addLastIf函数插入Node New:
总结就是调用后向前插入即可。
我们回过头来继续分析:
//循环的时候缓存
var nodeCache: JobNode<*>? = null
loopOnState { state ->
when (state) {
//第一次默认为Empty状态
is Empty -> {
//当前协程是存活的所以是true
if (state.isActive) {
//创建一个Node 然后把当前状态设置成Node
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
//CAS设置当前状态为新建的Node
if (_state.compareAndSet(state, node)) return node
} else{
}
}
当有第一个监听器时创建后把状态改变为这个新创建的节点然后返回.
我们继续分析有第二个监听节点的时候:
fun main() {
//创建一个Job
val job = GlobalScope.launch {
Thread.sleep(100)
println("协程完成")
}
//第一个监听
val disposableOne = job.invokeOnCompletion {
println("disposableOne 完成")
}
// 分析到这里
//第二个监听
val disposableTwo = job.invokeOnCompletion {
println("disposableTwo 完成")
}
}
我们看下第二个disposableTwo源码走向:
public final override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
var nodeCache: JobNode<*>? = null
loopOnState { state ->
when (state) {
//由于不是第一次不会走这里
is Empty -> {
//....
}
//我们可以从第一次创建的节点继承图知道,第一次插入节点继承Incomplete
is Incomplete -> {
val list = state.list
if (list == null) {
//我们第一次创建的Node的list必然为空
//此处代码创建了一个list节点作为一个新的状态,它实际没有
//包含任务监听器,仅仅是一个哨兵节点或者头节点
//然后把当前状态变为这个list对象,然后把第一次创建Node放入,接着继续循环
promoteSingleToNodeList(state as JobNode<*>)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
//当前协程已经结束,这里我们为false
if (onCancelling && state is Finishing) {
//...
}
//异常取消原因当然这里rootCause==null
if (rootCause != null) {
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
} else {
//最终走到这里,此时状态是一个list对象,我们把当前状态
//这里我们创建一个节点放入list链表中
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, list, node)) return node
}
}
}
else -> { // is complete
//协程完成的时候会走到这里 ,当然我们目前不会走到这
//...
}
}
}
}
第一步:
创建一个list对象作为新的state,并把第一次创建的node放入链表
promoteSingleToNodeList(state as JobNode<*>)
第二步:
创建一个新的Node
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
第三步:
放入新建的Node链表
promoteSingleToNodeList讲解:
//JobSupport.kt
//参数state 就是我们第一次创建的对象
private fun promoteSingleToNodeList(state: JobNode<*>) {
//创建一个List对象,并且这个state将state的next和prev字段设置为List
//注意List也是一个Node对象,也有next和prev字段。此时新建list的
//next和prev的设置为当前state。也就是形成了一个循环双向列表
state.addOneIfEmpty(NodeList())
//返回新创建的list状态
val list = state.nextNode
//讲当前的对象状态设置为创建的List对象(本质也是个Node)
_state.compareAndSet(state, list)
}
addOneIfEmpty 函数的作用和addLastIf(此函数上文有讲解)差不多,不过addOneIfEmpty会将调用者next和prev字段同时设置为传入参数。
我们看看这个NodeList对象继承关系:
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
override val isActive: Boolean get() = true
override val list: NodeList get() = this//注意这个对象list默认部位空
}
我们发现它与第一次创建的Node区别在于他不在实现Job,第二个list属于不在为空。
- 运行前:

state.addOneIfEmpty(NodeList())运行后:
promoteSingleToNodeList(state: JobNode<*>)整个方法执行后(注意state变化为list):

最后在创建一个新的Node插入到其链表

最后我们插入第三个也差不多,状态不会改变依然指向NodeList,只不过在NodeList前插入了一个点而已。
父子协程
文字开篇说过父子协程也是依靠上文的链表实现的,我们现在来看下如下代码:
val job = GlobalScope.launch {
//启动一个子协程
launch {
Thread.sleep(1000)
}
Thread.sleep(100)
println("协程完成")
}
再看下launch
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
//看看标准的协程如何构建
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
}
public abstract class AbstractCoroutine<in T>(
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
internal fun initParentJob() {
initParentJobInternal(parentContext[Job])
}
public fun start(start: CoroutineStart, block: suspend () -> T) {
initParentJob()
start(block, this)
}
}
我们看到启动协程的时候调用initParentJob初始化父协程,
//JobSupport.kt
internal fun initParentJobInternal(parent: Job?) {
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
//确保父协程已经启动
parent.start()
//核心方法我们继续跟进去看看
val handle = parent.attachChild(this)
parentHandle = handle
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}
我们最后看到最终调用到父的invokeOnCompletion函数,将自己插入NodeList中,所以当父协程取消时子协程可以及时收到通知而取消自身
public final override fun attachChild(child: ChildJob): ChildHandle {
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}
更多推荐



所有评论(0)