kotlin协程之什么是协程
编写异步操作,避免了传统回调(Callback)或 Future/Promise 模式带来的 “回调地狱” 问题。Kotlin 协程(Coroutines)是一种轻量级的线程管理机制,用于简化异步编程和并发任务处理。确保是在主线程,而且获得主线程handler,原来就是把代码块通过handler推进消息队列。继续跟踪MainDispatcherLoader.dispatcher。右键go to i
·
官方回答是:
Kotlin 协程(Coroutines)是一种轻量级的线程管理机制,用于简化异步编程和并发任务处理。它允许你以同步的代码风格编写异步操作,避免了传统回调(Callback)或 Future/Promise 模式带来的 “回调地狱” 问题。
这种回答很模糊,我们用实例来回答:
我们现在全局作用域中启动一个协程
GlobalScope.launch(Dispatchers.Main) {
}
我们跟踪Dispatchers.Main
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
继续跟踪MainDispatcherLoader.dispatcher
@JvmField val dispatcher: MainCoroutineDispatcher = loadMainDispatcher() private fun loadMainDispatcher(): MainCoroutineDispatcher { return try { val factories = if (FAST_SERVICE_LOADER_ENABLED) { FastServiceLoader.loadMainDispatcherFactory() } else { // We are explicitly using the // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()` // form of the ServiceLoader call to enable R8 optimization when compiled on Android. ServiceLoader.load( MainDispatcherFactory::class.java, MainDispatcherFactory::class.java.classLoader ).iterator().asSequence().toList() } @Suppress("ConstantConditionIf") factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories) ?: createMissingDispatcher() } catch (e: Throwable) {
跟踪loadMainDispatcherFactory
internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
val clz = MainDispatcherFactory::class.java
if (!ANDROID_DETECTED) {
return load(clz, clz.classLoader)
}
return try {
val result = ArrayList<MainDispatcherFactory>(2)
public interface MainDispatcherFactory {
public val loadPriority: Int // higher priority wins
/**
* Creates the main dispatcher. [allFactories] parameter contains all factories found by service loader.
* This method is not guaranteed to be idempotent.
*
* It is required that this method fails with an exception instead of returning an instance that doesn't work
* correctly as a [Delay].
* The reason for this is that, on the JVM, [DefaultDelay] will use [Dispatchers.Main] for most delays by default
* if this method returns an instance without throwing.
*/
public fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatchery
右键go to implementation
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
return HandlerContext(mainLooper.asHandler(async = true))
}
确保获得主线程handler,原来就是把代码块通过handler推进消息队列。再看
HandlerContext
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}
果然是包装成runnable对象执行
接下来看看默认调度器。
GlobalScope.launch(Dispatchers.Default) {
}
我们跟踪 Dispatchers.Default
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
internal object DefaultScheduler : SchedulerCoroutineDispatcher( CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME )
这几个参数是不是很眼熟,妥妥的线程池的参数配置啊。
我们再来看基类SchedulerCoroutineDispatcher
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
我们看看CoroutineScheduler到底是什么
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
.....
private fun createNewWorker(): Int { val worker: Worker return synchronized(workers) { // Make sure we're not trying to resurrect terminated scheduler if (isTerminated) return -1 val state = controlState.value val created = createdWorkers(state) val blocking = blockingTasks(state) val cpuWorkers = (created - blocking).coerceAtLeast(0) // Double check for overprovision if (cpuWorkers >= corePoolSize) return 0 if (created >= maxPoolSize) return 0 // start & register new worker, commit index only after successful creation val newIndex = createdWorkers + 1 require(newIndex > 0 && workers[newIndex] == null) /* * 1) Claim the slot (under a lock) by the newly created worker * 2) Make it observable by increment created workers count * 3) Only then start the worker, otherwise it may miss its own creation */ worker = Worker(newIndex) workers.setSynchronized(newIndex, worker) require(newIndex == incrementCreatedWorkers()) cpuWorkers + 1 }.also { worker.start() } // Start worker when the lock is released to reduce contention, see #3652 }
我们看看这个worker是什么
internal inner class Worker private constructor() : Thread()
其实就是个线程。看来default调度器就是通过线程池来调度代码块。比较适合CPU密集任务。
更多推荐


所有评论(0)