在这里插入图片描述

在完成多轮「时间序列」类案例之后,本篇换一个完全不同的算法方向:任务依赖与执行调度
在真实项目中,我们经常需要处理带依赖关系的任务图:某些任务必须在其它任务完成后才能执行。

本案例基于 Kotlin Multiplatform(KMP)与 OpenHarmony,实现了一个任务依赖拓扑排序与执行批次规划引擎:

  • 使用拓扑排序(Topological Sort)对任务进行合法执行顺序计算;
  • 自动划分「执行批次」,同一批次内的任务可以并行执行;
  • 检测循环依赖(如 A 依赖 B,B 又依赖 A),并给出提示;
  • 通过 ArkTS 单页面板进行可视化展示。

一、任务依赖调度问题简介

典型场景:

  • 构建流水线:某些模块依赖其它模块先编译完成;
  • 数据处理作业 DAG:A 任务的输出是 B/C 的输入,必须先跑 A;
  • 发布流:先发布基础服务,再发布依赖它的业务服务;
  • 工作流引擎:任务之间存在前置条件与依赖关系。

这些问题都可以抽象为一个有向图(DAG):

  • 节点:任务(Task);
  • 有向边:依赖关系(dep -> task)。

目标是:

  1. 找到一个合法的执行顺序,使得每个任务都在它依赖的所有任务之后执行;
  2. 在顺序的基础上,划分「可以并行的批次」,提升整体执行效率;
  3. 若存在环(循环依赖),则进行检测与提示。

二、Kotlin 任务依赖拓扑排序引擎

1. 输入格式设计

为了与之前案例的风格保持一致,本案例采用纯文本输入格式:

TASK-A:TASK-B,TASK-C
TASK-B:TASK-D
TASK-C:
TASK-D:

也支持单行或使用 |; 作为分隔符:

TASK-A:TASK-B,TASK-C|TASK-B:TASK-D|TASK-C:|TASK-D:

含义说明:

  • TASK-A:TASK-B,TASK-C 表示 A 依赖 B 和 C(只有当 B、C 完成后,A 才能执行);
  • TASK-C: 表示 C 没有任何前置依赖;
  • 所有出现在任务名或依赖列表中的标识符,都会被视为图中的节点。

2. 核心分析入口:taskDependencySchedulerAnalyzer

App.kt 中,我们新增了一个通过 @JsExport 暴露的分析函数:

@JsExport
fun taskDependencySchedulerAnalyzer(inputData: String): String {
    val sanitized = inputData.trim()
    if (sanitized.isEmpty()) {
        return "❌ 输入为空,请按 TASK-A:TASK-B,TASK-C 形式提供任务依赖关系"
    }

    val graph = parseTaskDependencyGraph(sanitized)
        ?: return "❌ 未解析到任何有效任务,请检查输入格式是否包含 任务名:依赖列表"

    if (graph.nodes.isEmpty()) {
        return "❌ 任务集合为空,请至少定义一个任务"
    }

    val topoResult = topologicalSortWithLevels(graph)
    val builder = StringBuilder()

    builder.appendLine("🧩 任务依赖拓扑排序与执行批次规划报告")
    builder.appendLine("━━━━━━━━━━━━━━━━━━━━━━━━━━")
    builder.appendLine("任务总数: ${graph.nodes.size}")
    builder.appendLine()

    if (!topoResult.isDAG) {
        builder.appendLine("❌ 检测到循环依赖,无法生成完整的拓扑排序。")
        builder.appendLine()
        builder.appendLine("已处理任务数: ${topoResult.sortedOrder.size}")
        if (topoResult.sortedOrder.isNotEmpty()) {
            builder.append("已确认的部分执行顺序: ")
            builder.appendLine(topoResult.sortedOrder.joinToString(" -> "))
        }
        if (topoResult.cycleNodes.isNotEmpty()) {
            builder.append("疑似涉及循环的任务集合: ")
            builder.appendLine(topoResult.cycleNodes.sorted().joinToString(", "))
        }
        return builder.toString().trim()
    }

    // 拓扑排序结果
    builder.appendLine("✅ 拓扑排序结果")
    builder.appendLine(topoResult.sortedOrder.joinToString(" -> "))
    builder.appendLine()

    // 执行批次规划
    builder.appendLine("📌 执行批次规划(同一批次内任务可并行执行)")
    topoResult.levels.forEachIndexed { index, levelTasks ->
        val batchLabel = "第 ${index + 1} 批次"
        val tasksStr = levelTasks.joinToString(", ")
        builder.appendLine("$batchLabel: $tasksStr")
    }
    builder.appendLine()

    // 每个任务的最早开始批次
    builder.appendLine("📊 任务最早开始批次(基于依赖深度)")
    val taskToLevel = mutableMapOf<String, Int>()
    topoResult.levels.forEachIndexed { levelIndex, tasks ->
        tasks.forEach { task ->
            taskToLevel[task] = levelIndex + 1
        }
    }
    graph.nodes.sorted().forEach { task ->
        val level = taskToLevel[task] ?: 1
        val deps = graph.edges[task].orEmpty()
        val depsStr = if (deps.isEmpty()) "无" else deps.joinToString(", ")
        builder.appendLine("【$task】最早批次: $level, 依赖: $depsStr")
    }

    return builder.toString().trim()
}

输出内容包括:

  1. 拓扑排序结果:一个合法的任务执行序列;
  2. 执行批次列表:第 1、2、3 … 批次,每批次中可以并行执行的任务;
  3. 每个任务的最早开始批次:结合依赖深度给出调度建议;
  4. 若有循环依赖,则返回包含部分顺序和疑似循环任务集合的错误报告。

3. 任务依赖图解析

数据结构定义:

private data class TaskDependencyGraph(
    val nodes: Set<String>,
    val edges: Map<String, List<String>>
)

解析函数:

private fun parseTaskDependencyGraph(raw: String): TaskDependencyGraph? {
    val segments = raw.split("\n", ";", "|")
        .map { it.trim() }
        .filter { it.isNotEmpty() }

    if (segments.isEmpty()) return null

    val edges = mutableMapOf<String, MutableList<String>>()
    val allNodes = mutableSetOf<String>()

    for (segment in segments) {
        val colonIndex = segment.indexOf(':')
        if (colonIndex <= 0) continue

        val taskName = segment.substring(0, colonIndex).trim()
        val depsPart = segment.substring(colonIndex + 1).trim()

        if (taskName.isEmpty()) continue

        allNodes += taskName
        if (!edges.containsKey(taskName)) {
            edges[taskName] = mutableListOf()
        }

        if (depsPart.isNotEmpty()) {
            val deps = depsPart.split(',')
                .map { it.trim() }
                .filter { it.isNotEmpty() }
            for (dep in deps) {
                allNodes += dep
                edges[taskName]!!.add(dep)
                if (!edges.containsKey(dep)) {
                    edges[dep] = mutableListOf()
                }
            }
        }
    }

    if (allNodes.isEmpty()) return null
    return TaskDependencyGraph(nodes = allNodes, edges = edges)
}

要点

  • 既保证所有任务都出现在 nodes 中,也保证所有边都记录在 edges 中;
  • 对未显式定义但在依赖中出现的任务,自动补充为空依赖节点。

4. 拓扑排序与执行批次(Kahn 算法)

我们使用 Kahn 算法进行拓扑排序,并在过程中自然得到每一批次任务:

private data class TopoResult(
    val sortedOrder: List<String>,
    val levels: List<List<String>>,
    val isDAG: Boolean,
    val cycleNodes: Set<String>
)

private fun topologicalSortWithLevels(graph: TaskDependencyGraph): TopoResult {
    val indegree = mutableMapOf<String, Int>()
    graph.nodes.forEach { node -> indegree[node] = 0 }

    graph.edges.forEach { (task, deps) ->
        for (dep in deps) {
            // 边: dep -> task
            indegree[task] = (indegree[task] ?: 0) + 1
        }
    }

    val queue: ArrayDeque<String> = ArrayDeque()
    indegree.forEach { (node, deg) ->
        if (deg == 0) queue.addLast(node)
    }

    val sortedOrder = mutableListOf<String>()
    val levels = mutableListOf<List<String>>()

    while (queue.isNotEmpty()) {
        val currentLevel = mutableListOf<String>()
        val levelSize = queue.size
        repeat(levelSize) {
            val node = queue.removeFirst()
            sortedOrder += node
            currentLevel += node

            val dependents = graph.edges.filter { (_, deps) -> deps.contains(node) }.keys
            for (depTask in dependents) {
                val newDeg = (indegree[depTask] ?: 0) - 1
                indegree[depTask] = newDeg
                if (newDeg == 0) {
                    queue.addLast(depTask)
                }
            }
        }
        if (currentLevel.isNotEmpty()) {
            levels += currentLevel.sorted()
        }
    }

    val isDAG = sortedOrder.size == graph.nodes.size
    val cycleNodes = if (isDAG) emptySet() else indegree.filter { it.value > 0 }.keys

    return TopoResult(
        sortedOrder = sortedOrder,
        levels = levels,
        isDAG = isDAG,
        cycleNodes = cycleNodes
    )
}

说明

  • indegree 记录每个任务「尚未满足的前置依赖数」;
  • 每一轮从 queue 中取出所有当前入度为 0 的任务,作为同一批次
  • 删除这些任务相关的边,更新后继任务的入度;
  • 若最终 sortedOrder 中任务数小于节点数,说明存在

三、JavaScript / ArkTS 集成

通过 @JsExporttaskDependencySchedulerAnalyzer 已经被编译到 JS 模块 hellokjs.js 中,并在 hellokjs.d.ts 中有声明:

export declare function taskDependencySchedulerAnalyzer(inputData: string): string;

你可以在 ArkTS 中直接:

import { taskDependencySchedulerAnalyzer } from './hellokjs';

然后在页面逻辑中调用该函数,将输入文本传入,获取完整的分析报告字符串,再在 UI 中展示。


四、ArkTS 面板设计建议(可选)

如果你想把这个案例挂到首页(Index 页面),建议 UI 设计为:

  • 顶部标题:「任务依赖拓扑排序与执行批次规划」;
  • 输入区域:
    • 文本框支持多行任务依赖定义;
    • 右侧角标显示当前任务数;
  • 操作区域:
    • 「▶ 运行调度分析」按钮;
    • 「↻ 恢复示例依赖」按钮;
  • 结果区域:
    • 上半部分显示拓扑排序与批次列表;
    • 下半部分显示每个任务的最早批次与依赖详情;
    • 如遇循环依赖,醒目展示错误提示与疑似循环节点集合。

调用逻辑示例:

this.result = taskDependencySchedulerAnalyzer(this.inputData);

五、应用场景

  1. 构建流水线优化

    • 将项目中的模块构建任务抽象为依赖图,自动计算构建顺序与并行批次。
  2. 数据处理 DAG 调度

    • 对离线作业、数据清洗任务等进行拓扑分析,找到合理的执行批次,提升整体吞吐。
  3. 发布顺序与灰度策略设计

    • 在微服务架构中,根据服务依赖关系规划发布顺序,减少「依赖未就绪」导致的故障。
  4. 工作流引擎核心能力验证

    • 在设计工作流系统时,利用该算法验证任务图的合法性(无环)、并输出执行规划。

六、总结

本案例展示了如何在 KMP + OpenHarmony 环境下,实现一个通用的 任务依赖拓扑排序与执行批次规划引擎

  1. 使用文本配置描述任务与依赖,降低输入门槛;
  2. 采用 Kahn 算法完成拓扑排序,并顺带得到「按批次可并行执行」的层级划分;
  3. 通过详细的报告输出,让调度结果一目了然;
  4. 借助 KMP 的多平台能力,将调度核心逻辑复用于更广泛的终端设备。

与前面若干「时间序列」类案例不同,本篇聚焦于图与调度算法,为你的 KMP + OpenHarmony 算法案例库增加了一个「图论 / 调度」维度的实战示例。***
欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐