javase学习流程
When I heard for the first time about flow I wasn’t excited at all. I remembered how it had have been learning RxJava, the hours I spent figuring out operators, observables, subjects and until now I can’t say know how to do everything using RxJava.
当我第一次听到有关流量的信息时,我一点都不兴奋。 我记得它是如何学习RxJava的,我花了很多时间弄清楚运算符,可观察对象,主题,直到现在我还不能说知道如何使用RxJava做所有事情。
But did I need to learn another reactive streams framework? Yes, I did.
但是我是否需要学习另一个React流框架? 是的,我做到了。
Whoever has had worked with a Java library in Kotlin can tell that dealing with it is pretty straightforward, but some small issues make it annoying, and for RxJava is the type inference.
曾经在Kotlin中使用过Java库的人都可以说,处理它非常简单,但是一些小问题使它变得烦人,对于RxJava来说是类型推断。
There is this wrapper, RxKotlin, that adds extension functions to specify the types explicitly so the compiler can translate the correct version into the generics used in Java.
有一个包装器RxKotlin,它添加了扩展功能来显式指定类型,以便编译器可以将正确的版本转换为Java中使用的泛型。
RxJava is complicated even though the reactive stream definition is simple.The basic definition of a data stream is a producer that emits data over time and a consumer that observes that data. There are two additions to the pattern: 1) A way to communicate no more values in the stream and2) That there has been an error in the stream.
即使React式流定义很简单,RxJava也很复杂。数据流的基本定义是随着时间推移发出数据的生产者和观察该数据的使用者。 该模式有两个附加功能:1)一种在流中不再传递任何值的方法,以及2)流中存在错误。
The real problem I see in RxJava is the steep curve to learn it and the root cause of it is related to the asynchronous and synchronous operators plus, they have added multiple observers to optimize the framework.We need to know two versions of every operator like map and flatMap also a version for each combination between different observers: flatMapSingle or flatMapMaybe if we want to stream the result into another observer.
我在RxJava中看到的真正问题是学习它的难度很大,其根本原因与异步和同步运算符有关,另外,他们添加了多个观察者来优化框架。我们需要了解每个运算符的两个版本,例如map和flatMap也是不同观察者之间每种组合的版本: flatMapSingle或flatMapMaybe如果我们想将结果流式传输到另一个观察者中。
But let’s get started, I’m going to use the code of an app that I made to learn flow.
但是,让我们开始吧,我将使用为学习流程而编写的应用程序代码。
演示应用 (Demo App)
It’s a Pomodoro Technique app, it observe the database and time to show a clock ticking also, it turn on/off the Do Not Disturb mode when you enter to or exit from a pomodoro.
这是一个Pomodoro Technique应用程序,它观察数据库和时间以显示时钟滴答,当您进入或退出Pomodoro时,它会打开/关闭“请勿打扰”模式。
流量⇿可观察 (Flow ⇿ Observable)
In this example the ObserveState use case have two sources of data, in one side we have the TimeApi and, in the other one the TransitionApi.
在此示例中, ObserveState用例有两个数据源,一方面我们有TimeApi ,另一方面我们有TransitionApi 。
The TransitionApi emits a value every time the state changes. In order to do that StateLogDao persist the state in a database and emit a value of any change. In the TransitionApi implementation (RoomTransitionApi) we have to filter values that do not represent a state change, this is because sqlite triggers are executed with any change in the database even if they are not related to the query result set we are observing.
每次状态更改时, TransitionApi发出一个值。 为此, StateLogDao将状态保留在数据库中并发出任何更改的值。 在TransitionApi实现( RoomTransitionApi )中,我们必须过滤不代表状态更改的值,这是因为sqlite触发器会在数据库中发生任何更改的情况下执行,即使它们与我们正在观察的查询结果集无关。
override fun observeTransitionLog(): Flow<Transition> = stateLogDao
.observeLast()
.map { it?.toTransition() }
.filterNotNull()
filterNotNullsimple operator on a flow that filters out anynullvalue
filterNotNull简单运算符,用于过滤掉任何null值的流
The TimeApi emits the current time every second, the implementation is based on Kotlin Ticker Channel and the Java time api.
TimeApi发出当前时间,该实现基于Kotlin Ticker Channel和Java time api。
@Provides
fun provideTimeApi() = object : TimeApi {
override fun now() = Instant.now().epochSecond.toInt()
override fun ticker(): Flow<Int> = kotlinx.coroutines.channels
.ticker(1_000)
.receiveAsFlow()
.map { now() }
}
The ObserveState use case combines the two emissions into a single emission that calculates the state and how long that the system has been in that state.
ObserveState用例将这两种排放合并为一个排放,用于计算状态以及系统处于该状态的时间。
operator fun invoke(): Flow<State> = timeApi
.ticker()
.combine(transitionApi.observeTransitionLog()) { now, transition ->
transition.toState(now)
}
.distinctUntilChanged()
.onEach {
try {
if (it is State.Pomodoro) turnOnZenMode() else turnOffZenMode()
} catch (e: Exception) {
errorChannel.send(e)
}
}
.onEach {
if (it is State.Pomodoro || it is State.Break) it.doOnTimeUp {
nextState(Action.COMPLETE)
}
}
combineit’s likecombineLatestfrom Rxjava, it executes the lambda with the two last emissions of the sources.
combine就像combineLatest的CombineLatest一样,它使用源的最后两个发射执行lambda。
distinctUntilChangedkeeps in memory a reference to the previous emission and if the new one is equal to the previous one, it gets ignored.
distinctUntilChanged在内存中保存到以前发射的引用,如果新的一个等于前一个,它就会被忽略。
onEachit’s the side effect on the stream. LikeonNextof RxJava it does not modify the stream it rather executes a lambda.
onEach是流上的副作用。 像onNext的onNext一样,它不会修改流,而是执行lambda。
We finally have to collect this flow, we can consume it based on the UI state with an operator called asLiveData.
最后,我们必须收集此流,我们可以使用名为asLiveData的运算符根据UI状态使用它。
val viewState: LiveData<ClockViewState> = observeState()
.map { it.toViewState() }
.flowOn(defaultDispatcher)
.catch { errorChannel.send(it) }
.asLiveData(viewModelScope.coroutineContext)
asLiveDataIt subscribes to the flow based on the life cycle of its observer and keeps the subscription active when there is a configuration change.
asLiveData它根据其观察者的生命周期订阅流,并在配置发生更改时使订阅保持活动状态。
暂停功能⇿单一,可能和可完成 (Suspend functions ⇿ Single, Maybe & Completable)
Single, Maybe and Completable were created to optimize the framework but in flow we don’t need that optimization since the lambdas we use are suspend functions and the compiler knows that it only needs to continue the execution after the suspend function finishes.
创建Single,Maybe和Completable是为了优化框架,但在流程中我们不需要那种优化,因为我们使用的lambda是暂停函数,并且编译器知道只需要在暂停函数完成后继续执行即可。
单 (Single)
When we are calculating the next state of the machine we always return it or we throw an exception.
当我们计算机器的下一个状态时,我们总是将其返回,否则会抛出异常。
suspend operator fun invoke(action: Action): State {
val currentState = transitionApi
.getLastTransition()
?.state
?: throw IllegalNullStateException
val nextState = when {
currentState == State.IDLE && action == Action.START -> State.POMODORO
currentState == State.POMODORO && action == Action.STOP -> State.IDLE
currentState == State.POMODORO && action == Action.COMPLETE -> State.DONE
currentState == State.DONE && action == Action.TAKE -> State.BREAK
currentState == State.BREAK && action == Action.START -> State.POMODORO
currentState == State.BREAK && action == Action.COMPLETE -> State.IDLE
else -> throw IllegalActionException(currentState, action)
}
transitionApi.newTransition(timeApi.now(), nextState)
return nextState
}
suspend fun aFunction(): Intany suspend function that always returns anIntor any not null value it’s aSingle.
suspend fun aFunction(): Int任何暂停的功能,它总是返回的Int或任何不为空值,它是一个Single。
也许 (Maybe)
We are using a Maybe when starting the app and we don’t know if the database is empty so, we query for the last transition and if it is null we insert the initial transition.
我们在启动应用程序时正在使用Maybe,并且我们不知道数据库是否为空,因此,我们查询最后的转换,如果它为null,则插入初始转换。
suspend operator fun invoke(): State? = when {
transitionApi.getLastTransition() == null -> State.IDLE.also {
transitionApi.newTransition(timeApi.now(), it)
}
else -> null
}
suspend fun aFunction(): Int?any suspend function that returns anIntornullit’s aMaybe.
suspend fun aFunction(): Int?任何返回Int或null暂停函数都是Maybe。
可完成 (Completable)
We are using a completable when inserting a new transition in the database, we execute a suspend function that does not return anything.
在数据库中插入新过渡时,我们使用了completable,我们执行了不返回任何内容的suspend函数。
override suspend fun newTransition(timestamp: Int, state: State) {
stateLogDao.insert(StateLog(0L, timestamp, state.name))
}
suspend fun aFunction()any suspend function that doesn’t return anything it’s aCompletable.
suspend fun aFunction()任何暂停不返回任何东西它是一个功能Completable。
频道⇿主题 (Channels ⇿ Subjects)
Channels are like RxJava Subjects, they are something where you can write in one end and read from the other. The reading operation can be consume as a flow and there several kind of channels. We are going to talk about about of ConflatedBroadcastChannel is a non blocking broadcast channel that only delivers the latest value to it’s current subscribers. Late ones won’t receive previous values.
渠道就像RxJava主题一样,您可以在其中一方面编写内容,另一方面可以进行阅读。 读取操作可以作为流消耗,并且存在多种通道。 我们将要讨论的ConflatedBroadcastChannel是一个无阻塞广播频道,仅向其当前订户提供最新的价值。 迟到的将不会收到以前的值。
The ObserveState use case depends on ZenModeApi to turn on and off the Do Not Disturb mode but if the user hasn’t granted permissions to the app, it will throw an exception terminating the flow.
ObserveState用例取决于ZenModeApi来打开和关闭“请勿打扰”模式,但是如果用户尚未授予该应用程序许可,则它将引发异常以终止流程。
In order to not terminate the flow and handle the exception gracefully asking the user for permissions we try-catch the turning on/off and post the exception to the injected ConflatedBroadcastChannel, and continue with the main responsibility of the use case, observe the state.
为了不终止流程并优雅地向用户询问权限,我们try-catch打开/关闭并将异常发布到注入的ConflatedBroadcastChannel ,并继续用例的主要职责,观察状态。
operator fun invoke(): Flow<State> = timeApi
.ticker()
.combine(transitionApi.observeTransitionLog()) { now, transition ->
transition.toState(now)
}
.distinctUntilChanged()
.onEach {
try {
if (it is State.Pomodoro) turnOnZenMode() else turnOffZenMode()
} catch (e: Exception) {
errorChannel.send(e)
}
}
.onEach {
if (it is State.Pomodoro || it is State.Break) it.doOnTimeUp {
nextState(Action.COMPLETE)
}
}
In the ClockViewModel we re-emit all the errors in the channel but if any of those errors is ZendModeApi.AccessDeniedException we catch it and post a different message in a navigation channel to ask for permissions.
在ClockViewModel我们重新发射该通道中的所有错误,但是如果其中任何一个错误是ZendModeApi.AccessDeniedException我们将捕获该错误并在导航通道中发布另一条消息以请求权限。
val errors = errorChannel.asFlow().handleErrors()
private fun Flow<Throwable>.handleErrors(): Flow<Throwable> = flow {
distinctUntilChanged().collect {
if (it is ZenModeApi.AccessDeniedException) {
navigationChannel.send(ClockNavDirection.AskDndPermission)
} else {
emit(it)
}
}
}
测试中 (Testing)
Testing with flow at first glance may not seem as straightforward as in RxJava because we do not have these robust tools, such as TestObserver, TestSubscriber, and TestScheduler but very recently SQLDelight’s FlowAssert has been extracted out into its own library called Turbine. It’s a small testing library for flow for which you would need Kotlin 1.4.0.
乍看之下,使用流进行测试似乎并不像RxJava中那样简单,因为我们没有这些强大的工具,例如TestObserver , TestSubscriber和TestScheduler但是最近SQLDelight的FlowAssert被提取到了自己的名为Turbine的库中。 这是一个用于流程的小型测试库,您将需要Kotlin 1.4.0。
调度器 (Schedulers)
A good practice in RxJava was to inject the scheduler that we wanted to observe the stream. In flow it’s a good practice too. In the ClockViewModel we insert the CoroutineDispatcher where we want to collect the values of the flow.
RxJava中的一个好习惯是注入我们要观察流的调度程序。 在流程中,这也是一个好习惯。 在ClockViewModel我们插入CoroutineDispatcher我们要收集的流量值。
class ClockViewModel @Inject constructor(
observeState: ObserveState,
private val nextState: NextState,
private val errorChannel: BroadcastChannel<Throwable>,
private val defaultDispatcher: CoroutineDispatcher
) : ViewModel() {
val viewState: LiveData<ClockViewState> = observeState()
.map { it.toViewState() }
.flowOn(defaultDispatcher)
.catch { errorChannel.send(it) }
.asLiveData(viewModelScope.coroutineContext)
}
It would enable us to write a test where we can send the test scheduler instead of hard coding one.
这将使我们能够编写一个测试,在这里可以发送测试调度程序,而不是对其进行硬编码。
@Test
fun `60 seconds clock`() = coroutineRule.testDispatcher.runBlockingTest {
whenever(observeState.invoke()).thenReturn(flowOf(State.Pomodoro(60)))
viewModel = ClockViewModel(
observeState = observeState,
nextState = nextState,
errorChannel = errorChannel,
defaultDispatcher = coroutineRule.testDispatcher
)
viewModel.viewState.observeForTesting {
assertThat(viewModel.viewState.value?.clock).isEqualTo("1:00")
}
}
I’m using runBlockingTest, an function of TestCoroutineDispatcher that is a fake implementation of a coroutine dispatcher that let you control the execution of coroutines when doing tests.
我使用runBlockingTest ,的功能TestCoroutineDispatcher那是假的实施协程调度,让做测试时你控制的协同程序的执行。
流值 (Stream Values)
A good technique to test streams is to check all the events emitted by the source under test. In RxJava we had TestObserver that would let us assert the emitted values. With Turbine we can consume the emitted values and use any assertion library. I used AssertJ.
测试流的一种好方法是检查被测源发出的所有事件。 在RxJava中,我们有TestObserver ,可以让我们断言发射的值。 使用Turbine,我们可以使用发出的值并使用任何断言库。 我使用了AssertJ。
@Test
fun `Idle State`() = runBlockingTest {
whenever(transitionApi.observeTransitionLog()).thenReturn(flowOf(Transition(State.IDLE, 0)))
val observeStateFlow = observeState()
observeStateFlow.test {
assertThat(expectItem()).isEqualTo(ObserveState.State.Idle)
expectComplete()
}
}
副作用 (Side effects)
If your are not testing the stream but rather the side effects you can collect all the elements but ignore them.
如果您不是在测试流,而是在测试副作用,则可以收集所有元素,而忽略它们。
@Test
fun `Turn on DND when Pomodoro starts`() = runBlockingTest {
zenModeApi.mode = ZenMode.Off
whenever(transitionApi.observeTransitionLog()).thenReturn(
flowOf(
Transition(State.IDLE, 0),
Transition(State.POMODORO, 0),
Transition(State.DONE, 0)
)
)
observeState().test { cancelAndIgnoreRemainingEvents() }
verify(zenModeApi).mode = ZenMode.AlarmsOnly
assertThat(zenModeApi.mode).isEqualTo(ZenMode.Off)
}
差异性 (Differences)
We already cover what’s similar but let’s talk about what’s different.
我们已经介绍了相似之处,但让我们谈谈不同之处。
Synchronous and Asynchronous. It’s the same in flow, if we have a lambda marked as suspend function but that function does not call any suspend function. It becomes a regular function
同步和异步。 如果我们有一个lambda标记为suspend函数,但该函数不调用任何suspend函数,则流程是相同的。 变成常规功能
collect is always executed in the scope it was called. In RxJava we have an operator called subscribeOn that controls where the subscription happens, since flow can only be collected inside a coroutine scope we do not have that operator we have the coroutine builders like launch
collect总是在它被调用的范围内执行。 在RxJava中,我们有一个名为subscribeOn的运算符,用于控制订阅的发生位置,因为流只能在协程范围内收集,所以我们没有该运算符,因此我们有像启动这样的协程生成器
Operators can’t modify the scope of the downstream. In RxJava we have observeOn operator that changes the downstream operators, in kotlin we can not do that instead we have coroutines scopes
操作员无法修改下游范围。 在RxJava中,我们有observeOn运算符可以更改下游运算符,在kotlin中,我们不能这样做,而是拥有协程范围
Disposables can live anywhere in the app, jobs can only live inside of a scope. This is a feature where we cannot forget to clear a composite disposable, we always have this structured coroutine scope.
一次性物品可以放在应用程序中的任何位置,而作业只能放在范围内。 这是我们不能忘记清除一次性复合材料的功能,我们始终具有这种结构化的协程范围。
结论 (Conclusion)
If you are learning reactive programing for the first time, don’t bother learning RxJava it would take a lot of time and effort. Learning flow would give you the same tools to write reactive code but in a simpler and Kotlin idiomatic way.
如果您是第一次学习React式编程,请不要花很多时间和精力去学习RxJava。 学习流程将为您提供编写React式代码的相同工具,但采用的是一种更简单且Kotlin惯用的方式。
flow won’t solve all the problems but with a good combination of architecture, design patterns and testing you would be able to scale any app over time.
流程无法解决所有问题,但只要将架构,设计模式和测试完美结合,您就可以随时间扩展任何应用程序。
翻译自: https://medium.com/@gianpamx/how-i-learned-flow-7f5e2d102dfc
javase学习流程



所有评论(0)