编程风格的练习:在线程之间共享数据
上周,我们使用Actor模型解决了字数统计问题:对象在不同的线程上运行并通过消息进行通信。 本周,我们将删除对象,并使用线程之间共享的数据结构:这种共享结构在本书中称为数据空间 。
这是《编程风格练习》重点系列的第 17 个帖子。其他帖子包括:
- 以编程风格介绍练习
- 以编程风格进行练习,将内容堆叠起来
- 编程风格的练习,Kwisatz Haderach风格
- 编程风格的练习,递归
- 具有高阶功能的编程风格的练习
- 以编程风格进行练习
- 以编程风格进行练习,回到面向对象的编程
- 编程风格的练习:地图也是对象
- 编程风格的练习:事件驱动的编程
- 编程风格的练习和事件总线
- 反思编程风格的练习
- 面向方面的编程风格的练习
- 编程风格的练习:FP&I / O
- 关系数据库风格的练习
- 编程风格的练习:电子表格
- 并发编程风格的练习
- 编程风格的练习:在线程之间共享数据 (本文)
- 使用Hazelcast以编程风格进行练习
- MapReduce样式的练习
- 编程风格的练习总结
建模数据空间
原始的Python代码使用两个专用的数据空间:
- 要存储从源文件读取的单词,
- 存储单词频率
Python代码使用队列来建模数据空间 。 因此,在Kotlin中进行相同的操作很有意义。 但是,Java API提供了许多选择,如下面的简化图所示:
让我们描述一下队列和阻塞队列。
队列
在Java中, Queue接口没有关于元素顺序的明确语义:它可以是FIFO , LIFO或完全不同的东西,例如基于优先级属性。
队列提供了两种不同的添加,检查和删除元素的方式:一种在操作失败时引发异常,另一种返回特定值。 例如,如果队列为空,则删除元素可能会失败。
| 特征 | 例外 | 特殊价值 |
|---|---|---|
|
Checks an element |
|
|
|
Adds an element |
|
|
|
Removes an element |
|
|
阻塞队列
阻塞队列为:
队列还支持以下操作:在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用。
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
它添加了两种不同的方法来实现上述操作:一种阻塞并一种超时。
| 特征 | 封锁 | 超时 |
|---|---|---|
|
Adds |
|
|
|
Removes |
|
|
有两种现成的阻止感兴趣队列的实现:
ArrayBlockingQueue由简单数组支持LinkedBlockingQueue使用链接在一起的节点
移植Python代码
原始Python代码的直接移植依赖于相同的设计:可变的数据结构和超时。
主要功能的实现如下:
funrun(filename:String):Map<String,Int>{
valfreqSpace=LinkedBlockingQueue<Map<String,Int>>()
valwordSpace=read(filename)
.flatMap{it.toLowerCase().split("\\W|_".toRegex())}
.filter{it.isNotBlank()&&it.length>=2}
.toBlockingQueue()
valcount=4
valexecutorService=Executors.newFixedThreadPool(count)
valcallables=IntRange(1,4).map{_->
{processWords(wordSpace,freqSpace)} (1)
}.map{Executors.callable(it)} (2)
executorService.invokeAll(callables) (3)
valfrequencies=mutableMapOf<String,Int>() (4)
while(freqSpace.isNotEmpty()){ (5)
valpartial=freqSpace.poll(1,TimeUnit.SECONDS) (5) (6)
partial?.entries?.forEach{ (5)
frequencies.merge(it.key,it.value){ (5)
count,value->count+value (5)
}
}
}
returnfrequencies
.toList()
.sortedByDescending{it.second}
.take(25)
.toMap()
}
fun<E>List<E>.toBlockingQueue()=LinkedBlockingDeque<E>(this)
- 将
processWord()函数包装在lambda中 - 变换
Callable兼容拉姆达到Runnable - 开始运行线程,等待最后一个线程完成
- 可变图以存储最终结果
- 清空队列并在地图中收集结果
- 使用超时机制从队列中删除元素
funprocessWords(words:BlockingQueue<String>,
frequencies:BlockingQueue<Map<String,Int>>){
valstopWords=read("stop_words.txt")
.flatMap{it.split(",")}
valwordFreq=mutableMapOf<String,Int>()
while(words.isNotEmpty()){ (1)
valword=words.poll(1,TimeUnit.SECONDS) (1) (2)
if(word!=null&&!stopWords.contains(word)) (1)
wordFreq.merge(word,1){ (1)
count,value->count+value (1)
}
}
frequencies.put(wordFreq)
}
- 如上所述,清空队列并将结果收集在地图中
- 同样,使用超时删除元素
引入并发哈希图
上面的代码,就像原始的Python解决方案一样,存在一个问题:它使用队列来存储部分单词频率的映射。 然后,需要将这些部分结果组合在一起以获得最终的词频。 为什么不计算每个线程中的计数?
我们需要一个线程安全的映射:为此,Java API提供了并发的哈希映射。 它提供或覆盖线程安全的方法。
使用并发哈希映射,可以将processWords()函数更新为:
funprocessWords(words:BlockingQueue<String>,
frequencies:ConcurrentMap<String,Int>){
valstopWords=read("stop_words.txt")
.flatMap{it.split(",")}
while(words.isNotEmpty()){
valword=words.poll(1,TimeUnit.SECONDS)
if(word!=null&&!stopWords.contains(word))
frequencies.merge(word,1){
count,value->count+value
}
}
}
由于现在将频率合并到上一个函数中,因此可以简化run()函数:
valexecutorService=Executors.newFixedThreadPool(count)
valcallables=IntRange(1,4).map{_->
{processWords(wordSpace,freqSpace)}
}.map{Executors.callable(it)}
executorService.invokeAll(callables)
returnfreqSpace
.toList()
.sortedByDescending{it.second}
.take(25)
.toMap()
无股收益
在线程之间共享数据需要同步线程访问。 反过来,这需要锁定并影响性能。 并发专家现在这样:避免并发问题的最佳方法是尽可能避免使用共享数据。 在我们的例子中,没有充分理由在所有线程之间共享空间一词。
为了避免锁定,我们可以将单词分成列表,然后将列表发送到每个线程以分别处理:
valcount=4
valcallables=words.chunked(words.size/count) (1)
.map{Runnable{processWords(it,freqSpace)}} (2)
.map{Executors.callable(it)} (3)
valexecutorService=Executors.newFixedThreadPool(count) (3)
executorService.invokeAll(callables) (3)
- 在整个单词列表中创建大小几乎相等的块
- 将每个块映射到一个
Runnable。 请注意,将lambda明确分配给Runnable并非绝对必要,它可以提高可读性 - 其余代码与以前一样
结论
并发代码难以编写,难以推理,并且更容易发生错误。 Java提供了线程安全的专用数据结构,以减轻负担,甚至减轻一点负担。
最后,完全避免这些问题的最佳方法是完全避免共享数据。 确保尽可能将其作为第一选择。
翻译自: https://blog.frankel.ch/exercises-programming-style/17/
更多推荐

所有评论(0)