多线程编程练习

上周,我们使用Actor模型解决了字数统计问题:对象在不同的​​线程上运行并通过消息进行通信。 本周,我们将删除对象,并使用线程之间共享的数据结构:这种共享结构在本书中称为数据空间

这是《编程风格练习》焦点系列的 17 帖子。其他帖子包括:

  1. 以编程风格介绍练习
  2. 以编程风格进行练习,将内容堆叠起来
  3. 编程风格的练习,Kwisatz Haderach风格
  4. 编程风格的练习,递归
  5. 具有高阶功能的编程风格的练习
  6. 以编程风格进行练习
  7. 以编程风格进行练习,回到面向对象的编程
  8. 编程风格的练习:地图也是对象
  9. 编程风格的练习:事件驱动的编程
  10. 编程风格的练习和事件总线
  11. 反思编程风格的练习
  12. 面向方面的编程风格的练习
  13. 编程风格的练习:FP&I / O
  14. 关系数据库风格的练习
  15. 编程风格的练习:电子表格
  16. 并发编程风格的练习
  17. 编程风格的练习:在线程之间共享数据 (本文)
  18. 使用Hazelcast以编程风格进行练习
  19. MapReduce风格的练习
  20. 编程风格的练习总结

建模数据空间

原始的Python代码使用两个专用的数据空间:

  1. 要存储从源文件读取的单词,
  2. 存储单词频率

Python代码使用队列对数据空间进行建模 。 因此,在Kotlin中进行相同的操作很有意义。 但是,Java API提供了许多选择,如下面的简化图所示:

Java队列类图

让我们描述一下队列和阻塞队列。

队列

在Java中, Queue接口没有关于元素顺序的明确语义:它可以是FIFOLIFO或完全不同的东西,例如基于优先级属性。

队列提供两种不同的添加,检查和删除元素的方式:一种在操作失败时引发异常,另一种返回特定值。 例如,如果队列为空,则删除元素可能会失败。

特征 例外 特殊价值

Checks an element

element(): E

peek(): E

Adds an element

add(e: E): boolean

offer(e: E): boolean

Removes an element

remove(e: E): boolean

poll(e: E): boolean

阻塞队列

阻塞队列为:

队列还支持以下操作:在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用。
— JavaDoc
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

它添加了两种不同的方法来实现上述操作:一种阻塞并一种超时。

特征 封锁 超时

Adds

put(e: E): boolean

offer(e: E, long timeout, TimeUnit unit): boolean

Removes

take(): E

poll(long timeout, TimeUnit unit): E

有两种现成的阻止感兴趣队列的实现:

  1. ArrayBlockingQueue由简单数组支持
  2. LinkedBlockingQueue使用链接在一起的节点

移植Python代码

原始Python代码的直接移植依赖于相同的设计:可变的数据结构和超时。

主要功能的实现如下:

funrun(filename:String):Map<String,Int>{
    valfreqSpace=LinkedBlockingQueue<Map<String,Int>>()
    valwordSpace=read(filename)
        .flatMap{it.toLowerCase().split("\\W|_".toRegex())}
        .filter{it.isNotBlank()&&it.length>=2}
        .toBlockingQueue()
    valcount=4
    valexecutorService=Executors.newFixedThreadPool(count)
    valcallables=IntRange(1,4).map{_->
        {processWords(wordSpace,freqSpace)} (1)
    }.map{Executors.callable(it)} (2)
    executorService.invokeAll(callables) (3)
    valfrequencies=mutableMapOf<String,Int>() (4)
    while(freqSpace.isNotEmpty()){ (5)
        valpartial=freqSpace.poll(1,TimeUnit.SECONDS) (5)  (6)
        partial?.entries?.forEach{ (5)
            frequencies.merge(it.key,it.value){ (5)
              count,value->count+value (5)
            }
        }
    }
    returnfrequencies
        .toList()
        .sortedByDescending{it.second}
        .take(25)
        .toMap()
}

fun<E>List<E>.toBlockingQueue()=LinkedBlockingDeque<E>(this)
  1. processWord()函数包装在lambda中
  2. 变换Callable兼容拉姆达到Runnable
  3. 开始运行线程,等待最后一个线程完成
  4. 可变图以存储最终结果
  5. 清空队列并在地图中收集结果
  6. 使用超时功能从队列中删除元素
funprocessWords(words:BlockingQueue<String>,
                 frequencies:BlockingQueue<Map<String,Int>>){
  valstopWords=read("stop_words.txt")
    .flatMap{it.split(",")}
  valwordFreq=mutableMapOf<String,Int>()
  while(words.isNotEmpty()){ (1)
    valword=words.poll(1,TimeUnit.SECONDS) (1)  (2)
    if(word!=null&&!stopWords.contains(word)) (1)
      wordFreq.merge(word,1){ (1)
        count,value->count+value (1)
      }
  }
  frequencies.put(wordFreq)
}
  1. 如上所述,清空队列并将结果收集在地图中
  2. 同样,使用超时删除元素

引入并发哈希图

上面的代码,就像原始的Python解决方案一样,有一个问题:它使用队列来存储部分单词频率的映射。 然后,需要将这些部分结果组合在一起以获得最终的词频。 为什么不计算每个线程中的计数?

我们需要一个线程安全的映射:为此,Java API提供了并发的哈希映射。 它提供或覆盖线程安全的方法。

Java并发映射类图

使用并发哈希映射,可以将processWords()函数更新为:

funprocessWords(words:BlockingQueue<String>,
                 frequencies:ConcurrentMap<String,Int>){
  valstopWords=read("stop_words.txt")
    .flatMap{it.split(",")}
  while(words.isNotEmpty()){
    valword=words.poll(1,TimeUnit.SECONDS)
    if(word!=null&&!stopWords.contains(word))
      frequencies.merge(word,1){
        count,value->count+value
      }
  }
}

由于现在在以前的函数中合并了频率,因此可以简化run()函数:

valexecutorService=Executors.newFixedThreadPool(count)
valcallables=IntRange(1,4).map{_->
  {processWords(wordSpace,freqSpace)}
}.map{Executors.callable(it)}
executorService.invokeAll(callables)
returnfreqSpace
  .toList()
  .sortedByDescending{it.second}
  .take(25)
  .toMap()

无股收益

在线程之间共享数据需要同步线程访问。 反过来,这需要锁定并影响性能。 并发专家现在这样:避免并发问题的最佳方法是尽可能避免使用共享数据。 在我们的例子中,没有充分理由在所有线程之间共享空间一词。

为了避免锁定,我们可以将单词分成列表,然后将列表发送到每个线程以分别处理:

valcount=4
valcallables=words.chunked(words.size/count) (1)
  .map{Runnable{processWords(it,freqSpace)}} (2)
  .map{Executors.callable(it)} (3)
valexecutorService=Executors.newFixedThreadPool(count) (3)
executorService.invokeAll(callables) (3)
  1. 在整个单词列表中创建几乎相等大小的块
  2. 将每个块映射到Runnable 。 请注意,将lambda明确分配给Runnable并非绝对必要,它可以提高可读性
  3. 其余代码与以前一样

结论

并发代码很难编写,难以推理,并且更容易发生错误。 Java提供了线程安全的专用数据结构,以减轻负担,甚至减轻一点负担。

最后,完全避免这些问题的最佳方法是完全避免共享数据。 确保尽可能将其作为第一选择。

这篇文章的完整源代码可以在Github上找到。

翻译自: https://blog.frankel.ch/exercises-programming-style/17/

多线程编程练习

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐