原文:zh.annas-archive.org/md5/f61b77e3e4bb835c082a2ac01826bbbb

译者:飞龙

协议:CC BY-NC-SA 4.0

第九章:为并发设计

在本章中,我们将讨论最常见的并发设计模式,这些模式使用协程实现,以及协程如何同步它们的执行。

并发设计模式帮助我们同时管理许多任务。是的,我知道,我们在上一章就是这样做的。那是因为其中一些设计模式已经内置到语言中。

在本章中,我们将简要介绍你需要自己实现的设计模式和并发设计模式,这些模式需要付出很少的努力。

在本章中,我们将介绍以下主题:

  • 活动对象

  • 延迟值

  • 障碍

  • 调度器

  • 管道

  • 扇出

  • 扇入

  • 缓冲通道

  • 无偏选择

  • 互斥锁

  • 在关闭时选择

  • 伴随通道

  • 延迟通道

活动对象

这种设计模式允许一个方法以安全的方式在另一个线程上执行。猜猜还有什么是在另一个线程上执行的?

你完全正确:actor()

因此,它是一种已经内置到语言中的设计模式。或者,更准确地说,内置到其中一个兼容库中。

我们已经看到了如何向actor()发送数据。但我们如何从它那里接收数据?

一种方法是为它提供一个输出通道:

fun activeActor(out: SendChannel<String>) = actor<Int> {
    for (i in this) {
        out.send(i.toString().reversed())
    }
    out.close()
}

记得在你完成时关闭输出通道。

测试

为了测试活动对象模式,我们将启动两个作业。一个将数据发送到我们的 actor:

val channel = Channel<String>()
val actor = activeActor(channel)

val j1 = launch {
    for (i in 42..53) {
        actor.send(i)
    }
    actor.close()
}

另一个将等待输出通道上的输出:

val j2 = launch {
    for (i in channel) {
        println(i)
    }
}

j1.join()
j2.join()

延迟值

我们已经在第八章的线程和协程部分遇到了延迟值,返回结果部分。Deferredasync()函数的结果,例如。你可能也知道它们来自 Java 或 Scala 的Future,或者来自 JavaScript 的Promise

有趣的是,Deferred 是一种我们在前面章节中遇到的代理设计模式。

就像 Kotlin 的Sequence与 Java8 的Stream非常相似一样,Kotlin 的 Deferred 与 Java Future 也非常相似。你很少需要自己创建 Deferred。通常,你会使用async()返回的那个。

在你需要返回一个未来将评估的值的占位符的情况下,你可以这样做:

val deferred = CompletableDeferred<String>()

launch {
    delay(100)
    if (Random().nextBoolean()) {
        deferred.complete("OK")
    }
    else {
        deferred.completeExceptionally(RuntimeException())
    }
}

println(deferred.await())

这段代码将有一半的时间打印OK,另一半的时间抛出RuntimeException

确保你总是完成你的延迟。通常,将包含延迟的任何代码包装在try...catch块中是一个好主意。

如果你对延迟的结果不再感兴趣,也可以取消延迟。只需在它上面调用cancel()即可:

deferred.cancel()

障碍

障碍设计模式为我们提供了在进一步操作之前等待多个并发任务的手段。一个常见的用例是从不同的来源组合对象。

例如,以下是一个类:

data class FavoriteCharacter(val name: String, val catchphrase: String, val repeats: Int)

假设我们在获取名称、catchphrase和数字。这个catchphrase正从三个不同的来源重复。

最基本的方法是使用 CountDownLatch,就像我们在一些之前的例子中所做的那样:

val latch = CountDownLatch(3)

var name: String? = null
launch {
    delay(Random().nextInt(100))
    println("Got name")
    name = "Inigo Montoya"
    latch.countDown()
}

var catchphrase = ""
launch {
    delay(Random().nextInt(100))
    println("Got catchphrase")
    catchphrase = "Hello. My name is Inigo Montoya. You killed my father. Prepare to die."
    latch.countDown()
}

var repeats = 0
launch {
    delay(Random().nextInt(100))
    println("Got repeats")
    repeats = 6
    latch.countDown()
}

latch.await()

println("${name} says: ${catchphrase.repeat(repeats)}")

你会注意到异步任务完成的顺序正在改变:

Got name
Got catchphrase
Got repeats

但最终,我们总是打印出相同的结果:

Inigo Montoya says: Hello. My name is Inigo Montoya. ...

但这个解决方案带来了很多问题。我们需要处理可变变量,要么为它们设置默认值,要么使用空值。

此外,只要我们使用闭包,这也会工作。如果我们的函数比几行长呢?

CountDownLatch

当然,我们可以传递它们闩锁。我们已经见过几次的闩锁允许一个线程等待,直到其他线程完成工作:

private fun getName(latch: CountDownLatch) = launch {
    ...
    latch.countDown()
}

但这并不是一个清晰的职责分离。我们真的想要指定这个函数应该如何同步吗?

让我们再试一次:

private fun getName() = async {
    delay(Random().nextInt(100))
    println("Got name")
    "Inigo Montoya"
}

private fun getCatchphrase() = async {
    delay(Random().nextInt(100))
    println("Got catchphrase")
    "Hello. My name is Inigo Montoya. You killed my father. Prepare to die."
}

private fun getRepeats() = async {
    delay(Random().nextInt(100))
    println("Got repeats")
    6
}

只是一个提醒,fun getRepeats() = async { ... } 中并没有什么魔法。它的更长等效形式是:

private fun getCatchphrase(): Deferred<String> {
    return async {
        ...
    }
}

我们可以调用我们的代码来得到与之前相同的结果:

val name = getName()
val catchphrase = getCatchphrase()
val repeats = getRepeats()

println("${name.await()} says: ${catchphrase.await().repeat(repeats.await())}")

但我们可以通过使用我们的老朋友,数据类,来进一步改进它。

数据类作为屏障

现在我们的数据类是屏障:

val character = FavoriteCharacter(getName().await(), getCatchphrase().await(), getRepeats().await())

// Will happen only when everything is ready
with(character) {
    println("$name says: ${catchphrase.repeat(repeats)}")    
}

数据类作为屏障的额外好处是能够轻松地解构它们:

val (name, catchphrase, repeats) = character
println("$name says: ${catchphrase.repeat(repeats)}")

如果我们从不同的异步任务中接收到的数据类型差异很大,这会工作得很好。在这个例子中,我们接收到了 StringInt

在某些情况下,我们从不同的来源接收相同类型的数据。

例如,让我们问问迈克尔(我们的金丝雀产品负责人),杰克(我们的咖啡师),以及我,我们最喜欢的电影角色是谁:

object Michael {
    fun getFavoriteCharacter() = async {
        // Doesn't like to think much
        delay(Random().nextInt(10))
        FavoriteCharacter("Terminator", "Hasta la vista, baby", 1)
    }
}

object Jake {
    fun getFavoriteCharacter() = async {
        // Rather thoughtful barista
        delay(Random().nextInt(100) + 10)
        FavoriteCharacter("Don Vito Corleone", "I'm going to make him an offer he can't refuse", 1)
    }
}

object Me {
    fun getFavoriteCharacter() = async {
        // I already prepared the answer!
        FavoriteCharacter("Inigo Montoya", "Hello, my name is...", 6)
    }
}

在那种情况下,我们可以使用列表来收集结果:

val favoriteCharacters = listOf(Me.getFavoriteCharacter().await(),
        Michael.getFavoriteCharacter().await(),
        Jake.getFavoriteCharacter().await())

println(favoriteCharacters)

Scheduler

这是我们在 启动协程 部分简要讨论过的另一个概念,在 第八章 中,线程和协程

记得我们的 launch()async() 可以接收 CommonPool 吗?

这里有一个例子来提醒你,你可以明确指定它:

// Same as launch {}
launch(CommonPool) {
...
}

// Same as async {}
val result = async(CommonPool) {
...
}

这个 CommonPool 是一个伪装成调度器的调度器设计模式。许多异步任务可能被映射到同一个调度器。

运行以下代码:

val r1 = async(CommonPool) {
    for (i in 1..1000) {
        println(Thread.currentThread().name)
        yield()
    }
}

r1.await()

有趣的是,同一个协程被不同的线程选中:

ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
...
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1

你也可以指定上下文为 Unconfined

val r1 = async(Unconfined) {
    ...
}

这将在主线程上运行协程。它打印:

main
main
...

你也可以从你的父协程继承上下文:

val r1 = async {
    for (i in 1..1000) {
        val parentThread = Thread.currentThread().name
        launch(coroutineContext) {
            println(Thread.currentThread().name == parentThread)
        }
        yield()
    }
}

注意,但运行在同一个上下文中并不意味着我们在同一个线程上运行。

你可能会问自己:继承上下文和使用 Unconfined 之间有什么区别?我们将在下一节详细讨论这个问题。

理解上下文

为了理解不同的上下文,让我们看看下面的代码:

val r1 = async(Unconfined) {
    for (i in 1..1000) {
        println(Thread.currentThread().name)
        delay(1)
    }
}

r1.await()

我们不是使用 yield(),而是使用 delay() 函数,它也会挂起当前的协程。

但与 yield() 相比,输出是不同的:

main
kotlinx.coroutines.DefaultExecutor
...

在第一次调用 delay() 之后,协程已经切换了上下文,从而导致了线程的切换。

因此,不建议对于 CPU 密集型任务或需要在特定线程上运行的任务(如 UI 渲染)使用Unconfined

您也可以为协程创建自己的线程池来运行:

val pool = newFixedThreadPoolContext(2, "My Own Pool")
val r1 = async(pool) {
    for (i in 1..1000) {
        println(Thread.currentThread().name)
        yield()
    }
}

r1.await()
pool.close()

它打印:

...
My Own Pool-2
My Own Pool-1
My Own Pool-2
My Own Pool-2
...

如果你创建了自己的线程池,请确保你使用close()释放它或重用它,因为创建新的线程池并持有它从资源角度来看是昂贵的。

管道

在我们的StoryLand中,同样的懒散建筑师,也就是我,正在努力解决一个问题。回到第四章,熟悉行为模式,我们编写了一个 HTML 页面解析器。但它取决于是否有人已经为我们抓取了要解析的页面。它也不够灵活。

我们希望有一个协程产生无限的新闻流,而其他协程则逐步解析这个流。

要开始使用 DOM,我们需要一个库,例如kotlinx.dom。如果你使用Gradle,请确保将以下行添加到你的build.gradle文件中:

repositories {
    ...
    jcenter()
}

dependencies {
    ...
    compile "org.jetbrains.kotlinx:kotlinx.dom:0.0.10"
}

现在,让我们着手处理当前的任务。

首先,我们希望偶尔抓取新闻页面。为此,我们将有一个生产者:

fun producePages() = produce {
    fun getPages(): List<String> {
        // This should actually fetch something
        return listOf("<html><body><H1>Cool stuff</H1></body></html>",
                "<html><body><H1>Event more stuff</H1></body></html>").shuffled()
    }
    while (this.isActive) {
        val pages = getPages()
        for (p in pages) {
            send(p)
        }
        delay(TimeUnit.SECONDS.toMillis(5))
    }
}

我们在这里使用shuffled(),这样列表元素的顺序就不会总是相同。

只要协程正在运行且未被取消,isActive标志将为真。在可能运行很长时间的循环中检查此属性是一种良好的做法,这样它们就可以在迭代之间停止。

每次我们收到新的标题时,我们就将它们发送到下游。

由于科技新闻更新并不频繁。我们可以偶尔使用delay()检查更新。在实际代码中,延迟可能是几分钟,甚至几小时。

下一步是创建包含 HTML 的原始字符串的文档对象模型DOM)。为此,我们将有一个第二个生产者,这个生产者接收一个连接到第一个生产者的通道:

fun produceDom(pages: ReceiveChannel<String>) = produce {

    fun parseDom(page: String): Document {
         return kotlinx.dom.parseXml(*page.toSource()*)
    }

    for (p in pages) {
        send(parseDom(p))
    }
}

我们可以使用for循环遍历通道,直到有更多数据到来。这是从通道中消费数据的一种非常优雅的方式。

在这个生产者中,我们最终使用了我们之前导入的 DOM 解析器。我们还引入了一个方便的String扩展函数:

private fun String.toSource(): InputSource {
    return InputSource(StringReader(this))
}

这是因为parseXml()期望InputSource作为其输入。基本上,这是一个适配器设计模式的应用:

fun produceTitles(parsedPages: ReceiveChannel<Document>) = produce {
    fun getTitles(dom: Document): List<String> {
        val h1 = dom.getElementsByTagName("H1")
        return h1.asElementList().map {
            it.textContent
        }
    }

    for (page in parsedPages) {
        for (t in getTitles(page)) {
            send(t)
        }
    }
}

我们正在寻找标题,因此使用getElementsByTagName("H1")。对于找到的每个标题,可能有多个,我们使用textContent获取其文本。

最后,我们将每个页面的每个标题发送到下一个。

建立管道

现在,为了建立我们的管道:

val pagesProducer = producePages()

val domProducer = produceDom(pagesProducer)

val titleProducer = produceTitles(domProducer)

runBlocking {
    titleProducer.consumeEach {
        println(it)
    }
}

我们有以下内容:

pagesProducer |> domProducer |> titleProducer |> output

管道是将长过程分解成更小步骤的绝佳方式。请注意,每个生产协程都是一个纯函数,因此它也很容易测试和推理。

整个管道可以通过在第一个协程上调用cancel()来停止。

我们可以通过使用扩展函数来实现更友好的 API:

private fun ReceiveChannel<Document>.titles(): ReceiveChannel<String> {
    val channel = this
    fun getTitles(dom: Document): List<String> {
        val h1 = dom.getElementsByTagName("H1")
        return h1.asElementList().map {
            it.textContent
        }
    }

    return produce {
        for (page in channel) {
            for (t in getTitles(page)) {
                send(t)
            }
        }
    }
}

private fun ReceiveChannel<String>.dom(): ReceiveChannel<Document> {
    val channel = this
    return produce() {
        for (p in channel) {
            send(kotlinx.dom.parseXml(p.toSource()))
        }
    }
}

然后,我们可以这样调用我们的代码:

runBlocking {
    producePages().dom().titles().consumeEach {
        println(it)
    }
}

Kotlin 在创建表达性和流畅的 API 方面真的很出色。

扇出设计模式

如果我们管道中不同步骤的工作量差异很大怎么办?

例如,获取 HTML 比解析它花费的时间要多得多。或者如果我们根本没有任何管道,只是有很多任务我们希望在协程之间分配。

这就是扇出设计模式发挥作用的地方。协程的数量可以从同一个通道读取,分配工作。

我们可以有一个协程产生一些结果:

private fun producePages() = produce {
    for (i in 1..10_000) {
        for (c in 'a'..'z') {
            send(i to "page$c")
        }
    }
}

并有一个函数可以创建一个读取这些结果的协程:


private fun consumePages(channel: ReceiveChannel<Pair<Int, String>>) = async {
    for (p in channel) {
        println(p)
    }
}

这使我们能够生成任意数量的消费者:

val producer = producePages()

val consumers = List(10) {
    consumePages(producer)
}

runBlocking {
    consumers.forEach {
        it.await()
    }
}

扇出设计模式允许我们高效地将工作分配给多个协程、线程和 CPU。

扇入设计模式

如果我们的协程总是可以自己做出决定,那会很好。但它们如果需要将计算结果返回给另一个协程怎么办?

扇出相反的是扇入设计模式。不是多个协程从同一个通道读取,而是多个协程可以将他们的结果写入同一个通道。

想象一下,你正在从两个显赫的科技资源中阅读新闻:techBunchtheFerge

每个资源以自己的速度产生值,并将它们通过通道发送:

private fun techBunch(collector: Channel<String>) = launch {
    repeat(10) {
        delay(Random().nextInt(1000))
        collector.send("Tech Bunch")
    }
}

private fun theFerge(collector: Channel<String>) = launch {
    repeat(10) {
        delay(Random().nextInt(1000))
        collector.send("The Ferge")
    }
}

通过提供相同的通道,我们可以合并他们的结果:

val collector = Channel<String>()

techBunch(collector)
theFerge(collector)

runBlocking {
    collector.consumeEachIndexed {
        println("${it.index} Got news from ${it.value}")
    }
}

结合扇出和扇入设计模式是Map/Reduce算法的良好基础。

为了证明这一点,我们将生成 10,000,000 个随机数,并通过多次分割这个任务来计算它们中的最大数。

首先,生成 10,000,000 个随机整数的列表:

val numbers = List(10_000_000) {
    Random().nextInt()
}

管理工人

现在,我们将有两种类型的工人:

  • 分割工人将接收到数字列表,确定列表中的最大数,并将其发送到输出通道:
fun divide(input: ReceiveChannel<List<Int>>, 
           output: SendChannel<Int>) = async {
    var max = 0
    for (list in input) {
        for (i in list) {
            if (i > max) {
                max = i
                output.send(max)
            }
        }
    }
}
  • 收集器将监听这个通道,并且每次有新的子最大数到达时,将决定它是否是史上最大的:
fun collector() = actor<Int> {
    var max = 0
    for (i in this) {
        max = Math.max(max, i)
    }
    println(max)
}

现在,我们只需要建立那些通道:

val input = Channel<List<Int>>()
val output = collector()
val dividers = List(10) {
    divide(input, output)
}

launch {
    for (c in numbers.chunked(1000)) {
        input.send(c)
    }
    input.close()
}

dividers.forEach {
    it.await()
}

output.close()

注意,在这种情况下,我们不会获得性能上的好处,而简单的numbers.max()会产生更好的结果。但随着需要收集的数据量增加,这种模式变得更加有用。

缓冲通道

到目前为止,我们使用的所有通道的容量都是正好一个元素。

这意味着如果你向这个通道写入,但没有人在读取,发送者将被挂起:

val channel = Channel<Int>()

val j = launch {
    for (i in 1..10) {
        channel.send(i)
        println("Sent $i")
    }
}

j.join()

这段代码没有打印任何东西,因为协程正在等待有人从通道读取。

为了避免这种情况,我们可以创建一个缓冲通道:

val channel = Channel<Int>(5)

现在,只有在通道容量达到时才会发生挂起。

它会打印:

Sent 1
Sent 2
Sent 3
Sent 4
Sent 5

由于produce()actor()也由通道支持,我们也可以将其设置为缓冲:

val actor = actor<Int>(capacity = 5) {
    ...
}

val producer = produce<Int>(capacity = 10) {
    ...        
}

无偏选择

与通道一起工作的最有用的一种方式是我们之前在第八章“生产者”部分中看到的select {}子句,线程和协程

但选择是固有的有偏见的。如果两个事件同时发生,它将选择第一个子句。

在下面的例子中,我们将有一个生产者,它以很短的延迟发送五个值:

fun producer(name: String, repeats: Int) = produce {
    repeat(repeats) {
        delay(1)
        send(name)
    }
}

我们将创建三个这样的生产者并查看结果:

val repeats = 10_000
val p1 = producer("A", repeats)
val p2 = producer("B", repeats)
val p3 = producer("C", repeats)

val results = ConcurrentHashMap<String, Int>()
repeat(repeats) {
    val result = select<String> {
        p1.onReceive { it }
        p2.onReceive { it }
        p3.onReceive { it }
    }

    results.compute(result) { k, v ->
        if (v == null) {
            1
        }
        else {
            v + 1
        }
    }
}

println(results)

我们运行了五次这个代码。以下是一些结果:

{A=8235, B=1620, C=145}
{A=7850, B=2062, C=88}
{A=7878, B=2002, C=120}
{A=8260, B=1648, C=92}
{A=7927, B=2011, C=62}

如你所见,A几乎总是赢,而C总是第三。你设置的repeats越多,偏差就越大。

现在我们使用selectUnbiased代替:

...
val result = selectUnbiased<String> {
    p1.onReceive { it }
    p2.onReceive { it }
    p3.onReceive { it }
}
...

前五次执行的结果可能看起来像这样:

{A=3336, B=3327, C=3337}
{A=3330, B=3332, C=3338}
{A=3334, B=3333, C=3333}
{A=3334, B=3336, C=3330}
{A=3332, B=3335, C=3333}

现在数字分布得更均匀了,而且所有子句都有相同的机会被选中。

互斥锁

也称为互斥排他,互斥锁提供了一种保护共享状态的手段。

让我们从那个陈旧、令人讨厌的反例开始:

var counter = 0

val jobs = List(10) {
    launch {
        repeat(1000) {
            counter++
            yield()
        }
    }
}

runBlocking {
    jobs.forEach {
        it.join()
    }
    println(counter)
}

如你所猜,这会打印出除了10*100的结果之外的所有内容。真是尴尬至极。

为了解决这个问题,我们引入了一个互斥锁:

var counter = 0
val mutex = Mutex()

val jobs = List(10) {
    launch {
        repeat(1000) {
            mutex.lock()
            counter++
            mutex.unlock()
            yield()
        }
    }
}

现在我们的例子总是打印出正确的数字。

这对简单情况很有用。但如果关键部分(即lock()unlock()之间)的代码抛出异常怎么办?

然后,我们必须将所有内容包裹在try...catch中,这并不方便:

repeat(1000) {
    try {
        mutex.lock()
        counter++                     
   }
 finally {
        mutex.unlock()                    
    }

    yield()
}

正是为了这个目的,Kotlin 还引入了withLock()

...
repeat(1000) {
    mutex.withLock {
        counter++
    }
    yield()
}
...

选择在关闭时

使用select()从通道中读取直到它关闭是件好事。

你可以在这里看到那个问题的例子:

val p1 = produce {
    repeat(10) {
        send("A")
    }
}

val p2 = produce {
    repeat(5) {
        send("B")
    }
}

runBlocking { 
    repeat(15) {
        val result = selectUnbiased<String> {
            p1.onReceive {
                it
            }
            p2.onReceive {
                it
            }
        }

        println(result)
    }
}

虽然数字相加,但我们运行此代码时可能会经常收到ClosedReceiveChannelException。这是因为第二个生产者有更少的物品,一旦它完成,它将关闭其通道。

为了避免这种情况,我们可以使用onReceiveOrNull,它将同时返回一个可空版本。一旦通道关闭,我们在select中就会收到null

我们可以按任何我们想要的方式处理这个空值,例如,通过使用elvis运算符:

repeat(15) {
    val result = selectUnbiased<String> {
        p1.onReceiveOrNull {
            // Can throw my own exception
            it ?: throw RuntimeException()
        }
        p2.onReceiveOrNull {
            // Or supply default value
            it ?: "p2 closed"
        }
    }

    println(result)
}

利用这些知识,我们可以通过跳过空结果来排空两个通道:

var count = 0
while (count < 15) {
    val result = selectUnbiased<String?> {
        p1.onReceiveOrNull {
            it
        }
        p2.onReceiveOrNull {
            it
        }
    }

    if (result != null) {
        println(result)
        count++
    }
}

辅助通道

到目前为止,我们只讨论了select作为接收者的用法。但我们也可以使用select将项目发送到另一个通道。

让我们看看以下例子:

val batman = actor<String> {
    for (c in this) {
        println("Batman is beating some sense into $c")
        delay(100)
    }
}

val robin = actor<String> {
    for (c in this) {
        println("Robin is beating some sense into $c")
        delay(250)
    }
}

我们有一个超级英雄及其助手作为两个演员。由于超级英雄更有经验,他们通常花费更少的时间来击败他们面对的恶棍。

但在某些情况下,它们仍然手头很忙,所以需要一个助手来帮忙。

我们将向这对组合投掷五个恶棍,并观察它们的反应,同时加入一些延迟:

val j = launch {
    for (c in listOf("Jocker", "Bane", "Penguin", "Riddler", "Killer Croc")) {
        val result = select<Pair<String, String>> {
            batman.onSend(c) {
                Pair("Batman", c)
            }
            robin.onSend(c) {
                Pair("Robin", c)
            }
        }
        delay(90)
        println(result)
    }
}

它打印出:

Batman is beating some sense into Jocker
(Batman, Jocker)
Robin is beating some sense into Bane
(Robin, Bane)
Batman is beating some sense into Penguin
(Batman, Penguin)
Batman is beating some sense into Riddler
(Batman, Riddler)
Robin is beating some sense into Killer Croc
(Robin, Killer Croc)

注意,这个选择的类型参数指的是从块中返回的内容,而不是发送到通道的内容。

这就是为什么我们在这里使用Pair<String, String>的原因。

延迟通道

你与协程工作得越多,就越习惯于等待结果。在某个时刻,你将开始在通道中发送延迟值。

我们将首先创建 10 个异步任务。第一个将延迟很长时间,其余的我们将延迟很短的时间:

val elements = 10
val deferredChannel = Channel<Deferred<Int>>(elements)

launch(CommonPool) {
    repeat(elements) { i ->
        println("$i sent")
        deferredChannel.send(async {
            delay(if (i == 0) 1000 else 10)
            i
        })
    }
}

我们将把这些结果放入一个缓冲通道中。

现在,我们可以从这个通道读取,并使用第二个select块,等待结果:

val time = measureTimeMillis {
    repeat(elements) {
        val result = select<Int> {
            deferredChannel.onReceive {
                select {
                    it.onAwait { it }
                }
            }
        }
        println(result)
    }
}

println("Took ${time}ms")

注意,结果时间是最慢的任务的时间:

Took 1010ms

你还可以使用onAwait()作为另一个通道的停止信号。

因此,我们将创建一个将在 600 毫秒内完成的异步任务:

val stop = async {
    delay(600)
    true
}

并且,就像上一个例子一样,我们将通过缓冲通道发送 10 个延迟值:

val channel = Channel<Deferred<Int>>(10)

repeat(10) {i ->
    channel.send(async {
        delay(i * 100)
        i
    })
}

然后,我们将等待新的值或通知通道应该关闭:

runBlocking {
    for (i in 1..10) {
        select<Unit> {
            stop.onAwait {
                channel.close()
            }
            channel.onReceive {
                println(it.await())
            }
        }
    }
}

如预期的那样,这仅打印了十个值中的六个,在 600 毫秒后停止。

摘要

在本章中,我们介绍了与 Kotlin 中并发相关的各种设计模式。其中大多数都是基于协程、通道、延迟值或它们的组合。

管道扇入扇出有助于分配工作和收集结果。延迟值用作稍后解决某事的占位符。调度器帮助我们管理资源,主要是支持协程的线程。互斥锁屏障有助于控制并发。

现在,你应该理解了select块以及它如何有效地与通道和延迟值结合使用。

在下一章中,我们将讨论 Kotlin 的惯用用法、最佳实践以及随着语言出现的某些反模式。

第十章:习语和反模式

本章讨论了 Kotlin 的最佳和最坏实践。你将了解惯用的 Kotlin 代码应该是什么样子,以及哪些模式应该避免。

完成这一章后,你应该能够编写更易读、更易于维护的 Kotlin 代码,并避免一些常见的陷阱。

在本章中,我们将涵盖以下主题:

  • Let

  • Apply

  • Also

  • Run

  • With

  • 实例检查

  • 尝试使用资源

  • 内联函数

  • 重新声明

  • 常量

  • 构造函数重载

  • 处理空值

  • 显式异步

  • 验证

  • 密封,而不是枚举

  • 更多同伴

  • Scala 函数

Let

通常,我们使用let()只在对象not null时执行某些操作:

val sometimesNull = if (Random().nextBoolean()) "not null" else null

sometimesNull?.let {
    println("It was $it this time")
}

这里一个常见的陷阱是let()本身也可以用于空值:

val alwaysNull = null

alwaysNull.let { // No null pointer there
    println("It was $it this time") // Always prints null
}

使用let()进行空值检查时,不要忘记问号?

let()的返回值与其操作的类型无关:

val numberReturned = justAString.let {
    println(it)
    it.length
}

这段代码将打印"string"并返回其长度为Int 6

Apply

我们已经在之前的章节中讨论了apply()。它返回它操作的对象,并将上下文设置为this。这个函数最有用的用例是设置可变对象的字段。

想想你有多少次不得不创建一个空构造函数的类,然后依次调用很多 setter:

class JamesBond {
    lateinit var name: String
    lateinit var movie: String
    lateinit var alsoStarring: String
}

val agentJavaWay = JamesBond()
agentJavaWay.name = "Sean Connery"
agentJavaWay.movie = "Dr. No"

我们只能设置namemovie,但alsoStarring留空,如下所示:

val `007` = JamesBond().apply {
    this.name = "Sean Connery"
    this.movie = "Dr. No"
}

println(`007`.name)

由于上下文设置为this,我们可以将其简化为以下漂亮的语法:

val `007` = JamesBond().apply {
    name = "Sean Connery"
    movie = "Dr. No"
}

这个函数在你处理通常有很多 setter 的 Java 类时特别有用。

Also

单表达式函数非常简洁和优雅:

fun multiply(a: Int, b: Int): Int = a * b

但通常,你有一个单语句函数,它还需要写入日志,例如。

你可以写成以下方式:

fun multiply(a: Int, b: Int): Int {
    val c = a * b
    println(c)
    return c
}

但这样它就不再是单语句函数了,对吧?

我们还引入了另一个变量。来拯救,also()

fun multiply(a: Int, b: Int): Int = (a * b).also { println(it) }

这个函数将表达式的结果设置到it,并返回表达式的结果。

这也适用于你想要在一系列调用中产生副作用的情况:

val l = (1..100).toList()

l.filter{ it % 2 == 0 }
    .also { println(it) } // Prints, but doesn't change anything
    .map { it * it }

Run

与线程无关,run()let()类似,但它将上下文设置为this而不是使用it

val justAString = "string"

val n = justAString.run { 
    this.length
}

通常,this可以省略:

val n = justAString.run { 
    length
}

当你计划在同一个对象上调用多个方法时,这非常有用,就像apply()一样。

apply()不同,返回结果可能完全不同:

val year = JamesBond().run {
    name = "ROGER MOORE"
    movie = "THE MAN WITH THE GOLDEN GUN"
    1974 // <= Not JamesBond type
}

With

与其他四个作用域函数不同,with()不是一个扩展函数。

这意味着你不能这样做:

"scope".with { ... }

相反,with()接收一个作为参数的对象,你想要作用域的对象:

with("scope") {
    println(this.length) // "this" set to the argument of with()
}

通常情况下,我们可以省略this

with("scope") {
    length
}

就像run()let()一样,你可以从with()返回任何结果。

实例检查

来自 Java,你可能倾向于检查你的对象类型,使用is,然后使用as进行转换:

interface Superhero
class Batman : Superhero {
    fun callRobin() {
        println("To the Bat-pole, Robin!")
    }
}

class Superman : Superhero {
    fun fly() {
        println("Up, up and away!")
    }
}

fun doCoolStuff(s : Superhero) {
    if (s is Superman) {
        (s as Superman).fly()
    }
    else if (s is Batman) {
        (a as Batman).callRobin()
    }
}

但正如你可能知道的,Kotlin 有智能转换,所以在这种情况下,隐式转换不是必需的:

fun doCoolStuff(s : Superhero) {
    if (s is Superman) {
        s.fly()
    }
    else if (s is Batman) {
        s.callRobin()
    }
}

此外,在大多数情况下,使用when()进行智能转换会产生更干净的代码:

fun doCoolStuff(s : Superhero) {
    when(s) {
        is Superman -> s.fly()
        is Batman -> s.callRobin()
        else -> println("Unknown superhero")
    }
}

作为一条经验法则,你应该避免使用类型转换,并尽可能多地依赖智能转换:

// Superhero is clearly not a string
val superheroAsString = (s as String)

但如果你绝对必须,还有一个安全的类型转换操作符:

val superheroAsString = (s as? String)

使用 try-with-resources

Java7 引入了 AutoCloseable 的概念和 try-with-resources 语句。

这个声明允许我们提供一组资源,这些资源在代码使用完毕后会自动关闭。不再有忘记关闭文件的风险(或者至少风险更小)。

在 Java7 之前,那是一个完全的混乱:

BufferedReader br = null; // Nulls are bad, we know that
try {
    br = new BufferedReader(new FileReader("/some/peth"));
    System.out.println(br.readLine());
}
finally {
    if (br != null) { // Explicit check
        br.close(); // Boilerplate
    }
}

在 Java7 之后:

try (BufferedReader br = new BufferedReader(new FileReader("/some/peth"))) {
    System.out.println(br.readLine());
}

在 Kotlin 中,this 语句被 use() 函数替换:

val br = BufferedReader(FileReader(""))

br.use {
    println(it.readLine())
}

内联函数

你可以将内联函数视为编译器的复制/粘贴指令。每次编译器看到标记为内联的函数调用时,它都会用具体的函数体替换调用。

只有当它是一个高阶函数,并且其中一个参数是 lambda 表达式时,才使用内联函数:

inline fun doesntMakeSense(something: String) {
    println(something)
}

这是最常见的使用场景,你希望使用 inline

inline fun makesSense(block: () -> String) {
    println("Before")
    println(block())
    println("After")
}

你可以像往常一样调用它,使用代码块体:

makesSense {
    "Inlining"
}

但如果你查看字节码,你会发现它实际上被转换成了生成的行,而不是函数调用:

println("Before")
println("Inlining")
println("After")

在实际代码中,你会看到以下内容:

String var1 = "Before"; <- Inline function call
System.out.println(var1);
var1 = "Inlining";
System.out.println(var1);
var1 = "After";
System.out.println(var1);

var1 = "Before"; // <- Usual code
System.out.println(var1);
var1 = "Inlining";
System.out.println(var1);
var1 = "After";
System.out.println(var1);

注意,这两个代码块之间没有任何区别。

由于内联函数是复制/粘贴的,所以如果你有超过几行代码,就不应该使用它。将其作为常规函数会更有效率。

Reified

由于内联函数是复制的,我们可以消除 JVM 的一项主要限制——类型擦除。毕竟,在函数内部,我们知道我们得到的确切类型。

让我们看看以下示例。你希望创建一个泛型函数,该函数将接收一个数字,但只有在它与函数类型相同时才会打印它。

你可以尝试编写如下内容:

fun <T> printIfSameType(a: Number) {
    if (a is T) { // <== Error
        println(a)   
    }
}

但这段代码会因为以下错误而无法编译:

Cannot check for instance of erased type: T

在这种情况下,我们通常在 Java 中这样做,即传递类作为参数:

fun <T: Number> printIfSameType(clazz: KClass<T>, a: Number) {
    if (clazz.isInstance(a) ) {
        println(a)
    }
}

我们可以通过运行以下两行来检查这段代码:

printIfSameType(Int::class, 1) // Print 1, as 1 is Int
printIfSameType(Int::class, 2L) // Prints nothing, as 2 is Long

这段代码有几个缺点:

  • 我们不得不使用反射,为此,我们必须包含 kotlin-reflect 库:
compile group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: '1.2.31'
  • 我们不能使用 is 操作符,而必须使用 isInstance() 函数。

  • 我们必须传递正确的类:

clazz: KClass<T>

相反,我们可以使用一个 reified 函数:

reified T> printIfSameTypeReified(a: Number) {
    if (a is T) {
        println(a)
    }
}

我们可以测试我们的代码是否仍然按预期工作:

printIfSameTypeReified<Int>(3) // Prints 3, as 3 is Int
printIfSameTypeReified<Int>(4L) // Prints nothing, as 4 is Long
printIfSameTypeReified<Long>(5) // Prints nothing, as 5 is Int
printIfSameTypeReified<Long>(6L) // Prints 6, as 6 is Long

这样我们就能获得该语言的所有好处:

  • 没有必要使用另一个依赖项

  • 清晰的方法签名

  • 使用 is 构造的能力

当然,与常规内联函数相同的规则适用。这段代码将被复制,所以它不应该太大。

考虑另一个关于函数重载的例子:

fun printList(list: List<Int>) {
    println("This is a lit of Ints")
    println(list)
}

fun printList(list: List<Long>) {
    println("This is a lit of Longs")
    println(list)
}

这将无法编译,因为存在平台声明冲突。在 JVM 方面,两者具有相同的签名:printList(list: List)

但使用 reified,我们可以实现这一点:

const val int = 1
const val long = 1L
inline fun <reified T : Any> printList(list: List<T>) {
    when {
        int is T -> println("This is a list of Ints")
        long is T -> println("This is a list of Longs")
        else -> println("This is a list of something else")
    }

    println(list)
}

常量

由于 Java 中的所有内容都是对象(除非你是原始类型),我们习惯于将所有常量作为静态成员放入我们的对象中。

由于 Kotlin 有伴随对象,我们通常尝试将它们放在那里:

class MyClass {
    companion object {
        val MY_CONST = "My Const"
    }
}

这将有效,但你应该记住,毕竟伴随对象是一个对象。

因此,这将被翻译成以下代码,或多或少:

public final class Spock {
   @NotNull
   private static final String MY_CONST = "My Const";
   public static final Spock.Companion Companion = new Spock.Companion(...);

   public static final class Companion {
      @NotNull
      public final String getMY_CONST() {
         return MyClass.MY_CONST;
      }

      private Companion() {
      }
   }
}

我们常量的调用看起来像这样:

String var1 = Spock.Companion.getSENSE_OF_HUMOR();
System.out.println(var1);

因此,我们在 Spock 类中有一个类,但我们想要的只是 static final String

现在我们将这个值标记为常量:

class Spock {
    companion object {
        const val SENSE_OF_HUMOR = "None"
    }
}

这里是字节码的变化:

public final class Spock {
   @NotNull
   public static final String SENSE_OF_HUMOR = "NONE";
   public static final Spock.Companion Companion = new Spock.Companion(...);
   )
   public static final class Companion {
      private Companion() {
      }
        ...
   }
}

这里是调用:

String var1 = "NONE";
System.out.println(var1);

注意,根本就没有调用这个常量,因为编译器已经为我们内联了它的值。毕竟,它是常量。

如果你只需要一个常量,你也可以在任何类外部设置它:

const val SPOCK_SENSE_OF_HUMOR = "NONE"

如果你需要命名空间,你可以将其包裹在一个对象中:

object SensesOfHumor {
    const val SPOCK = "NONE"
}

构造函数重载

在 Java 中,我们习惯于有重载的构造函数:

class MyClass {
    private final String a;
    private final Integer b;
    public MyClass(String a) {
        this(a, 1);
    }

    public MyClass(String a, Integer b) {
        this.a = a;
        this.b = b;
    }
}

我们可以在 Kotlin 中模拟相同的行为:

class MyClass(val a: String, val b: Int, val c: Long) {

    constructor(a: String, b: Int) : this(a, b, 0) 

    constructor(a: String) : this(a, 1)

    constructor() : this("Default")
}

但通常更好的做法是使用默认参数值和命名参数:


class BetterClass(val a: String = "Default",
                  val b: Int = 1,
                  val c: Long = 0)

处理空值

空值是不可避免的,尤其是在你使用 Java 库或从数据库获取数据时。

但你可以用 Java 的方式检查空值:

// Will return "String" half of the time, and null the other half
val stringOrNull: String? = if (Random().nextBoolean()) "String" else null 

// Java-way check
if (stringOrNull != null) {
    println(stringOrNull.length)
}

或者用更短的形式,使用 Elvis 操作符。如果长度不为空,这个操作符将返回其值。否则,它将返回我们提供的默认值,在这个例子中是零:

val alwaysLength = stringOrNull?.length ?: 0

println(alwaysLength) // Will print 6 or 0, but never null

如果你有一个嵌套对象,你可以链式调用这些检查:

data class Json(
        val User: Profile?
)

data class Profile(val firstName: String?,
                   val lastName: String?)

val json: Json? = Json(Profile(null, null))

println(json?.User?.firstName?.length)

最后,你可以使用 let() 块来进行这些检查:

println(json?.let {
    it.User?.let {
        it.firstName?.length
    }
})

如果你想要消除所有地方的 it(),你可以使用 run:

println(json?.run {
    User?.run {
        firstName?.length
    }
})

尽可能避免不安全的 !! 空值操作符:

println(json!!.User!!.firstName!!.length)

这将导致 KotlinNullPointerException

显式异步

正如你在上一章中看到的,在 Kotlin 中引入并发非常容易:

fun getName() = async {
   delay(100)
   "Ruslan"
}

但这种并发可能对函数的用户来说是不预期的行为,因为他们可能期望一个简单的值:

println("Name: ${getName()}")

它会打印:

Name: DeferredCoroutine{Active}@...

当然,这里缺少的是 await()

println("Name: ${getName().await()}")

但如果我们相应地命名我们的函数,这会显得更加明显:

fun getNameAsync() = async {
   delay(100)
   "Ruslan"
}

通常,你应该建立某种约定来区分异步函数和常规函数。

验证

你有多少次不得不编写如下代码:

fun setCapacity(cap: Int) {
    if (cap < 0) {
        throw IllegalArgumentException()
    }
    ...
}

相反,你可以使用 require() 来检查参数:

fun setCapacity(cap: Int) {
    require(cap > 0)
}

这使得代码更加流畅。

你可以使用 require() 来检查嵌套的空值:

fun printNameLength(p: Profile) {
    require(p.firstName != null)
}

但也有 requireNotNull() 来处理这种情况:

fun printNameLength(p: Profile) {
    requireNotNull(p.firstName)
}

使用 check() 来验证你对象的状态。这在你提供用户可能没有正确设置的对象时很有用:

private class HttpClient {
    var body: String? = null
    var url: String = ""

    fun postRequest() {
        check(body != null) {
            "Body must be set in POST requests"
        }
    }

    fun getRequest() {
        // This one is fine without body
    }
}

再次强调,对于 null 有一个快捷方式:checkNotNull()

密封,而不是枚举

来自 Java,你可能想给你的 enum 赋予功能:

// Java code
enum PizzaOrderStatus {
    ORDER_RECEIVED, 
    PIZZA_BEING_MADE, 
    OUT_FOR_DELIVERY, 
    COMPLETED;

    public PizzaOrderStatus nextStatus() {
        switch (this) {
            case ORDER_RECEIVED: return PIZZA_BEING_MADE;
            case PIZZA_BEING_MADE: return OUT_FOR_DELIVERY;
            case OUT_FOR_DELIVERY: return COMPLETED;
            case COMPLETED:return COMPLETED;
        }
    }
}

相反,你可以使用 sealed 类:

sealed class PizzaOrderStatus(protected val orderId: Int) {
    abstract fun nextStatus() : PizzaOrderStatus
    class OrderReceived(orderId: Int) : PizzaOrderStatus(orderId) {
        override fun nextStatus(): PizzaOrderStatus {
            return PizzaBeingMade(orderId)
        }
    }

    class PizzaBeingMade(orderId: Int) : PizzaOrderStatus(orderId) {
        override fun nextStatus(): PizzaOrderStatus {
            return OutForDelivery(orderId)
        }
    }

    class OutForDelivery(orderId: Int) : PizzaOrderStatus(orderId) {
        override fun nextStatus(): PizzaOrderStatus {
            return Completed(orderId)
        }
    }

    class Completed(orderId: Int) : PizzaOrderStatus(orderId) {
        override fun nextStatus(): PizzaOrderStatus {
            return this
        }
    }
}

这种方法的优点是,我们现在可以传递数据以及状态:

var status: PizzaOrderStatus = OrderReceived(123)

while (status !is Completed) {
    status = when (status) {
        is OrderReceived -> status.nextStatus()
        is PizzaBeingMade -> status.nextStatus()
        is OutForDelivery -> status.nextStatus()
        is Completed -> status
    }
}

通常,如果你想要将数据与状态关联起来,密封类是很好的选择。

更多伴随对象

你只能有一个伴随对象在你的类中:

class A {
   companion {
   }
   companion {
   }
}

但你可以在你的类中拥有任意多的对象:

class A {
   object B {
   }
   object C {
   }
}

这有时用于产生命名空间。命名空间很重要,因为它为你提供了更好的命名约定。想想看,当你创建了像 SimpleJsonParser 这样的类,它继承自 JsonParser,而 JsonParser 又继承自 Parser 时的情况。你可以将这个结构转换为 Json.Parser,例如,这要简洁得多,也更实用,因为 Kotlin 代码应该是这样的。

Scala 函数

从 Scala 转向 Kotlin 的开发者有时可能会这样定义他们的函数:

fun hello() = {
    "hello"
}

调用此函数不会打印你期望的内容:

println("Say ${hello()}")

它会打印以下内容:

 Say () -> kotlin.String

我们缺少的是第二组括号:

println("Say ${hello()()}")

它会打印以下内容:

Say hello

这是因为单表达式定义可以翻译成:

fun hello(): () -> String {
    return {
        "hello"
    }
}

它可以进一步翻译成:

fun helloExpandedMore(): () -> String {
    return fun():String {
        return "hello"
    }
}

现在,你至少可以看到那个函数的来源了。

摘要

在本章中,我们回顾了 Kotlin 中的最佳实践以及语言的一些注意事项。现在你应该能够编写更符合语言习惯、性能良好且易于维护的代码。

你应该利用作用域函数,但确保不要过度使用它们,因为它们可能会使代码变得混乱,尤其是对于那些刚开始学习这门语言的人来说。

确保正确处理空值和类型转换,使用 let()Elvis 操作符以及语言提供的智能转换。

在下一章和最后一章中,我们将通过编写一个真实生活中的微服务来应用这些技能,使用我们所学的一切。

第十一章:使用 Kotlin 的响应式微服务

在本章中,我们将通过使用 Kotlin 编程语言构建一个微服务来运用我们迄今为止学到的技能。我们还想让这个微服务是响应式的,并且尽可能接近现实生活。为此,我们将使用 Vert.x 框架,其优势将在下一节中列出。

你可能已经厌倦了创建待办事项或购物清单。

因此,相反,这个微服务将是一个 猫收容所。这个微服务应该能够做到以下事情:

  • 提供一个我们可以 ping 的端点来检查服务是否正在运行

  • 列出目前收容所中的猫咪

  • 提供一种方法来添加新的猫咪

你需要开始以下内容:

  • JDK 1.8 或更高版本

  • IntelliJ IDEA

  • Gradle 4.2 或更高版本

  • PostgreSQL 9.4 或更高版本

本章将假设你已经安装了 PostgreSQL 并且你对它有基本的工作知识。如果没有,请参阅官方文档:www.postgresql.org/docs/9.4/static/tutorial-install.html

在本章中,我们将涵盖以下主题:

  • 开始使用 Vert.x

  • 处理请求

  • 测试

  • 与数据库一起工作

  • EventBus

开始使用 Vert.x

我们将为我们的微服务使用的框架称为 Vert.x。它是一个与 reactive extensions 共享许多共同点的响应式框架,我们在 第七章 保持响应式 中讨论了它。它是异步和非阻塞的。

让我们通过一个具体的例子来理解这意味着什么。

我们将从一个新的 Kotlin Gradle 项目开始。从你的 IntelliJ IDEA 中,打开 File | New | Project,在 New Project 向导中选择 Gradle | Kotlin,然后点击 Finish。给你的项目一个 GroupId(我选择了 me.soshin)和一个 ArtifactId(在我的例子中是 catsShelter)。

Gradle 是一个构建工具,类似于 Maven 和 Ant。它有一个很好的语法,并以优化的方式编译你的项目。你可以在这里了解更多:gradle.org/

在下一屏上,选择 Use auto-import 和 Create directories for empty content roots,然后点击 Finish。

接下来,将以下依赖项添加到你的 build.gradle 文件中。

dependencies {
    def $vertx_version = '3.5.1'
    ...
    compile group: 'io.vertx', name: 'vertx-core', version: $vertx_version
    compile group: 'io.vertx', name: 'vertx-web', version: $vertx_version
    compile group: 'io.vertx', name: 'vertx-lang-kotlin', version: $vertx_version
    compile group: 'io.vertx', name: 'vertx-lang-kotlin-coroutines', version: $vertx_version
}

以下是对每个依赖项的解释:

  • vertx-core 是核心库

  • vertx-web 是必需的,因为我们希望我们的服务是基于 REST 的

  • vertx-lang-kotlin 提供了使用 Vert.x 编写 Kotlin 代码的惯用方法

  • 最后,vertx-lang-kotlin-coroutines 与我们在 第九章 详细讨论的协程集成,专为并发设计

注意,我们定义了一个变量来指定我们应该使用 Vert.x 的哪个版本。截至目前,最新稳定版本是 3.5.1,但到你阅读这本书的时候,它将是 3.5.2 或甚至 3.6.0。

作为一般规则,所有 Vert.x 库应该使用相同的版本,这时变量就变得很有用了。

src/main/kotlin 文件夹中创建一个名为 Main.kt 的文件,内容如下:

fun main(vararg args: String) {
   val vertx = Vertx.vertx()

   vertx.createHttpServer().requestHandler{ req ->
            req.response().end("OK")
        }.listen(8080)
}

这就是你需要启动一个当你在浏览器中打开 localhost:8080 时会响应 OK 的网络服务器的所有内容。

现在,让我们了解这里实际上发生了什么。我们使用第三章的 工厂方法 从 Understanding Structural Patterns 创建一个 Vert.x 实例。

Handler 只是一个简单的监听器,或者是一个订阅者。如果你不记得它是如何工作的,请查看第四章的 Getting Familiar with Behavioral Patterns,了解 Observable 设计模式。在我们的情况下,它将为每个新的请求被调用。这就是 Vert.x 的异步特性在起作用。

注意,requestHandler() 是一个接收块的函数。像任何其他惯用的 Kotlin 代码一样,你不需要括号。

如果你使用的是 IntelliJ IDEA 等集成开发环境,你可以直接运行它。另一种选择是将以下行添加到你的 build.gradle 文件中:

apply plugin: 'application'
mainClassName = "com.gett.MainKt"

然后,你可以简单地使用以下命令启动它:

./gradlew run

另一个选择是使用 VertxGradlePlugin (github.com/jponge/vertx-gradle-plugin),它将做同样的事情。

路由

注意,无论我们指定哪个 URL,我们总是得到相同的结果。

当然,这不是我们想要达到的目标。让我们先添加最基本的服务端点,它只会告诉我们服务正在运行。

为了做到这一点,我们将使用 Router

val vertx = Vertx.vertx() // Was here before
val router = Router.router(vertx)
...

Router 允许你为不同的 HTTP 方法和 URL 指定处理器。

但是,默认情况下,它不支持协程。让我们通过创建一个扩展函数来解决这个问题:

fun Route.asyncHandler(fn : suspend (RoutingContext) -> Unit) {
    handler { ctx ->
        launch(ctx.vertx().dispatcher()) {
            try {
                fn(ctx)
            } catch(e: Exception) {
                ctx.fail(e)
            }
        }
    }
}

如果你熟悉现代 JavaScript,这类似于 async() => {}

现在,我们可以使用这个新的扩展方法:

router.get("/alive").asyncHandler {
    // Some response comes here
    // We now can use any suspending function in this context
}

我们看到了如何在第一个示例中返回一个平面文本响应。所以,让我们返回 JSON 代替。大多数实际应用程序使用 JSON 进行通信。

将以下行添加到你的处理器中:

...
val json = json {
    obj (
       "alive" to true
    )
}
it.respond(json.toString())
...

我们声明的另一个扩展函数是 respond()。它看起来如下所示:

fun RoutingContext.respond(responseBody: String = "", status: Int = 200) {
    this.response()
            .setStatusCode(status)
            .end(responseBody)
}

现在将你的路由器连接到服务器。

你可以通过用以下行替换之前的服务器实例化来实现这一点:

vertx.createHttpServer().
   requestHandler(router::accept).listen(8080)

现在,所有路由都将由 Router 处理。

你可以在浏览器中打开 http://localhost:8080/alive 并确保你得到 {"alive": true} 的响应。

恭喜!你已经成功创建了第一个返回 JSON 的路由。从现在起,无论何时你不确定你的应用程序是否正在运行,你都可以简单地使用这个 URL 来检查它。当你使用负载均衡器时,这一点尤为重要,因为负载均衡器需要知道在任何时候有多少应用程序可用。

处理请求

我们接下来的任务是向我们的虚拟收容所添加第一只猫。

这应该是一个 POST 请求,其中请求体的内容可能看起来像这样:{"name": "Binky", "age": 4}

如果你熟悉像 curlPostman 这样的工具来发出 POST 请求,那很好。如果不熟悉,我们将在下一节编写一个测试来检查这个场景。

我们首先需要做的是在我们初始化我们的路由器之后添加以下行:

router.route().handler(BodyHandler.create())

这将告诉 Vert.x 将请求体解析为 JSON,适用于任何请求。另一种方法是使用 router.route("/*")

现在,让我们确定我们的 URL 应该是什么样子。良好的实践是将我们的 API URL 进行版本控制,所以我们希望它如下所示:

api/v1/cats

因此,我们可以假设以下:

  • GET api/v1/cats 将返回我们庇护所中所有的猫。

  • POST api/v1/cats 将添加一只新的猫。

  • GET api/v1/cats/34 如果存在,将返回 ID=34 的猫,否则返回 404。

理解了这一点后,我们可以继续如下操作:

router.post("/api/v1/cats").asyncHandler { ctx ->
    // Some code of adding a cat comes here
}
router.get("/api/v1/cats").asyncHandler { ctx ->
    // Code for getting all the cats
}

最后一个端点需要接收一个路径参数。我们使用分号符号来表示:

router.get("/api/v1/cats/:id").asyncHandler { ctx ->
    // Fetches specific cat
}

Verticles

现在遇到了一个问题。我们的代码位于 Main.kt 文件中,它越来越大。我们可以通过使用 verticles 来开始分割它。

你可以把 verticle 看作是一个轻量级 actor。让我们看看以下代码的例子:

class ServerVerticle: CoroutineVerticle() {

    override suspend fun start() {
        val router = router()
        vertx.createHttpServer().requestHandler(router::accept).listen(8080)
    }

    private fun router(): Router {
        val router = Router.router(vertx)
        // Our router code comes here now
        ...
        return router
    }
}

现在我们需要启动这个 verticle。有几种不同的方法可以做到这一点,但最简单的方法是将这个类的实例传递给 deployVerticle() 方法:

vertx.deployVerticle(ServerVerticle())

现在我们的代码被分成两个文件,ServerVerticle.ktMain.kt

注意,但是 /api/v1/cats/ 每次都会重复。有没有一种方法可以消除这种冗余?实际上,有。它被称为 子路由

子路由

我们将保持 /alive 端点不变,但我们将所有其他端点提取到一个单独的函数中:

private fun apiRouter(): Router {
    val router = Router.router(vertx)

    router.post("/cats").asyncHandler { ctx ->
        ctx.respond(status=501)
    }
    router.get("/cats").asyncHandler { ctx ->
        ...
    }
    router.get("/cats/:id").asyncHandler { ctx ->
        ...
    }
    return router
}

有一种更流畅的方式来定义它,但我们保留了原来的方式,因为它更易读。

就像我们向 Vert.x 服务器实例提供主路由器一样,我们现在将子路由器按如下方式提供给主路由器:

router.mountSubRouter("/api/v1", apiRouter())

保持我们的代码干净和良好分离非常重要。

测试

在我们继续将猫添加到数据库之前,让我们首先编写一些测试来确保到目前为止一切正常。

为了做到这一点,我们将使用 TestNG 测试框架。你也可以使用 JUnitVertxUnit 来达到同样的目的。

首先,将以下行添加到你的 build.gradledependencies 部分:

testCompile group: 'org.testng', name: 'testng', version: '6.11'

现在,我们将创建我们的第一个测试。它应该位于 /src/test/kotlin/<your_package>

所有集成测试的基本结构看起来像这样:

class ServerVerticleTest {
    // Usually one instance of VertX is more than enough
    val vertx = Vertx.vertx()

    @BeforeClass
    fun setUp() {
        // You want to start your server once
        startServer()
    }

    @AfterClass
    fun tearDown() {
        // And you want to stop your server once
        vertx.close()
    }

    @Test
    fun testAlive() {
        // Here you assert something
    }

    // More tests come here
    ...
}

一个好技巧是使用 Kotlin 反引号符号来命名你的测试。

你可以像这样命名你的测试:

@Test
fun testAlive() {
    ...
}

但更好的命名测试的方式是这样的:

@Test
fun `Tests that alive works`() {
    ...
}

现在我们想要向我们的 /alive 端点发出实际的 HTTP 请求,例如,并检查响应代码。为此,我们将使用 Vert.x 网络客户端。

将其添加到你的 build.gradle 依赖项部分:

compile group: 'io.vertx', name: 'vertx-web-client', version: $vertx_version

如果你打算只在测试中使用它,你可以指定 testCompile 而不是 compile。但 WebClient 非常有用,你最终可能还是会将其用在代码中。

辅助方法

在我们的测试中,我们将创建两个辅助函数,分别称为 get()post(),它们将向我们的测试服务器发出 GETPOST 请求。

我们将从 get() 开始:

private fun get(path: String): HttpResponse<Buffer> {
    val d1 = CompletableDeferred<HttpResponse<Buffer>>()

    val client = WebClient.create(vertx)
    client.get(8080, "localhost", path).send {
        d1.complete(it.result())
    }

    return runBlocking {
        d1.await()
    }
}

第二种方法 post() 将非常相似,但它还将有一个请求体参数:


private fun post(path: String, body: String = ""): HttpResponse<Buffer> {
    val d1 = CompletableDeferred<HttpResponse<Buffer>>()

    val client = WebClient.create(vertx)
    client.post(8080, "localhost", path).sendBuffer(Buffer.buffer(body), { res ->
        d1.complete(res.result())
    })

    return runBlocking {
        d1.await()
    }
}

这两个函数都使用了 Kotlin 提供的默认参数值协程。

你应该编写自己的辅助函数或根据你的需求修改它们。

我们还需要另一个辅助函数 startServer(),我们已经在 @BeforeClass 中提到过它。它应该看起来像这样:

private fun startServer() {
    val d1 = CompletableDeferred<String>()
    vertx.deployVerticle(ServerVerticle(), {
        d1.complete("OK")
    })
    runBlocking {
        println("Server started")
        d1.await()
    }
}

我们需要两个新的扩展函数来方便我们。这些函数将把服务器响应转换为 JSON:

private fun <T> HttpResponse<T>.asJson(): JsonNode {
    return this.bodyAsBuffer().asJson()
}

private fun Buffer.asJson(): JsonNode {
    return ObjectMapper().readTree(this.toString())
}

现在我们已经准备好编写我们的第一个测试:

@Test
fun `Tests that alive works`() {
    val response = get("/alive")
    assertEquals(response.statusCode(), 200)

    val body = response.asJson()
    assertEquals(body["alive"].booleanValue(), true)
}

运行 ./gradlew test 以检查这个测试是否通过。

接下来,我们将编写另一个测试;这次是为猫的创建端点。

起初,它将失败:

@Test
fun `Makes sure cat can be created`() {
   val response = post("/api/v1/cats",
                """
                {
                    "name": "Binky",
                    "age": 5
                }
                """)

   assertEquals(response.statusCode(), 201)
   val body = response.asJson()

   assertNotNull(body["id"])
   assertEquals(body["name"].textValue(), "Binky")
   assertEquals(body["age"].intValue(), 5)
}

注意,我们的服务器返回状态码 501 Not Implemented,并且没有返回 cat ID。

我们将在下一节讨论数据库持久性时修复这个问题。

与数据库一起工作

如果没有将我们的对象(即猫)保存到某种持久存储的能力,我们将无法取得更大的进展。

为了做到这一点,我们首先需要连接到数据库。

将以下两行添加到你的 build.gradle 依赖部分:

compile group: 'org.postgresql', name: 'postgresql', version: '42.2.2'
compile group: 'io.vertx', name: 'vertx-jdbc-client', version: $vertx_version

第一行代码获取 PostgreSQL 驱动。第二行添加了 Vert.x JDBC 客户端,这使得 Vert.x 在拥有驱动程序的情况下可以连接到任何支持 JDBC 的数据库。

管理配置

现在我们想要将数据库配置存储在某个地方。对于本地开发,可能将配置硬编码是可行的。

当我们连接到数据库时,我们至少需要指定以下参数:

  • 用户名

  • 密码

  • 主机

  • 数据库名

我们应该在哪里存储它们?

当然,一个选项当然是将这些值硬编码。这对于本地环境来说是可以的,但当我们部署这个服务到其他地方时怎么办呢?

你会去,我不能来!XDSpringBoot 做的,或者我们可以尝试从环境变量中读取它们。无论如何,我们需要一个封装这个逻辑的对象,如下面的代码所示:

object Config {
    object Db {
        val username = System.getenv("DATABASE_USERNAME") ?: "postgres"
        val password = System.getenv("DATABASE_PASSWORD") ?: ""
        val database = System.getenv("DATABASE_NAME") ?: "cats_db"
        val host = System.getenv("DATABASE_HOST") ?: ""

        override fun toString(): String {
            return mapOf("username" to username,
                    "password" to password,
                    "database" to database,
                    "host" to host).toString()
        }
    }

    override fun toString(): String {
        return mapOf(
                "Db" to Db
        ).toString()
    }
}

这当然只是你可以采取的一种方法。

现在,我们将使用此配置代码创建 JDBCClient

fun CoroutineVerticle.getDbClient(): JDBCClient {
    val postgreSQLClientConfig = JsonObject(
            "url" to "jdbc:postgresql://${Config.Db.host}:5432/${Config.Db.database}",
            "username" to Config.Db.username,
            "password" to Config.Db.password)
    return JDBCClient.createShared(vertx, postgreSQLClientConfig)
}

在这里,我们选择了一个扩展函数,它将在所有 CoroutineVerticles 上工作。

为了简化与 JDBCClient 一起工作,我们将向其中添加一个名为 query() 的方法:

fun JDBCClient.query(q: String, vararg params: Any): Deferred<JsonObject> {
    val deferred = CompletableDeferred<JsonObject>()
    this.getConnection { conn ->
        conn.handle({
            result().queryWithParams(q, params.toJsonArray(), { res ->
                res.handle({
                    deferred.complete(res.result().toJson())
                }, {
                    deferred.completeExceptionally(res.cause())
                })
            })
        }, {
            deferred.completeExceptionally(conn.cause())
        })
    }

    return deferred
}

我们还会添加 toJsonArray() 方法,因为这是我们 JDBCClient 使用的:

private fun <T> Array<T>.toJsonArray(): JsonArray {
    val json = JsonArray()

    for (e in this) {
        json.add(e)
    }

    return json
}

注意这里 Kotlin 泛型是如何被用来简化转换同时保持类型安全的。

我们还会添加一个 handle() 函数,它将为我们提供一个简单的 API 来处理异步错误:

inline fun <T> AsyncResult<T>.handle(success: AsyncResult<T>.() -> Unit, failure: () -> Unit) {
    if (this.succeeded()) {
        success()
    }
    else {
        this.cause().printStackTrace()
        failure()
    }
}

为了确保一切正常工作,我们将在我们的/alive路由上添加一个检查:

val router = Router.router(vertx)
val dbClient = getDbClient()
...
router.get("/alive").asyncHandler {
    val dbAlive = dbClient.query("select true as alive")
    val json = json {
        obj (
                "alive" to true,
                // This is JSON, but we can access it as an array
                "db" to dbAlive.await()["rows"]
        )
    }
    it.respond(json)
}

需要添加的行用粗体标出。

在添加这些行并打开localhost:8080/alive之后,你应该得到以下 JSON 代码:

{"alive":true, "db":[{"alive":true}]}

管理数据库

当然,我们的测试没有通过。这是因为我们还没有创建我们的数据库。确保你在命令行中运行以下行:

$ createdb cats_db

在我们确认数据库正在运行之后,让我们实现我们的第一个真实端点。

我们将保持我们的 SQL 与实际代码的清晰分离。将以下内容添加到你的ServerVerticle中:

private val insert = """insert into cats (name, age)
            |values (?, ?::integer) RETURNING *""".trimMargin()

我们在这里使用多行字符串,通过|trimMargin()来重新对齐它们。

现在用以下代码调用这个查询:

...
val db = getDbClient()
router.post("/cats").asyncHandler { ctx ->
    db.queryWithParams(insert, ctx.bodyAsJson.toCat(), {
       it.handle({
          // We'll always have one result here, since it's our row
          ctx.respond(it.result().rows[0].toString(), 201)
       }, {
          ctx.respond(status=500)
       })
   })
}

注意,我们没有在任何地方打印错误信息。这是因为我们定义了handle()函数来处理这个任务。

我们还定义了自己的函数来解析请求体,将JsonObject转换为JsonArray,这是JDBCClient所期望的:

private fun JsonObject.toCat() = JsonArray().apply {
   add(this@toCat.getString("name"))
   add(this@toCat.getInteger("age"))
}

注意,这里有两个不同的this版本。一个指的是apply()函数的内部作用域。另一个指的是toCat()函数的外部作用域。要引用外部作用域,我们使用@scopeName注解。

正如你所见,扩展函数是清理代码的极其强大的工具。

当你再次运行我们的测试时,你会注意到它仍然失败,但现在有一个不同的错误代码。这是因为我们还没有创建我们的表。让我们现在就创建它。有几种方法可以做到这一点,但最方便的方法是简单地运行以下命令:

psql -c "create table cats (id bigserial primary key, name varchar(20), age integer)" cats_db

再次运行你的测试以确保它通过。

EventBus

这是第二次我们遇到了相同的问题:我们的类越来越大,我们通常尽可能避免这种情况。

如果我们再次将创建猫的逻辑拆分到一个单独的文件中呢?让我们称它为CatVerticle.kt

但是,我们需要一种方法让ServerVerticleCatVerticle通信。在像SpringBoot这样的框架中,你会使用依赖注入来达到这个目的。但是对于响应式框架呢?

消费者

为了解决通信问题,Vert.x 使用EventBus。它是我们在第四章中讨论的Observable设计模式的实现,熟悉行为模式。任何 verticle 都可以通过事件总线发送消息,在这些两种模式之间进行选择:

  • send()将消息发送给单个订阅者

  • publish()将消息发送给所有订阅者

无论使用哪种方法发送消息,你都可以使用 EventBus 上的consumer()方法来订阅它:

const val CATS = "cats:get"

class CatVerticle : CoroutineVerticle() {

    override suspend fun start() {
        val db = getDbClient()
        vertx.eventBus().consumer<JsonObject>(CATS) { req ->
            ...
        }
    }
}

类型指定了我们期望接收消息的对象。在这种情况下,它是JsonObject。常量CATS是我们订阅的键。它可以是任何字符串。通过使用命名空间,我们确保未来不会发生冲突。如果我们要在我们的收容所中添加狗,我们将使用另一个具有另一个命名空间的常量。例如:

const val DOGS  = "dogs:get" // Just an example, don't copy it

现在我们添加以下两个查询,它们只是多行字符串常量:

private const val QUERY_ALL = """select * from cats"""
class CatVerticle : CoroutineVerticle() {

    private val QUERY_WITH_ID = """select * from cats
                     where id = ?::integer""".trimIndent()
...
}

为什么我们在类内部放置一个,在类外部放置另一个?

QUERY_ALL是一个简短的查询,它适合一行。我们可以允许自己将其作为一个常量。另一方面,QUERY_WITH_ID是一个较长的查询,需要一些缩进。由于我们只在运行时移除缩进,所以我们不能将其作为一个常量。因此,我们使用成员值。在现实世界的项目中,你的大部分查询可能都需要是私有值。但了解两种方法之间的区别很重要。

我们用以下代码填充我们的消费者:

...
try {
    val body = req.body()
    val id: Int? = body["id"]
    val result = if (id != null) {
        db.query(QUERY_WITH_ID, id)
    } else {
        db.query(QUERY_ALL)
    }
    launch {
        req.reply(result.await())
    }
}
catch (e: Exception) {
    req.fail(0, e.message)
}
...

如果请求中包含猫的 ID,我们就获取这只特定的猫。否则,我们获取所有可用的猫。

我们使用launch()是因为我们想要await()结果,并且我们没有返回值。

生产者

剩下的就是从ServerVerticle调用猫。为此,我们将在我们的CoroutineVerticle中添加另一个方法:

fun <T> CoroutineVerticle.send(address: String,
                               message: T,
                               callback: (AsyncResult<Message<T>>) -> Unit) {
    this.vertx.eventBus().send(address, message, callback)
}

然后我们可以这样处理我们的请求:

...
router.get("/cats").asyncHandler { ctx ->
    send(CATS, ctx.queryParams().toJson()) {
        it.handle({
            val responseBody = it.result().body()
            ctx.respond(responseBody.get<JsonArray>("rows").toString())
        }, {
            ctx.respond(status=500)
        })
    }
}
...

注意,我们正在重用之前定义的同一个常量,称为CATS

这样,我们可以轻松地检查谁可以发送这个事件,谁消费它。如果成功,我们将返回一个 JSON。否则,我们将返回一个 HTTP 错误代码。

我们添加的另一个方法是toJson()MultiMap上。MultiMap是一个包含我们的查询参数的对象:

private fun MultiMap.toJson(): JsonObject {
    val json = JsonObject()

    for (k in this.names()) {
        json.put(k, this[k])
    }

    return json
}

为了确保一切按预期工作,让我们为我们的新端点创建两个额外的测试。

只别忘了在你的Main.kt文件和测试中的startServer()函数中添加以下行:

...
vertx.deployVerticle(CatVerticle())
...

更多测试

现在添加以下基本测试:

@Test
fun `Make sure that all cats are returned`() {
    val response = get("/api/v1/cats")
    assertEquals(response.statusCode(), 200)

    val body = response.asJson()

    assertTrue(body.size() > 0)
}

为了确保你理解所有这些是如何协同工作的,这里有一些你可能希望完成的额外任务:

  1. 将添加新猫的逻辑移动到CatVerticle

  2. 实现获取单个猫的功能。注意代码与获取所有猫的代码非常相似?重构它以使用 Kotlin 的一个酷特性——局部函数,我们之前已经讨论过了。

  3. 按照同样的原则实现删除和更新猫的功能。

摘要

本章汇总了我们关于 Kotlin 设计模式和习惯用法所学的所有内容,以生成一个可扩展的微服务。而且,多亏了 Vert.x,它也是反应式的,这使得它具有极高的可扩展性。它还进行了测试,正如任何现实世界的应用程序应该的那样。

在我们的应用程序中,类是根据领域而不是层来划分的,这与通常的 MVC 架构相反。Vert.x 中的最小工作单元被称为 verticle,verticles 通过 EventBus 进行通信。

我们的 API 遵循了所有 REST 的最佳实践:使用 HTTP 动词和有意义的路径来访问资源,以及消费和生成 JSON。

你可以将同样的原则应用到你要编写的任何其他实际应用中,我们确实希望你会选择 Vert.x 和 Kotlin 来实现这一点。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐