官方回答是:

Kotlin 协程(Coroutines)是一种轻量级的线程管理机制,用于简化异步编程和并发任务处理。它允许你以同步的代码风格编写异步操作,避免了传统回调(Callback)或 Future/Promise 模式带来的 “回调地狱” 问题。

这种回答很模糊,我们用实例来回答:

我们现在全局作用域中启动一个协程

GlobalScope.launch(Dispatchers.Main) {
    
}

我们跟踪Dispatchers.Main

@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

继续跟踪MainDispatcherLoader.dispatcher

@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

private fun loadMainDispatcher(): MainCoroutineDispatcher {
    return try {
        val factories = if (FAST_SERVICE_LOADER_ENABLED) {
            FastServiceLoader.loadMainDispatcherFactory()
        } else {
            // We are explicitly using the
            // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
            // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
            ServiceLoader.load(
                    MainDispatcherFactory::class.java,
                    MainDispatcherFactory::class.java.classLoader
            ).iterator().asSequence().toList()
        }
        @Suppress("ConstantConditionIf")
        factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
            ?: createMissingDispatcher()
    } catch (e: Throwable) {

跟踪loadMainDispatcherFactory

internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
    val clz = MainDispatcherFactory::class.java
    if (!ANDROID_DETECTED) {
        return load(clz, clz.classLoader)
    }

    return try {
        val result = ArrayList<MainDispatcherFactory>(2)

public interface MainDispatcherFactory {
    public val loadPriority: Int // higher priority wins

    /**
     * Creates the main dispatcher. [allFactories] parameter contains all factories found by service loader.
     * This method is not guaranteed to be idempotent.
     *
     * It is required that this method fails with an exception instead of returning an instance that doesn't work
     * correctly as a [Delay].
     * The reason for this is that, on the JVM, [DefaultDelay] will use [Dispatchers.Main] for most delays by default
     * if this method returns an instance without throwing.
     */
    public fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatchery

右键go to implementation 

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
        val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
        return HandlerContext(mainLooper.asHandler(async = true))
    }

确保获得主线程handler,原来就是把代码块通过handler推进消息队列。再看

HandlerContext
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    /**
     * Creates [CoroutineDispatcher] for the given Android [handler].
     *
     * @param handler a handler.
     * @param name an optional name for debugging.
     */
    constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }

果然是包装成runnable对象执行

接下来看看默认调度器。

GlobalScope.launch(Dispatchers.Default) {

}

我们跟踪 Dispatchers.Default

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = DefaultScheduler
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
)

这几个参数是不是很眼熟,妥妥的线程池的参数配置啊。

我们再来看基类SchedulerCoroutineDispatcher

internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
    get() = coroutineScheduler

// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
    CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

我们看看CoroutineScheduler到底是什么

@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {

.....

private fun createNewWorker(): Int {
    val worker: Worker
    return synchronized(workers) {
        // Make sure we're not trying to resurrect terminated scheduler
        if (isTerminated) return -1
        val state = controlState.value
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        // Double check for overprovision
        if (cpuWorkers >= corePoolSize) return 0
        if (created >= maxPoolSize) return 0
        // start & register new worker, commit index only after successful creation
        val newIndex = createdWorkers + 1
        require(newIndex > 0 && workers[newIndex] == null)
        /*
         * 1) Claim the slot (under a lock) by the newly created worker
         * 2) Make it observable by increment created workers count
         * 3) Only then start the worker, otherwise it may miss its own creation
         */
        worker = Worker(newIndex)
        workers.setSynchronized(newIndex, worker)
        require(newIndex == incrementCreatedWorkers())
        cpuWorkers + 1
    }.also { worker.start() } // Start worker when the lock is released to reduce contention, see #3652
}

我们看看这个worker是什么

internal inner class Worker private constructor() : Thread()

其实就是个线程。看来default调度器就是通过线程池来调度代码块。比较适合CPU密集任务。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐