相关文章推荐
乐观的松球  ·  javascript - Using ...·  1 年前    · 
逆袭的领结  ·  Selenium Python ...·  1 年前    · 

协程里 挂起函数仅可以异步返回 单个值,而 Flow 则可以异步返回多个值,并补全kotlin语言中响应式编程的空白。

推荐阅读: 关于Kotlin中的Collections、Sequence、Channel和Flow

  • Case 1 :比如压缩图片需要执行多个异步任务,完成一个通知一下,一般我们会使用 线程池 + 回调的方式执行 :
  • Iterator<InputStreamProvider> iterator = mStreamProviders.iterator();
    while (iterator.hasNext()) {
      final InputStreamProvider path = iterator.next();
      AsyncTask.SERIAL_EXECUTOR.execute(new Runnable() {
        @Override
        public void run() {
          try {
            File result = compress(context, path);
            mHandler.sendMessage(...);
          } catch (IOException e) {
            mHandler.sendMessage(...);
      iterator.remove();
    // 使用:
    LubanBuilder().load(path)
    .setCompressListener(object : OnCompressListener {
            override fun onSuccess(file: File) {
    }).launch()
    

    而如果你用 Kotlin Flow 一切都变得那么简单明了:

    fun zipImages(paths:List<String>):Flow<Result<File>>{ return paths.map{ path-> flow { emit(compress(context, path)) }.catch{ exception -> emit(Result.Error(exception)) }.merge().flowOn(Dispaters.IO) launch{ zipImages().collect{ result-> when(result){ is Result.Success ->{ is Result.Error ->{
  • case 2 :再或者我们有个回调需要改造成协程,这个回调会多次触发:
  • interface  SimpleInterface {
        fun onReceive(value: Int)
    suspend  fun simpleSuspend(): Int {
        return suspendCoroutine { coroutine ->
     val callback = object : SimpleInterface {
                override  fun onReceive(value: Int) {
                    coroutine.resume(value)
            callback.onReceive(1)
            //再来一次 !
            callback.onReceive(2)
    

    但如果真的resume 多次,协程则会抛异常:

    可以看看协程原理介绍

  • Google 官方也在大力推荐使用Flow
  • 仅仅是能返回多个值就值得如此力荐? 不, 推荐它的原因更多是它丰富的操作符,用Flow能低成本的异步处理数据,下面让我们结合项目实例来看看它有哪些优势。

    首先Flow 分两种:

    //使用 ,注意 collect 是个挂起函数,collect 后面如果有代码 不会立即执行 coroutineScope.launch{ simpleFlow.collect{ value-> println(value)

    推荐使用onEach + launchIn 因为 collect 是挂起函数,后面如果有代码可能不被立即执行。

  • collectcollectIndexedcollectLatesttoListtoSetlastfirstlaunchIn
  • 更多操作符参见 :一眼看全:Kotlin Flow 操作符大全

    一般的flow 是 “冷”的,即 : 不消费 则 不生产,多次消费多次生产

    顺带看下 官方提供的API 的简洁之处 :

    flow 是基于 协程的,因此其 生命周期是和CoroutineScope 挂钩的。

    fun simple(): Flow<Int> = flow { 
        for (i in 1..3) {
            delay(100)          
            println("Emitting $i")
            emit(i)
    fun main() = runBlocking<Unit> {
        withTimeoutOrNull(250) { // 在 250 毫秒后超时
            simple().collect { value -> println(value) } 
        println("Done")
    Emitting 1
    Emitting 2
    

    通过launchIn操作符我们还能拿到 Job ,来自行控制Flow 的取消:

    val job = simple().onEach { value ->
     println(value)
    } .launchIn(this)
    launch {
    delay(250)
        job.cancel()
    Emitting 1
    Emitting 2
    

    一般来说我们不需要关心流的生命周期,在Android上我们通常会使用LifecycleScope 或者 ViewModelScope ,因此在页面关闭时这些Flow 都会被取消。

    为了保证 流的 的透明,flow 构造内禁止 构建 try catch ,可以使用catch操作符来捕获异常

    flow { emit(1) throw Exception("test") }.catch { e-> // 可以继续在catch 里 throw 移除 // 也可以调用 emit 将异常转化为值 发出去 // 也可以只打印日志

    背压 (Back Pressure) ,就是生产速率大于了消费速率。 这个问题得益于 suspend 的魔力,flow 会将生产端挂起 ,同时也有操作符供我们选择:

  • buffer 添加缓冲区
  • listOf(1,2,3,4,5).asFlow().onEach {
            delay(100)
        }.buffer(capacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)
        .collect {
            delay(500)
    

    capacity : 缓冲区容量 默认 64

    onBufferOverflow : 超出缓冲区之后的策略 ,有 挂起,抛弃最新,抛弃最旧 三种策略

  • 还有 conflate 、collectLatest 等操作符,不过都是 buffer的封装
  • 其他操作符

    官方提供了大量简洁好用的操作符,这里结合实际例子来介绍部分操作符简化开发工作的实例:

  • catch
  • retry 失败重试
  • 把上面的操作结合到一起,封装一下(copy from iosched)
  • 之后我们简单网络请求就可以这样写了:

    //定义  usecase
    class PopupUseCase : FlowUseCase<Unit, GetPopupsData>(CommonIOPool) {
        private val service by requestService(PopupApiClient::class.java)
        override fun execute(parameters: Unit): Flow<Result<GetPopupsData>> {
            return service.getPopups().asFlowResult()
    //在ViewModel 中使用
    private val popupUseCase = PopupUseCase()
    popupUseCase(Unit).onSuccess { result ->
        }.onLoading{
        }.onFail{
        }. launchIn(viewModelScope) 
    

    onSuccess,onLoading,onFail是项目中自己的封装。

    fun <T> Flow<Result<T>>.onSuccess(onSuccess: (T) -> Unit): Flow<Result<T>> {
        return this.onEach { result ->
            if (result is Result.Success) {
                onSuccess.invoke(result.data)
    

    更多更全操作符请参阅:一眼看全:Kotlin Flow 操作符大全

    前言之 LiveData

    LiveData 的历史要追溯到 2017 年。彼时,观察者模式有效简化了开发,但诸如 RxJava 一类的库对新手而言有些太过复杂。为此,架构组件团队打造了 LiveData: 一个专用于 Android 的具备自主生命周期感知能力的可观察的数据存储器类。LiveData 被有意简化设计,这使得开发者很容易上手;

    LiveData 对于 Java 开发者、初学者或是一些简单场景而言仍是可行的解决方案。而对于一些其他的场景,更好的选择是使用 Kotlin 数据流 (Kotlin Flow) —— 从 LiveData 迁移到 Kotlin 数据流

    liveData.observe(lifecycleOwner) { value -> textView.text = value liveData.value = 1 liveData.postValue(1)
  • 不支持背压,快速postValue 只能收到最后一次的回调
  • 粘性事件,当配置变更时再次绑定会立即收到上次的值,如果用来处理事件就会有问题
  • 观察只能在主线程
  • 提供的 Transformations.map / switchMap 都是在主线程操作
  • 没有操作符来做复杂转换
  • 和 Android 组件绑定 ,不利于单元测试
  • 上面的冷流是单播 ,即一次消费对应一次生产。而实际开发中也有许多 多播 + 热流的需求,为了应对各种场景 Flow 推出了 SharedFlowStateFlow :

    SharedFlow

    val hotData = MutableSharedFlow<Int>(replay = 1, 
                                        extraBufferCapacity = 64 ,
                                        onBufferOverflow = BufferOverflow.DROP_OLDEST)
    hotData.onEach{ value->
        println("1号观察者 观察到:$value")
    }.launchIn(coroutineScope)
    launch {
    hotData.emit(1) //emit 是个挂起函数
    hotData.onEach{ value->
        println("2号观察者 观察到:$value")
    }.launchIn(coroutineScope)
    launch {
    hotData.emit(2)
    2号观察者 观察到:1
    1号观察者 观察到:1
    1号观察者 观察到:2
    2号观察者 观察到:2
    //如果 replay = 0 
    1号观察者 观察到:1
    1号观察者 观察到:2
    2号观察者 观察到:2
    

    上面说到如果我们用LiveData是“粘性事件”,新订阅者会理解收到之前的值,如我们使用LiveData 控制 Toast ,则会再次弹出。

    LiveData会保证订阅者总能在值变化的时候观察到最新的值,并且每个初次订阅的观察者都会执行一次回调方法。这样的特性对于维持 UI 和数据的一致性没有任何问题,但想要观察LiveData发射一次性的事件就超出了其能力范围

    为了解决这个问题,你可以改造Event :

    open  class Event<out T>(private  val content: T) {
        var hasBeenHandled = false   private  set  // Allow external read but not write   fun getContentIfNotHandled(): T? {
            return  if (hasBeenHandled) {
                null  } else {
                hasBeenHandled = true  content
        }   fun peekContent(): T = content
    

    但这只是最粗暴的解法,这会导致这个Event只能有一个观察者。如果想支持多个观察者还得继续改造。

    而我们可以利用SharedFlow 来做事件回调,无需任何改造:

  • 当 replay = 0 时(默认也为0 ),我们完全可以用SharedFlow来当做事件发送载体,不用担心被重放
  • 需要注意 emittryEmit ,二者差别巨大,一般情况建议用 emit, 背后原理下期分析
  • 项目实战: 点击ViewBinder中的卡片打开子页面
  • private val _openReviewFragmentEvent = MutableSharedFlow<Unit>()
    val openReviewFragmentEvent = _openReviewFragmentEvent.asSharedFlow()
    //观察事件
    viewModel.openReviewFragmentEvent.onEach {
        toggleReviewFragment()
    } .launchWhenResumed(lifecycleScope)
    //发送事件
    viewModel {
        _openReviewFragmentEvent.emit(Unit) 
    

    即使手机配置变更,此处也不会再次回调,是用作事件发送的简单手段。如果你不想事件重复消费,可以使用 channel + flow 的方式处理。

  • 项目实战: 数据缓存池
  • 之前有个文字聊天室的需求,定时轮询拉取聊天消息,每次拉取 20条,缓存池 200 ,满了就丢掉旧数据,然后间隔500ms展示一条数据。当时写了很长的代码,现在使用SharedFlow可以轻松实现 ,甚至进行更多定制:

    //定义消息池
    val messagePool = MutableSharedFlow<Int>(replay = 0 , 
                                        extraBufferCapacity = 200 ,
                                        onBufferOverflow = BufferOverflow.DROP_OLDEST)
    //发送数据
    mesaagePool.emit(message)
    //消费数据
    mesaagePool.onEach{
      delay(500)
    }.launchIn(coroutineScope)
    

    SharedFlow加上LifecycleScope 你甚至可以用SharedFlow 改造成 FlowEventBus : FlowEventBus

    StateFlow

    SharedFlow的一种特殊实现,replay=1,无缓存配置,DROP_OLDEST。 功能和定位与LiveData相似:

  • 允许多个观察者
  • 有只读和可变两种类型
  • replay = 1
  • 但是和LiveData不同的是 :

  • 必须配置初始值
  • value 空安全
  • Flow丰富的异步数据流操作
  • 默认数据防抖(连续相同的值不会回调)
  • 项目实战:

    // viewModel 中定义 flow
    private val _pageState = MutableStateFlow<Result<Unit>>(Result.Loading)
    val pageState: StateFlow<Result<Unit>> = _pageState.asStateFlow()
    // 页面里注册观察
    viewModel.pageState.onSuccess {
    } .launchWhenResumed(lifecycleScope)
    //viewModel 获取数据后设置值
    repository.getResult(...).onStart  {
    _pageState.value = Result.Loading
    } .onSuccess  { result ->
    _pageState.value = Result.Success(Unit)
    } .onFail  { exception ->
    _pageState.value = Result.Error(exception)
    } .launchIn(viewModelScope)
    

    使用起来和LiveData差不多,但结合Flow 丰富的操作符,就能解决更多问题了:

  • 项目实战: 搜索框-联想搜索-debounce
  • val _searchQuery = MutableStateFlow(EMPTY)
    object : NormalTextWatcher() {
        override fun afterTextChanged(text: Editable?) {
            _searchQuery.value = text.toString()
    _searchQuery.filter { it.isNotEmpty() } // 过滤空内容,避免无效网络请求
                .debounce(300) // 300ms防抖
                .flatMapLatest { searchFlow(it.toString()) } //执行搜索并且新搜索覆盖旧搜索
                .flowOn(Dispatchers.IO) // 让搜索在异步线程中执行
                .onEach { updateUi(it) } // 获取搜索结果并更新界面
                .launchIn(mainScope) // 在主线程收集搜索结果// 更新界面fun updateUi(it: List<String>) {}
    

    debounce : 指定时间内的值只接收最新的一个

    SharedFlowStateFlow 怎么选?

  • 在Android 开发中, StateFlow 效果和LiveData等同,用于UI 数据绑定即可
  • SharedFlow 功能更强大,按需使用,一般可以用作事件广播
  •  fun uploadFiles(files: List<File>): Flow<UploadPicResult> {
        return callbackFlow  {
    UploadImageWorker().upload(files.map { file-> UploadPicInfo(file.name, file.absolutePath) } ,
                object : IUploadPicListener {
                    override fun onSingleUploadSuccess(result: UploadPicResult) {
                        trySendBlocking(result)
                    override fun onSingleUploadFailure(result: UploadPicResult?) {
                    override fun onUploadComplete() {
                         close()  //flow 发送结束,关闭通道
            awaitClose {
            //如果回调需要解注册,可以在这里操作
    
  • 项目实战: ViewPager2
  • ViewPager2中不可见的Fragment生命周期是 onPause ,对于LiveData而言onPause仍属于活跃状态,仍会收到事件回调。😱 但是 如果使用 Lifecycle ktx 里提供的 LaunchWhenX系列 搭配 Flow 就没这个问题啦。

    lifecycleScope.launchWhenResumed {
    flow.collect { value ->
         println(value)
    //项目中已经封装了方法,也可以按以下方式调用,少点括号
    flow.onEach{ value ->
        println(value)
    }.launchWhenResumed(lifecycleScope)
    

    因为 flowcollect 是个挂起函数,当被 pause时 就会被挂起,不会收到回调啦。

    但这个只是粗暴的挂起,我们可以使用Lifecycle-ktx 2.4.0 推出的API repeatOnLifecycle

    来进行观察,这个方法会在对应的生命周期 进行重复执行 和 取消,这样可以减少资源的浪费。

    lifecycleScope.launch {
     lifecycle.repeatOnLifecycle(Lifecycle.State.RESUMED) {
            flow.collect {
    //每次都这么写也太麻烦了 ,官方为Flow封装了一个扩展方法
    flow.flowWithLifecycle(lifecycle,Lifecycle.State.RESUMED)
    

    关于这个 API官方还发文介绍下了其背后的故事, ****repeatOnLifecycle API design story

    一言蔽之 :

    launchWhenX 暂停协程的执行,repeatOnLifecycle取消并重新启动新的协程

  • 项目实战:压缩上传图片
  •  draft.getImagesPath().map { path ->
        flow {
              //压缩文件
             emit(zipImage(draft.skuId, path))
    }.flatten().merge().flatMapMerge(6) { zipFile ->
        flow {
            //上传文件
             emit(uploadFiles(zipFile))
    }.catch { exception ->
        Logger.d(TAG, exception.toString())
    }.retry(3).cancellable().flowOn(CommonIOPool)
    

    LiveData 适用于简单的UI绑定场景,没什么问题。

    Flow 提供了大量的操作符来简化我们的开发,这也没问题,很香。

    SharedFlowStateFlow 前者 用于处理 Event,后者用于处理State 同样没问题。

    对标LiveData 的是 StateFlowFlow本身定位是类似RxJava是用于响应式编程的API,

    既然StateFlow 能做LiveData的活,并且功能更强大,可以简化数据处理, 用它何乐而不为呢。

    分类:
    Android
    标签: