kmp openharmony 任务依赖拓扑排序与执行批次规划
任务依赖调度与拓扑排序摘要 本文介绍了一个基于Kotlin Multiplatform的任务依赖调度系统,主要功能包括: 任务依赖图解析:支持多种输入格式(文本、单行、分隔符),将任务及其依赖关系解析为有向无环图(DAG) 拓扑排序引擎:实现核心算法计算任务的合法执行顺序,确保每个任务在其依赖项之后执行 执行批次规划:自动划分可并行执行的批次,优化整体执行效率 循环依赖检测:当检测到循环依赖时提供
在完成多轮「时间序列」类案例之后,本篇换一个完全不同的算法方向:任务依赖与执行调度。
在真实项目中,我们经常需要处理带依赖关系的任务图:某些任务必须在其它任务完成后才能执行。
本案例基于 Kotlin Multiplatform(KMP)与 OpenHarmony,实现了一个任务依赖拓扑排序与执行批次规划引擎:
- 使用拓扑排序(Topological Sort)对任务进行合法执行顺序计算;
- 自动划分「执行批次」,同一批次内的任务可以并行执行;
- 检测循环依赖(如 A 依赖 B,B 又依赖 A),并给出提示;
- 通过 ArkTS 单页面板进行可视化展示。
一、任务依赖调度问题简介
典型场景:
- 构建流水线:某些模块依赖其它模块先编译完成;
- 数据处理作业 DAG:A 任务的输出是 B/C 的输入,必须先跑 A;
- 发布流:先发布基础服务,再发布依赖它的业务服务;
- 工作流引擎:任务之间存在前置条件与依赖关系。
这些问题都可以抽象为一个有向图(DAG):
- 节点:任务(Task);
- 有向边:依赖关系(
dep -> task)。
目标是:
- 找到一个合法的执行顺序,使得每个任务都在它依赖的所有任务之后执行;
- 在顺序的基础上,划分「可以并行的批次」,提升整体执行效率;
- 若存在环(循环依赖),则进行检测与提示。
二、Kotlin 任务依赖拓扑排序引擎
1. 输入格式设计
为了与之前案例的风格保持一致,本案例采用纯文本输入格式:
TASK-A:TASK-B,TASK-C
TASK-B:TASK-D
TASK-C:
TASK-D:
也支持单行或使用 |、; 作为分隔符:
TASK-A:TASK-B,TASK-C|TASK-B:TASK-D|TASK-C:|TASK-D:
含义说明:
TASK-A:TASK-B,TASK-C表示 A 依赖 B 和 C(只有当 B、C 完成后,A 才能执行);TASK-C:表示 C 没有任何前置依赖;- 所有出现在任务名或依赖列表中的标识符,都会被视为图中的节点。
2. 核心分析入口:taskDependencySchedulerAnalyzer
在 App.kt 中,我们新增了一个通过 @JsExport 暴露的分析函数:
@JsExport
fun taskDependencySchedulerAnalyzer(inputData: String): String {
val sanitized = inputData.trim()
if (sanitized.isEmpty()) {
return "❌ 输入为空,请按 TASK-A:TASK-B,TASK-C 形式提供任务依赖关系"
}
val graph = parseTaskDependencyGraph(sanitized)
?: return "❌ 未解析到任何有效任务,请检查输入格式是否包含 任务名:依赖列表"
if (graph.nodes.isEmpty()) {
return "❌ 任务集合为空,请至少定义一个任务"
}
val topoResult = topologicalSortWithLevels(graph)
val builder = StringBuilder()
builder.appendLine("🧩 任务依赖拓扑排序与执行批次规划报告")
builder.appendLine("━━━━━━━━━━━━━━━━━━━━━━━━━━")
builder.appendLine("任务总数: ${graph.nodes.size}")
builder.appendLine()
if (!topoResult.isDAG) {
builder.appendLine("❌ 检测到循环依赖,无法生成完整的拓扑排序。")
builder.appendLine()
builder.appendLine("已处理任务数: ${topoResult.sortedOrder.size}")
if (topoResult.sortedOrder.isNotEmpty()) {
builder.append("已确认的部分执行顺序: ")
builder.appendLine(topoResult.sortedOrder.joinToString(" -> "))
}
if (topoResult.cycleNodes.isNotEmpty()) {
builder.append("疑似涉及循环的任务集合: ")
builder.appendLine(topoResult.cycleNodes.sorted().joinToString(", "))
}
return builder.toString().trim()
}
// 拓扑排序结果
builder.appendLine("✅ 拓扑排序结果")
builder.appendLine(topoResult.sortedOrder.joinToString(" -> "))
builder.appendLine()
// 执行批次规划
builder.appendLine("📌 执行批次规划(同一批次内任务可并行执行)")
topoResult.levels.forEachIndexed { index, levelTasks ->
val batchLabel = "第 ${index + 1} 批次"
val tasksStr = levelTasks.joinToString(", ")
builder.appendLine("$batchLabel: $tasksStr")
}
builder.appendLine()
// 每个任务的最早开始批次
builder.appendLine("📊 任务最早开始批次(基于依赖深度)")
val taskToLevel = mutableMapOf<String, Int>()
topoResult.levels.forEachIndexed { levelIndex, tasks ->
tasks.forEach { task ->
taskToLevel[task] = levelIndex + 1
}
}
graph.nodes.sorted().forEach { task ->
val level = taskToLevel[task] ?: 1
val deps = graph.edges[task].orEmpty()
val depsStr = if (deps.isEmpty()) "无" else deps.joinToString(", ")
builder.appendLine("【$task】最早批次: $level, 依赖: $depsStr")
}
return builder.toString().trim()
}
输出内容包括:
- 拓扑排序结果:一个合法的任务执行序列;
- 执行批次列表:第 1、2、3 … 批次,每批次中可以并行执行的任务;
- 每个任务的最早开始批次:结合依赖深度给出调度建议;
- 若有循环依赖,则返回包含部分顺序和疑似循环任务集合的错误报告。
3. 任务依赖图解析
数据结构定义:
private data class TaskDependencyGraph(
val nodes: Set<String>,
val edges: Map<String, List<String>>
)
解析函数:
private fun parseTaskDependencyGraph(raw: String): TaskDependencyGraph? {
val segments = raw.split("\n", ";", "|")
.map { it.trim() }
.filter { it.isNotEmpty() }
if (segments.isEmpty()) return null
val edges = mutableMapOf<String, MutableList<String>>()
val allNodes = mutableSetOf<String>()
for (segment in segments) {
val colonIndex = segment.indexOf(':')
if (colonIndex <= 0) continue
val taskName = segment.substring(0, colonIndex).trim()
val depsPart = segment.substring(colonIndex + 1).trim()
if (taskName.isEmpty()) continue
allNodes += taskName
if (!edges.containsKey(taskName)) {
edges[taskName] = mutableListOf()
}
if (depsPart.isNotEmpty()) {
val deps = depsPart.split(',')
.map { it.trim() }
.filter { it.isNotEmpty() }
for (dep in deps) {
allNodes += dep
edges[taskName]!!.add(dep)
if (!edges.containsKey(dep)) {
edges[dep] = mutableListOf()
}
}
}
}
if (allNodes.isEmpty()) return null
return TaskDependencyGraph(nodes = allNodes, edges = edges)
}
要点:
- 既保证所有任务都出现在
nodes中,也保证所有边都记录在edges中; - 对未显式定义但在依赖中出现的任务,自动补充为空依赖节点。
4. 拓扑排序与执行批次(Kahn 算法)
我们使用 Kahn 算法进行拓扑排序,并在过程中自然得到每一批次任务:
private data class TopoResult(
val sortedOrder: List<String>,
val levels: List<List<String>>,
val isDAG: Boolean,
val cycleNodes: Set<String>
)
private fun topologicalSortWithLevels(graph: TaskDependencyGraph): TopoResult {
val indegree = mutableMapOf<String, Int>()
graph.nodes.forEach { node -> indegree[node] = 0 }
graph.edges.forEach { (task, deps) ->
for (dep in deps) {
// 边: dep -> task
indegree[task] = (indegree[task] ?: 0) + 1
}
}
val queue: ArrayDeque<String> = ArrayDeque()
indegree.forEach { (node, deg) ->
if (deg == 0) queue.addLast(node)
}
val sortedOrder = mutableListOf<String>()
val levels = mutableListOf<List<String>>()
while (queue.isNotEmpty()) {
val currentLevel = mutableListOf<String>()
val levelSize = queue.size
repeat(levelSize) {
val node = queue.removeFirst()
sortedOrder += node
currentLevel += node
val dependents = graph.edges.filter { (_, deps) -> deps.contains(node) }.keys
for (depTask in dependents) {
val newDeg = (indegree[depTask] ?: 0) - 1
indegree[depTask] = newDeg
if (newDeg == 0) {
queue.addLast(depTask)
}
}
}
if (currentLevel.isNotEmpty()) {
levels += currentLevel.sorted()
}
}
val isDAG = sortedOrder.size == graph.nodes.size
val cycleNodes = if (isDAG) emptySet() else indegree.filter { it.value > 0 }.keys
return TopoResult(
sortedOrder = sortedOrder,
levels = levels,
isDAG = isDAG,
cycleNodes = cycleNodes
)
}
说明:
indegree记录每个任务「尚未满足的前置依赖数」;- 每一轮从
queue中取出所有当前入度为 0 的任务,作为同一批次; - 删除这些任务相关的边,更新后继任务的入度;
- 若最终
sortedOrder中任务数小于节点数,说明存在环。
三、JavaScript / ArkTS 集成
通过 @JsExport,taskDependencySchedulerAnalyzer 已经被编译到 JS 模块 hellokjs.js 中,并在 hellokjs.d.ts 中有声明:
export declare function taskDependencySchedulerAnalyzer(inputData: string): string;
你可以在 ArkTS 中直接:
import { taskDependencySchedulerAnalyzer } from './hellokjs';
然后在页面逻辑中调用该函数,将输入文本传入,获取完整的分析报告字符串,再在 UI 中展示。
四、ArkTS 面板设计建议(可选)
如果你想把这个案例挂到首页(Index 页面),建议 UI 设计为:
- 顶部标题:「任务依赖拓扑排序与执行批次规划」;
- 输入区域:
- 文本框支持多行任务依赖定义;
- 右侧角标显示当前任务数;
- 操作区域:
- 「▶ 运行调度分析」按钮;
- 「↻ 恢复示例依赖」按钮;
- 结果区域:
- 上半部分显示拓扑排序与批次列表;
- 下半部分显示每个任务的最早批次与依赖详情;
- 如遇循环依赖,醒目展示错误提示与疑似循环节点集合。
调用逻辑示例:
this.result = taskDependencySchedulerAnalyzer(this.inputData);
五、应用场景
-
构建流水线优化
- 将项目中的模块构建任务抽象为依赖图,自动计算构建顺序与并行批次。
-
数据处理 DAG 调度
- 对离线作业、数据清洗任务等进行拓扑分析,找到合理的执行批次,提升整体吞吐。
-
发布顺序与灰度策略设计
- 在微服务架构中,根据服务依赖关系规划发布顺序,减少「依赖未就绪」导致的故障。
-
工作流引擎核心能力验证
- 在设计工作流系统时,利用该算法验证任务图的合法性(无环)、并输出执行规划。
六、总结
本案例展示了如何在 KMP + OpenHarmony 环境下,实现一个通用的 任务依赖拓扑排序与执行批次规划引擎:
- 使用文本配置描述任务与依赖,降低输入门槛;
- 采用 Kahn 算法完成拓扑排序,并顺带得到「按批次可并行执行」的层级划分;
- 通过详细的报告输出,让调度结果一目了然;
- 借助 KMP 的多平台能力,将调度核心逻辑复用于更广泛的终端设备。
与前面若干「时间序列」类案例不同,本篇聚焦于图与调度算法,为你的 KMP + OpenHarmony 算法案例库增加了一个「图论 / 调度」维度的实战示例。***
欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net
更多推荐



所有评论(0)