“最近好像没啥热点,还是说太忙了没空摸鱼看新闻了?人大又要召开了,真心希望这一届的委员们能够提一些靠谱的提案,也不枉我上周网购的商品这周还没到北京了···
前一节(Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow)介绍完了两种热流的构造方法以及它们的特点,那有没有方法可以将冷流转化为热流呢?当然是有的。那为什么需要将冷流转化为热流呢?
假如有这么一个场景:一开始有一个冷流 coldFlow 和它对应的消费者,后来下游又有几个新来的消费者需要使用这个 coldFlow,并且还需要之前已发送过的数据。而冷流的生产者与消费者是一对一的关系,且没有 replay
缓存机制,为新的消费者再创建一个冷流开销较大,这种情况下将冷流转为热流就显得事半功倍了。
Flow 中的 shareIn
操作符就可以将冷流转为热流,它的方法声明是:
// code 1
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
首先看返回值,最终确实会转化为一个热流 SharedFlow 实例。方法参数先来看最简单的 replay
参数,就是设置回播到每个新增消费者的数据个数,默认为 0。所以默认情况下,新增的消费者只能收到从它开始收集的时间点之后,生产者发送的数据。
再来看第一个 scope
参数,用于设置一个 CoroutineScope 作用域,注意其生命周期的长度需要比任何消费者都要长,保证被转化成的热流能在所有消费者收集数据进行消费时,都能处于活跃状态。新被转化的热流其实就是一个共享数据流,可以被所有的消费者共享使用。
第二个参数 started
复杂一些,它是用于设置被转化为共享数据流的启动方式,官方提供有 3 种方式,下面一个个说:
SharingStarted.Eagerly
勤快式启动方式。不等第一个消费者出现就会立即启动,需要注意的是,这种方式只会保留启动时数据流发送的前 replay
个数据,再之前的数据会立即丢弃。即不对数据流缓存区以外的数据负责,所以 replay
缓存区大小设置很重要。
SharingStarted.Lazily
懒汉式启动方式。需要等第一个消费者出现才会启动,第一个消费者可以接收到数据流所有发送的数据;但其他后面的消费者只能接收到最近的 replay
个数据。这种方式启动的数据流会一直保持活跃状态,甚至所有的的消费者都退出观察不再接收了,数据流仍然会缓存最近的 replay
个数据。
SharingStarted.WhileSubscribed()
灵活式启动方式。默认情况下就是有消费者来它就立即启动,没消费者接收了它就立即停止。所以在第一个消费者出现数据流就启动,当最后一个消费者退出它就立即停止,但它仍会永久缓存最近的 replay
个数据。此外,这种启动方式还可以根据需求自定义设置参数:
// code 2
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
stopTimeoutMillis:设置最后一个消费者退出后,多长时间后再关闭数据流。默认是 0,即立即关闭。replayExpirationMillis:设置关闭流之后等待多长时间后,再重置清空缓存区 replay cache
的数据。默认是 Long.MAX_VALUE
,即永远保存。
自定义 SharingStarted
其实还可以自定义启动方式,自己实现 SharingStarted
接口即可。如果看了前三种启动方式的源码,不难会发现,其实启动方式都是使用固定的几个 SharingCommand
实现的。SharingCommand
有三种:
// code 3
public enum class SharingCommand {
/**
* 开始启动,并开始收集上游数据流.
* 多次发送这个命令并没有什么用(支持防抖),如果先发送 STOP 再发送 START 则是重启一个上游数据流。
*/
START,
/**
* 停止数据流, 取消上游数据流的收集所在协程。
*/
STOP,
/**
* 停止数据流, 取消上游数据流的收集所在协程。并且将 replayCache 缓冲区的值重置为初始状态。
* 如果是 shareIn 操作符,则会调用 [MutableSharedFlow.resetReplayCache] 方法;
* 如果是 stateIn 操作符,则会将缓冲数据重置为最初设置的初始值.
*/
STOP_AND_RESET_REPLAY_CACHE
}
感兴趣的同学可以看看 SharingStarted.WhileSubscribed()
的具体实现类 StartedWhileSubscribed
里面的源码。如果需要自定义启动方式,照着葫芦画瓢即可。
既然有 shareIn
,那自然就少不了 stateIn
了。
方法声明:
// code 4
public fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
首先可以看出返回值是一个热流 StateFlow 实例,那么自然而然就需要一个参数给它设置一个初始值,即第三个参数 initialValue
。前两个参数与 shareIn
一样,这里就不再赘述。
从上面的介绍可知,这种启动方式可以在没有消费者时自动取消上游数据流,从而避免资源的浪费。但在实际使用中,建议使用 SharingStarted.WhileSubscribed(5000)
,即在最后一个消费者停止后再保持数据流 5 秒钟的活跃状态。避免在某些特定情况下(如配置改变——最常见就是横竖屏切换、暗夜模式切换)重启上游的数据流。
shareIn
和 stateIn
都会创建一个新的数据流,具体说就是 shareIn
会构建一个 ReadonlySharedFlow 实例;stateIn
则会构建一个 ReadonlyStateFlow 实例。而新创建的数据流会一直保存在内存中,直到传入数据流的作用域被取消或者没有任何引用时才会被 GC 回收。
所以下面代码中,前一部分代码是禁止使用的,正确的使用应该是如后一部分的代码,即在属性中使用。
// code 5
//错误示例:每次调用方法都会构建新的数据流
fun getUser(): Flow<User> =
userLocalDataSource.getUser()
.shareIn(externalScope, WhileSubscribed())
//正确示例:在属性中使用 shareIn 或 stateIn
val user: Flow<User> =
userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
这个参数表示的是 MutableSharedFlow 中活跃的消费者数目,即订阅者的个数。可用于监听消费者的数目变更,下面就是一个例子:
// code 6
sharedFlow.subscriptionCount
.map { count -> count > 0 } // count > 0 说明有消费者,返回 true;= 0 说明没有消费者了,返回 false
.distinctUntilChanged() // only react to true<->false changes
.onEach { isActive -> // configure an action
if (isActive) { // do something } else { // do something }
}
.launchIn(scope) // launch it
这个例子可以在有消费者收集数据流时,做一些自己的操作;当所有消费者都停止收集时,再处理另外的一些操作,比如资源回收等。
distinctUntilChanged
操作符比较面生,它就是过滤掉前面接收到的重复值,从而使得后面只会接收到发生了变化的新值,和 StateFlow 特性一样。
onEach
操作符也比较常见,可以在流上新增一些处理操作,再发给下游。
如果在实际使用中,需要得知上游数据流的一些状态,比如开始、完成等,则需要在上游数据流转为热流之前添加一些操作符起到监听的作用。
onStart、onCompletion 操作符监听启动和完成
// code 7
private fun shareInOnStartDemo() {
val testFlow = flow {
println("++++emit before")
emit(4)
delay(1000)
emit(5)
delay(1000)
emit(6)
}.onStart {
emit(-1)
println("++++ onStart")
}.onCompletion {
emit(-100)
println("++++ onCompletion")
}.shareIn(
lifecycleScope,
SharingStarted.WhileSubscribed(),
8
)
lifecycleScope.launch {
testFlow.collect {
println("++++ collector receive $it")
}
}
}
从打印的 log 可以看到,确实可以监听状态。当然也可以在相同的位置添加 catch
操作符用于监听异常的发生,感兴趣的同学可以试试看。
说了这么多 Flow 的东西,最后以一个实际的例子结束这一章节的学习笔记吧!
下面我将用一个应用实例来讲解 StateFlow 的实际应用。这个例子将会用到 debounce
、distinctUnitChanged
、flatMapLatest
等操作符,用这些操作符去实现一个文本输入中实时查询的例子。
假设有个需求,要实现一个浏览器搜索的功能,根据用户不断输入的字符去查询相关的内容。如果不做任何处理,用户对键入的字符串做的任何修改,都会去请求一次接口,那后端服务器肯定是吃不消的;对于用户而言,在不断输入的过程中返回的结果用户并不会很关心,他只会关心最终输入完成之后请求的数据。那么,如何减少后端的接口请求次数是关键所在。
先来看看核心的代码:
// code 8 ViewModel.kt 文件
val queryStateFlow = MutableStateFlow("")
fun getQueryResult(): Flow<String> {
return queryStateFlow
.debounce(300L)
.distinctUntilChanged()
.flatMapLatest {
if (it.isNullOrBlank()) {
flow { emit("") }
} else {
dataFromNetwork(it).catch {
emitAll( flow { emit("") } )
}
}
}
.flowOn(Dispatchers.IO)
}
// 模拟网络请求的耗时操作
private fun dataFromNetwork(query: String): Flow<String> {
return flow {
delay(2000)
emit(query) // 返回请求的结果
}
}
首先可以直观地感受到,使用 Flow 去处理这一逻辑较为简单,代码量较少,这也是 Flow 的魅力所在。我们按顺序介绍一下所使用到的 Flow 操作符:
debounce 操作符 具体的操作符方法声明:
// code 9
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>
用于过滤掉最新的发射值之前 timeoutMillis 时间内发射的值,返回一个过滤后的 Flow。官方栗子非常清楚:
// code 10
flow {
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.debounce(1000)
最终会发射出下面的三个值:
3, 4, 5
发射 1 之后不到 1000ms 又发射了 2,所以 1 就会被过滤掉不会发射了,以此类推。所以最后发射的值是一定可以发射成功的。通过这个操作符,我们就可以有效减少频繁请求接口的问题,这里设置的 timeout 为 300ms,即在用户连续输入过程中每间隔 300ms 才去请求一次数据。
distinctUntilChanged 操作符 具体操作符声明为:
// code 11
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
用于过滤掉重复的发射值。虽然 StateFlow 本身就可过滤掉没有变化的发射值,但在这里还是需要的,因为用户可能会删除刚输入的字符,这一操作符可进一步减少不必要的接口请求。
flatMapLatest 操作符 我看的代码版本这个操作符还是实验性api,后续可能被移除。具体操作符声明为:
// code 12
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R>
这个操作符可以在原流的基础上生成一个新流,当原流依次发出 a、b 两值时,新流都会接收,但如果新流 a 值的相关操作还未结束,则会取消 a 值的相关操作,并用 b 值进行操作。简单说就是,丢弃旧值操作,换用新值操作。下面是一个例子:
// code 13
fun flatMapLatestDemo() {
val testFlow = flow {
emit("a")
delay(100)
emit("b")
}.flatMapLatest {
flow {
emit("receive $it")
delay(200)
emit("send $it")
}
}
lifecycleScope.launch {
testFlow.collect {
println("----$it")
}
}
}
通过打印的 log 可以看出,a,b 都被 flatMapLatest
操作符接收到了,只有 b 最终通过。这是因为 a 先到达,等待了 100ms 后新的值 b 也到了,但 a 还在等待中,这时 flatMapLatest
就会取消掉 a 后续的操作。如果把 delay(200)
改成 delay(50)
,那最终 a,b 都能被打印出来。
所以这个操作符在 code 8 中的作用就是进一步减少接口请求的次数。当输入的新字符串到来时,就会将之前旧字符串还未结束的请求操作取消掉,用新的字符串去请求数据。
ViewModel.kt
的代码终于说完了,其他的代码就比较常规了,直接上码:
// code 14 MainActivity.kt
binding.editText.addTextChangedListener(object : TextWatcher{
override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) { }
override fun onTextChanged(input: CharSequence?, start: Int, before: Int, count: Int) {
viewModel.queryStateFlow.value = input.toString()
}
override fun afterTextChanged(s: Editable?) { }
})
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.getQueryResult()
.collect {
binding.tvText.text = it
}
}
}
有关 Flow 的相关知识就到此结束了,来个简单总结吧~
1)shareIn
和 stateIn
都可将冷流转化为热流,将数据共享给多个消费者,无需为每个消费者创建同一个数据流的新实例。两者通常用于提升性能,在没有消费者时缓存数据;
2)SharingStarted
启动方式有 Eagerly
、Lazily
、WhileSubscribed
三种,最常用的还是 WhileSubscribed
,有消费者就启动,没有就停止,还能设置停止延时时长和缓存过期时长;3)注意 shareIn
、stateIn
都会新建一个 Flow,不要用于方法的返回值,建议赋值给属性;4)shareIn
、stateIn
与 onStart
、onCompletion
等搭配可监听转成的热流的状态;5)distinctUntilChanged
操作符可过滤重复数据,一般用于 SharedFlow;debounce
可用于在某一时间段内防抖;flatMapLatest
操作符可以用最新值替换旧值发送给下游,旧值直接被取消作废。