相关文章推荐
酷酷的茴香  ·  Jetpack ...·  2 周前    · 
安静的棒棒糖  ·  AttributeError: ...·  2 年前    · 
耍酷的墨镜  ·  VB.net ...·  2 年前    · 
才高八斗的椅子  ·  Chat GPT 和 ...·  2 年前    · 
呐喊的槟榔  ·  X-MOL·  2 年前    · 

如何用Kotlin coroutines实现定时器

51 人关注

我想用Kotlin coroutines实现定时器,类似于用RxJava实现的这个。

       Flowable.interval(0, 5, TimeUnit.SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .map { LocalDateTime.now() }
                    .distinctUntilChanged { old, new ->
                        old.minute == new.minute
                    .subscribe {
                        setDateTime(it)

它将每隔一分钟发出LocalDateTime。

2 个评论
我认为你可以使用股票频道。 kotlinlang.org/docs/reference/coroutines/...
@marstran 不再是了,他们现在已经被淘汰了。
android
kotlin
kotlin-coroutines
Roman Nazarevych
Roman Nazarevych
发布于 2019-02-22
12 个回答
Joffrey
Joffrey
发布于 2020-05-18
已采纳
0 人赞同

Edit 请注意,原答案中建议的API现在被标记为 @ObsoleteCoroutineApi

Ticker通道目前没有与结构化并发集成,其api将在未来改变。

你现在可以使用 Flow 的API来创建你自己的代码流。

import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO) = flow {
    delay(initialDelay)
    while (true) {
        emit(Unit)
        delay(period)

而且你可以用一种与你目前的代码非常相似的方式来使用它。

tickerFlow(5.seconds)
    .map { LocalDateTime.now() }
    .distinctUntilChanged { old, new ->
        old.minute == new.minute
    .onEach {
        setDateTime(it)
    .launchIn(viewModelScope) // or lifecycleScope or other

注意:在这里写的代码中,处理元素的时间没有被tickerFlow考虑在内,所以延迟可能不是正常的(这是一个延迟between元素处理)。如果你希望滴答器独立于每个元素的处理,你可能希望使用一个buffer或一个专门的线程(例如通过flowOn)。

Original answer

我相信这仍然是实验性的,但你可以用一个票据频道每隔X毫秒产生一次值。

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)
repeat(10) {
    tickerChannel.receive()
    val currentTime = LocalDateTime.now()
    println(currentTime)

如果你需要在你的 "subscribe "为每个 "tick "做一些事情时继续做你的工作,你可以launch一个后台的coroutine,它将从这个通道读取并做你想要的事情。

val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)
launch {
    for (event in tickerChannel) {
        // the 'event' variable is of type Unit, so we don't really care about it
        val currentTime = LocalDateTime.now()
        println(currentTime)
delay(1000)
// when you're done with the ticker and don't want more events
tickerChannel.cancel()

如果你想从循环内停止,你可以简单地脱离它,然后取消这个通道。

val ticker = ticker(500, 0)
var count = 0
for (event in ticker) {
    count++
    if (count == 4) {
        break
    } else {
        println(count)
ticker.cancel()
    
有什么办法可以 "取消 "勾选吗?怎样才能暂停/取消勾选?
@Lifes 你可能需要有某种 "活动 "状态的变量,以便在你收到tick时检查。当你想 "暂停 "时,你可以把它设置为假,当你想 "恢复 "时,再设置为真。
谢谢你的快速回复。鉴于我的使用情况,我不希望它一直滴答作响,所以我打算取消并根据需要重新创建它。
aLx
ticker在 "1.3.2 "版本中被标记为 "ObsoleteCoroutinesApi",这意味着。"标志着声明是 obsolete 在coroutines API中,这意味着相应的声明的设计有严重的已知缺陷,它们将在未来被重新设计。粗略的说,这些声明在未来会被废弃,但目前还没有替代品,所以不能马上被废弃。"
Steffen Funke
Steffen Funke
发布于 2020-05-18
0 人赞同

使用Kotlin Flows的一个非常务实的方法可以是。

// Create the timer flow
val timer = (0..Int.MAX_VALUE)
    .asSequence()
    .asFlow()
    .onEach { delay(1_000) } // specify delay
// Consume it
timer.collect { 
    println("bling: ${it}")
    
How to be notified when ends?
请确保使用以下语句导入流程: import kotlinx.coroutines.flow.collect
为什么我们在这里使用asSequence()函数?
@Hassa是被懒惰地创建的Ints的序列。否则,从0 ... Int.MAX_VALUE的所有Ints将被立即加载到内存中,这可能是你不想要的。
Muhammad Naveed
@SteffenFunke 你能不能详细说明一下,我们怎样才能重置定时器的流程?
Raphael C
Raphael C
发布于 2020-05-18
0 人赞同

另一个可能的解决方案是作为 CoroutineScope 的可重复使用的kotlin扩展。

fun CoroutineScope.launchPeriodicAsync(
    repeatMillis: Long,
    action: () -> Unit
) = this.async {
    if (repeatMillis > 0) {
        while (isActive) {
            action()
            delay(repeatMillis)
    } else {
        action()

然后用法为。

var job = CoroutineScope(Dispatchers.IO).launchPeriodicAsync(100) {
  //...

然后打断它。

job.cancel()

另注:我们在这里认为action是非阻塞的,不需要时间。

由于 delay() 的调用,它在这里并不重要,但在一般情况下,我们应该避免 while (true) 在coroutines中出现,而应该选择 while(isActive) 来正确支持取消。
@Joffrey 这只是一个例子,请随意修改,以便更好地发挥作用。
使用 async() 而不是 launch() 的原因是什么?
@Phileo99 I think you could do it either way, but if you use Async it returns a Deferred<T> which gives you a few more options than a launch {}, such as await(). Not sure that'd be all that useful in this case, but I don't think it adds much overhead. Deferred extends Job, so anything that launch can do async can also do.
请记住,随后的 action() 调用之间的间隔不是定义的 repeatMillis 的时间,而是 repeatMillis 的时间。+ action() 的执行时间。因此,只要 action() 不需要太长的时间,这个解决方案就没有问题。通过使用带有 buffer() conflate() flowOn 的流量,我们可以得到大约恒定的间隔。
murgupluoglu
murgupluoglu
发布于 2020-05-18
0 人赞同

你可以像这样创建一个倒数计时器

GlobalScope.launch(Dispatchers.Main) {
            val totalSeconds = TimeUnit.MINUTES.toSeconds(2)
            val tickSeconds = 1
            for (second in totalSeconds downTo tickSeconds) {
                val time = String.format("%02d:%02d",
                    TimeUnit.SECONDS.toMinutes(second),
                    second - TimeUnit.MINUTES.toSeconds(TimeUnit.SECONDS.toMinutes(second))
                timerTextView?.text = time
                delay(1000)
            timerTextView?.text = "Done!"
    
使用 lifecycleScope 代替,以避免泄露片段或活动。
好的解决方案,但我不同意GlobalScope,viewModelScope或lifecycleScope更为可取e
Lukas Lechner
我只想说,这个解决方案并不是100%准确的。倒计时会比120秒长一点,因为日期格式化和设置 TextView 上的文字也会花一些时间。我想在大多数情况下,这不会是一个问题,否则你应该坚持使用 flow{} 的解决方案(结合 buffer() conflate() flowOn )。
Dario Pellegrini
Dario Pellegrini
发布于 2020-05-18
0 人赞同

下面是一个使用Kotlin Flow的可能解决方案

fun tickFlow(millis: Long) = callbackFlow<Int> {
    val timer = Timer()
    var time = 0
    timer.scheduleAtFixedRate(
        object : TimerTask() {
            override fun run() {
                try { offer(time) } catch (e: Exception) {}
                time += 1
        millis)
    awaitClose {
        timer.cancel()
val job = CoroutineScope(Dispatchers.Main).launch {
   tickFlow(125L).collect {
      print(it)
job.cancel()
    
你在用冠词来包装定时器,为什么?这根本就没有意义;要么使用定时器,要么使用轮子。
例如,它在一个视图模型中可能很有用,它的作用范围是CoroutineScope(Dispatchers.Main + viewModelJob)。如果你需要定期进行网络检查,你可以使用该范围和所有其他的程序(如网络请求或数据库查询)启动tick coroutine,然后一次性取消viewModelJob。顺便说一下,如果它对你来说没有用,那就没问题,这很公平。
为了明确起见,取消coroutine不会对Timer做任何事情,你必须使你的流程 cancellable() 。然而,即使你使你的流程 cancellable() ,取消你的流程和工作也不会停止定时器的 "滴答"。除此之外,Timer已经在使用另一个线程了,我真的不明白为什么要用flow来包装它。
我确认,在上述代码中,tick在job.cancel()中停止。我刚刚在Fragment中的一个真实案例应用中使用了它。
Benjamin Ledet
Benjamin Ledet
发布于 2020-05-18
0 人赞同

Edit: Joffrey 已经用一个更好的方法编辑了他的解决方案。

Old :

Joffrey 的解决方案对我有用,但我遇到了for循环的问题。

我必须像这样在for循环中取消我的ticker。

            val ticker = ticker(500, 0)
            for (event in ticker) {
                if (...) {
                    ticker.cancel()
                } else {

但是ticker.cancel()却抛出了一个cancellationException,因为在这之后for循环一直在进行。

我不得不使用一个while循环来检查通道是否没有关闭,以避免得到这个异常。

                val ticker = ticker(500, 0)
                while (!ticker.isClosedForReceive && ticker.iterator().hasNext()) {
                    if (...) {
                        ticker.cancel()
                    } else {
    
如果你知道你想让它停止,为什么不直接 break 出循环?然后你就可以在循环外取消ticker,这对我来说很有效。另外,你在每次循环中都要创建一个新的迭代器,这可能不是你想做的。
有时我们没有想到最简单的解决办法......你说得很对,谢谢!
没问题 :)也就是说,我没有想到 cancel() 在从循环中调用时会失败,所以你在这个问题上教会了我一些东西。我需要进一步调查以弄清这个问题的真相。
在coroutines的1.2.2版本中,它没有失败!但我升级到1.3.2版本,现在它失败了。但我升级到了1.3.2版本,现在它又出现了。也许在1.2.2版本中它应该失败,但他们修复了它,或者它是一个引入的错误...
Mattia Ferigutti
Mattia Ferigutti
发布于 2020-05-18
0 人赞同

具有启动、暂停和停止功能的定时器。

使用方法。

val timer = Timer(millisInFuture = 10_000L, runAtStart = false)
timer.start()

Timer class:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
enum class PlayerMode {
    PLAYING,
    PAUSED,
    STOPPED
class Timer(
    val millisInFuture: Long,
    val countDownInterval: Long = 1000L,
    runAtStart: Boolean = false,
    val onFinish: (() -> Unit)? = null,
    val onTick: ((Long) -> Unit)? = null
    private var job: Job = Job()
    private val _tick = MutableStateFlow(0L)
    val tick = _tick.asStateFlow()
    private val _playerMode = MutableStateFlow(PlayerMode.STOPPED)
    val playerMode = _playerMode.asStateFlow()
    private val scope = CoroutineScope(Dispatchers.Default)
    init {
        if (runAtStart) start()
    fun start() {
        if (_tick.value == 0L) _tick.value = millisInFuture
        job.cancel()
        job = scope.launch(Dispatchers.IO) {
            _playerMode.value = PlayerMode.PLAYING
            while (isActive) {
                if (_tick.value <= 0) {
                    job.cancel()
                    onFinish?.invoke()
                    _playerMode.value = PlayerMode.STOPPED
                    return@launch
                delay(timeMillis = countDownInterval)
                _tick.value -= countDownInterval
                onTick?.invoke(this@Timer._tick.value)
    fun pause() {
        job.cancel()
        _playerMode.value = PlayerMode.PAUSED
    fun stop() {
        job.cancel()
        _tick.value = 0
        _playerMode.value = PlayerMode.STOPPED

我从以下方面获得了灵感here.

为什么改用dispatcher io?
Jemshit Iskenderov
Jemshit Iskenderov
发布于 2020-05-18
0 人赞同

以下是根据乔佛里的回答而编写的 Flow 版本的 Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)

fun tickerFlow(start: Long,
               count: Long,
               initialDelayMs: Long,
               periodMs: Long) = flow<Long> {
    delay(initialDelayMs)
    var counter = start
    while (counter <= count) {
        emit(counter)
        counter += 1
        delay(periodMs)
//...
tickerFlow(1, 5, 0, 1_000L)
    
0 人赞同

制作了 Observable.intervalRange(0, 90, 0, 1, TimeUnit.SECONDS) 的副本(将在90秒内发出项目,每1秒)。

fun intervalRange(start: Long, count: Long, initialDelay: Long = 0, period: Long, unit: TimeUnit): Flow<Long> {
        return flow<Long> {
            require(count >= 0) { "count >= 0 required but it was $count" }
            require(initialDelay >= 0) { "initialDelay >= 0 required but it was $initialDelay" }
            require(period > 0) { "period > 0 required but it was $period" }
            val end = start + (count - 1)
            require(!(start > 0 && end < 0)) { "Overflow! start + count is bigger than Long.MAX_VALUE" }
            if (initialDelay > 0) {
                delay(unit.toMillis(initialDelay))
            var counter = start
            while (counter <= count) {
                emit(counter)
                counter += 1
                delay(unit.toMillis(period))

使用方法。

lifecycleScope.launch {
intervalRange(0, 90, 0, 1, TimeUnit.SECONDS)
                .onEach {
                    Log.d(TAG, "intervalRange: ${90 - it}")
                .lastOrNull()
    
Hoàng Anh Chung
Hoàng Anh Chung
发布于 2020-05-18
0 人赞同

在此输入图片描述

enter code here
private val updateLiveShowTicker = flow {
    while (true) {
        emit(Unit)
        delay(1000L * UPDATE_PROGRAM_INFO_INTERVAL_SECONDS)
private val updateShowProgressTicker = flow {
    while (true) {
        emit(Unit)
        delay(1000L * UPDATE_SHOW_PROGRESS_INTERVAL_SECONDS)
private val liveShow = updateLiveShowTicker
    .combine(channelId) { _, channelId -> programInfoRepository.getShow(channelId) }
    .catch { emit(LiveShow(application.getString(R.string.activity_channel_detail_info_error))) }
    .shareIn(viewModelScope, SharingStarted.WhileSubscribed(), replay = 1)
    .distinctUntilChanged()

我的解决方案,你现在可以使用Flow API来创建你自己的ticker flow。

John Michael Pirie
John Michael Pirie
发布于 2020-05-18
0 人赞同

最近用于根据计时器和最大缓冲区大小对数值进行分块。

private object Tick
@Suppress("UNCHECKED_CAST")
fun <T : Any> Flow<T>.chunked(size: Int, initialDelay: Long, delay: Long): Flow<List<T>> = flow {
    if (size <= 0) throw IllegalArgumentException("invalid chunk size $size - expected > 0")
    val chunkedList = mutableListOf<T>()
    if (delay > 0L) {
        merge(this@chunked, timerFlow(initialDelay, delay, Tick))
    } else {
        this@chunked
        .collect {
            when (it) {
                is Tick -> {
                    if (chunkedList.isNotEmpty()) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
                else -> {
                    chunkedList.add(it as T)
                    if (chunkedList.size >= size) {
                        emit(chunkedList.toList())
                        chunkedList.clear()
    if (chunkedList.isNotEmpty()) {
        emit(chunkedList.toList())
fun <T> timerFlow(initialDelay: Long, delay: Long, o: T) = flow {
    if (delay <= 0) throw IllegalArgumentException("invalid delay $delay - expected > 0")
    if (initialDelay > 0) delay(initialDelay)
    while (currentCoroutineContext().isActive) {
        emit(o)
        delay(delay)
    
Bassam Helal
Bassam Helal
发布于 2020-05-18
0 人赞同

它没有使用Kotlin coroutines,但如果你的用例足够简单,你总是可以直接使用像 fixedRateTimer timer ( docs here ),它们被解析为JVM本地的 Timer .

我在一个相对简单的场景中使用了RxJava的 interval ,当我转而使用Timer时,我发现 显著的 性能和内存的改进。

你也可以通过使用 View.post() 或它的多个变体,在Android上的主线程上运行你的代码。

唯一真正的麻烦是你需要自己跟踪旧时间的状态,而不是依靠RxJava为你做这件事。

但这总是要快得多(如果你正在做性能关键的事情,如UI动画等,这很重要),而且不会有RxJava的Flowables的内存开销。

下面是该问题的代码,使用的是 fixedRateTimer

var currentTime: LocalDateTime = LocalDateTime.now() fixedRateTimer(period = 5000L) { val newTime = LocalDateTime.now() if (currentTime.minute != newTime.minute) { post { // post the below code to the UI thread to update UI stuff setDateTime(newTime)