在应用开发中我们经常有一些功能场景需要在指定时间(时机)去触发,例如在启动流程时对缓存信息的更新,以及触发某些优先级低的行为逻辑,又或者空闲时上传图片不影响占用过多的用户行为。 在Jetpack还没有提供WorkMananger之前,我们经常采取 长生命周期的线程池 AlarmManager + BroadcastReceiver JobScheduler 等待去处理异步任务,往往有时这些方法在某种场景下具有稳定性问题导致不可靠,并且需要我们去处理事件调度比如优先级等等,因此WorkManager的出现解决了这些问题,不过也带来了一些问题,让我们来探索一下。

取消api

目前根据源码以及文档发现,对于取消任务提供了四个比较明显方法,举个其中某一个方法尝试取消任务。

WorkManager创建任务(自行封装)

WorkManager取消任务

通过注释以及日常使用实践来看,发现取消方法其实是一种可能取消不了的方法,可能存在申请取消但实际上内部耗时任务还是在执行的情况。

例举其中一个方法我们看下实现逻辑。

@Override
public @NonNull Operation cancelWorkById(@NonNull UUID id) {
    CancelWorkRunnable runnable = CancelWorkRunnable.forId(id, this);
    mWorkTaskExecutor.executeOnBackgroundThread(runnable);
    return runnable.getOperation();

从代码1中我们可以看到根据UUID从CancelWorkRunnable找到对应的task的Runnable,代码2处我们可以看到直接从线程池中执行这个Runnable。

public static CancelWorkRunnable forId(
        @NonNull final UUID id,
        @NonNull final WorkManagerImpl workManagerImpl) {
    return new CancelWorkRunnable() {
        @WorkerThread
        @Override
        void runInternal() {
            WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();
            workDatabase.beginTransaction();
            try {
                cancel(workManagerImpl, id.toString());
                workDatabase.setTransactionSuccessful();
            } finally {
                workDatabase.endTransaction();
            reschedulePendingWorkers(workManagerImpl);

从代码1中看到task的状态信息是保存在数据库中的,因此获取了Room数据库对象,并开启了事务。

从代码2中看到调用了cancel方法,具体内部是如何实现的我们探究一下。

void cancel(WorkManagerImpl workManagerImpl, String workSpecId) {
    iterativelyCancelWorkAndDependents(workManagerImpl.getWorkDatabase(), workSpecId);
    Processor processor = workManagerImpl.getProcessor();
    processor.stopAndCancelWork(workSpecId);
    for (Scheduler scheduler : workManagerImpl.getSchedulers()) {
        scheduler.cancel(workSpecId);

代码1从方法名称可知目的是迭代的取消工作与相关依赖于当前task的其他tasks.

private void iterativelyCancelWorkAndDependents(WorkDatabase workDatabase, String workSpecId) {
    WorkSpecDao workSpecDao = workDatabase.workSpecDao();
    DependencyDao dependencyDao = workDatabase.dependencyDao();
    LinkedList<String> idsToProcess = new LinkedList<>();
    idsToProcess.add(workSpecId);
    while (!idsToProcess.isEmpty()) {
        String id = idsToProcess.remove();
        // Don't fail already cancelled work.
        WorkInfo.State state = workSpecDao.getState(id);
        if (state != SUCCEEDED && state != FAILED) {
            workSpecDao.setState(CANCELLED, id);
        idsToProcess.addAll(dependencyDao.getDependentWorkIds(id));

iterativelyCancelWorkAndDependents()方法代码1处对根据UUID从数据库查找的task 以及 依赖tasks进行标记取消,workSpecDao.setState(CANCELLED, id), 并存入数据库中。

我们可以看到依赖表,表是记录的依赖于当前task的所有子tasks。

public interface DependencyDao { * Attempts to insert a {@link Dependency} into the database. * @param dependency The {@link Dependency}s to insert @Insert(onConflict = IGNORE) void insertDependency(Dependency dependency); * Determines if a {@link WorkSpec} has completed all prerequisites. * @param id The identifier for the {@link WorkSpec} * @return {@code true} if the {@link WorkSpec} has no pending prerequisites. @Query("SELECT COUNT(*)=0 FROM dependency WHERE work_spec_id=:id AND prerequisite_id IN " + "(SELECT id FROM workspec WHERE state!=" + WorkTypeConverters.StateIds.SUCCEEDED + ")") boolean hasCompletedAllPrerequisites(String id); * Gets all the direct prerequisites for a particular {@link WorkSpec}. * @param id The {@link WorkSpec} identifier * @return A list of all prerequisites for {@code id} @Query("SELECT prerequisite_id FROM dependency WHERE work_spec_id=:id") List<String> getPrerequisites(String id); * Gets all {@link WorkSpec} id's dependent on a given id * @param id A {@link WorkSpec} identifier * @return A list of all identifiers that depend on the input @Query("SELECT work_spec_id FROM dependency WHERE prerequisite_id=:id") List<String> getDependentWorkIds(String id); * Determines if a {@link WorkSpec} has any dependents. * @param id A {@link WorkSpec} identifier * @return {@code true} if the {@link WorkSpec} has WorkSpecs that depend on it @Query("SELECT COUNT(*)>0 FROM dependency WHERE prerequisite_id=:id") boolean hasDependents(String id);

此时我们回过头再看下cancel方法,代码2处从WorkManagerImpl中获取了processor对象, 并调用了stopAndCancelWork()方法。

public boolean stopAndCancelWork(@NonNull String id) {
    synchronized (mLock) {
        Logger.get().debug(TAG, String.format("Processor cancelling %s", id));
        mCancelledIds.add(id);
        WorkerWrapper wrapper;
        // Check if running in the context of a foreground service
        wrapper = mForegroundWorkMap.remove(id);
        boolean isForegroundWork = wrapper != null;
        if (wrapper == null) {
            // Fallback to enqueued Work
            wrapper = mEnqueuedWorkMap.remove(id);
        boolean interrupted = interrupt(id, wrapper);
        if (isForegroundWork) {
            stopForegroundService();
        return interrupted;

代码1处从前台工作队列中根据UUID移除,然后获取到移除的WorkerWrapper(本质上就是task实现了Runnable接口),如果wrapper不为null,则认为是前台工作,后续会停止前台服务。如果不是前台wrapper,就会从mEnqueuedWorkMap队列中根据UUID寻找task并移除,最终会调用wrapper自身的interrupt中断方法。

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public void interrupt() {
    mInterrupted = true;
    tryCheckForInterruptionAndResolve();
    boolean isDone = false;
    if (mInnerFuture != null) {
        // Propagate the cancellations to the inner future.
        isDone = mInnerFuture.isDone();
        mInnerFuture.cancel(true);
    // Worker can be null if run() hasn't been called yet
    if (mWorker != null && !isDone) {
        mWorker.stop();
    } else {
        String message =
                String.format("WorkSpec %s is already done. Not interrupting.", mWorkSpec);
        Logger.get().debug(TAG, message);

code1处是对worker的取消逻辑。

private boolean tryCheckForInterruptionAndResolve() {
    if (mInterrupted) {
        WorkInfo.State currentState = mWorkSpecDao.getState(mWorkSpecId);
        if (currentState == null) {
            resolve(false);
        } else {
            resolve(!currentState.isFinished());
        return true;
    return false;

根据当前worker的状态来进行的不同处理。若当前的状态为null,说明当用户再次调用beginUniqueWork方法时并且设置的是 Replace 的模式,则对woker状态进行清理,所以才会有null这种情况。

private void resolve(final boolean needsReschedule) {
    mWorkDatabase.beginTransaction();
    try {
        boolean hasUnfinishedWork = mWorkDatabase.workSpecDao().hasUnfinishedWork();
        if (!hasUnfinishedWork) {
            PackageManagerHelper.setComponentEnabled(
                    mAppContext, RescheduleReceiver.class, false);
        if (needsReschedule) {
            mWorkSpecDao.setState(ENQUEUED, mWorkSpecId);
            mWorkSpecDao.markWorkSpecScheduled(mWorkSpecId, SCHEDULE_NOT_REQUESTED_YET);
        if (mWorkSpec != null && mWorker != null && mWorker.isRunInForeground()) {
            mForegroundProcessor.stopForeground(mWorkSpecId);
        mWorkDatabase.setTransactionSuccessful();
    } finally {
        mWorkDatabase.endTransaction();
    mFuture.set(needsReschedule);

code1当没有未完成的工作的时候,会将RescheduleReceiver设置为false, 这个RescheduleReceiver是一个广播,用于由于还有队列没有处理完成,唤起workmananger的作用,或者对一些场景做告警,这个广播的逻辑我们以后有时间再详勘。code2处就比较好理解了,由于我们要直接取消WorkMananger的执行,势必有些工作是需要再次排期执行的,只是说在当前过程中我们需要中止WorkManager,因此将对数据库对worker的状态重新设置为ENQUEUED,并对执行的时刻表设置为SCHEDULE_NOT_REQUESTED_YET(-1).

此时我们回到interrupt()方法,code2处这里的mInnerFuture是用来监听work的异步任务结果的,这里多说一嘴,我们知道创建线程的几种方式,其中的一种方式是通过 Future 接口来调用线程池的execute(future: Future)方法来获取线程任务的结果的,也就是说FutureTask + Callable的方式。但是Future接口并未提供当线程执行完毕自动回调,所以Google针对这种需求在原有的基础之上又自行封装了一份ListenableFuture,在code3处调用了cancel(true)方法是为了切断ListenableFuture中的任务执行。

code4处ListenableWorker这里他的实现类是CoroutineWorker,在调用stop方法时实际是调用了onStopped()方法,回到CorotineWorker#onStop()方法,他内部依然存在一个ListenableFuture去监听doWork()最终的状态,用于对协程调度的取消操作。

internal val job = Job()
internal val future: SettableFuture<Result> = SettableFuture.create()
init {
    future.addListener(
        Runnable {
            if (future.isCancelled) {
                job.cancel()
        taskExecutor.backgroundExecutor
 * The coroutine context on which [doWork] will run. By default, this is [Dispatchers.Default].
@Deprecated(message = "use withContext(...) inside doWork() instead.")
public open val coroutineContext: CoroutineDispatcher = Dispatchers.Default
@Suppress("DEPRECATION")
public final override fun startWork(): ListenableFuture<Result> {
    val coroutineScope = CoroutineScope(coroutineContext + job)
    coroutineScope.launch {
        try {
            val result = doWork()
            future.set(result)
        } catch (t: Throwable) {
            future.setException(t)
    return future
public final override fun onStopped() {
    super.onStopped()
    future.cancel(false)

他的实现类是AbstractFuture.

所以对于为什么我们去调用workManager自带的取消api只是尽力去取消,是因为Future的cancel(true)方法只是一个尽力而为的行为,并不能确定一定会将异步任务完全的取消执行。

自己思考了一下,其实WorkerManager本身定位就是轻量级异步任务,我们其实不用过多的去考虑完全取消的问题,但我为了这个问题并简化模板代码自己封装了一下,大家可以参考一下,由于使用定位问题,没有应用到项目中,不过我觉得是一个思路。

Step1
@Singleton
class CityInfoCacheTask @Inject constructor(private val cityRepository: CityRepository) : AsyncTask {
    override suspend fun call(): ListenableWorker.Result {
        return cityRepository.refreshSelectedCityInfo().toWorkerResult()
    companion object {
        const val TAG: String = "CityInfoCacheTask"
Step2
taskHelper.apply {
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    // ....
// start task
workerHelper.startOnce()
Cancel Work
taskHelper.cancelTaskByTag(CityInfoCacheTask.TAG)
// Now, I think we don't need support to canceledAllTaskByTag(), because use workermanager we should be canceled specific task. 
taskHelper.cancelAllTasksByTag()

就这么简单。

internal class SingleRunnableStrategy(private val taskQueue: TaskQueue, val coDispatcherProvider: CoDispatcherProvider)
    : BaseRunnableStrategy(taskQueue) {
    private var cancelLoopJob: Job? = null
    @VisibleForTesting
    internal var runningJobInfo = Pair<Job?, String?>(null, null)
    override suspend fun run() {
        supervisorScope {
            cancelLoopJob = async { cancelTaskLooper() }
                val loopTask = getTask()
                if (loopTask != null) {
                    val runningJob = async {
                        val taskResult = loopTask.task.call()
                        handleResult(loopTask, taskResult)
                    runningJobInfo = runningJob to loopTask.tag
                    runningJob.join()
            } while (loopTask != null)
            cancelLoopJob?.cancel()
    @VisibleForTesting
    internal suspend fun cancelTaskLooper() {
        while (true) {
            if (runningJobInfo.first == null || runningJobInfo.second == null){
                continue
            val runningJob = runningJobInfo.first!!
            val runningTag = runningJobInfo.second!!
            if (taskQueue.isCanceledTask(runningTag) && runningJob.isActive) {
                taskQueue.removeCanceledTaskByTag(runningTag)
                runningJob.cancel()
internal class ParallelRunnableStrategy(val taskQueue: TaskQueue, val coDispatcherProvider: CoDispatcherProvider) :
    BaseRunnableStrategy(taskQueue) {
    @VisibleForTesting
    var isFinished = false
    @VisibleForTesting
    var parallelRunningTasks = ConcurrentHashMap<String?, Job>()
    @VisibleForTesting
    var releaseChannel = Channel<TaskEntity>()
    @VisibleForTesting
    var blockingJob: Job? = null
    private var loopChannelJob: Job? = null
    private val lock = Mutex()
    // available cpu core + 1 = parallel threshold
    private val taskThreshold = Runtime.getRuntime().availableProcessors() + 1
    override suspend fun run() {
        prepareRun()
        supervisorScope {
            loopChannelJob = async {
                loopChannel()
                val loopTask = getTask()
                loopTask?.let { currentTask ->
                    lock.withLock {
                        parallelRunningTasks[currentTask.tag] = async {
                            runTask(currentTask)
                if (taskThreshold <= parallelRunningTasks.size) {
                    blockingJob = async { delay(Long.MAX_VALUE) }
                    blockingJob?.join()
            } while (loopTask != null)
            release()
    private suspend fun runTask(currentTask: TaskEntity) {
        val taskResult = currentTask.task.call()
        handleResult(currentTask, taskResult)
        // task finished send a message, to check blocking and whether need to await.
        releaseChannel.send(currentTask)
    @VisibleForTesting
    internal suspend fun loopChannel() {
        while (true) {
            if (isFinished) {
                break
            verifyCancelTask()
            // check to release block if parallel number < DEFAULT_PARALLEL_NUMBER
            releaseChannel.tryReceive().getOrNull()?.let {
                lock.withLock {
                    parallelRunningTasks.remove(it.tag)
                    if (parallelRunningTasks.size < taskThreshold && blockingJob?.isActive == true) {
                        blockingJob?.cancel()
    @VisibleForTesting
    internal fun verifyCancelTask() {
        // check cancel.
        val taskIterator = parallelRunningTasks.iterator()
        while (taskIterator.hasNext()) {
            val runningTask = taskIterator.next()
            runningTask.key?.let {
                if (taskQueue.isCanceledTask(it) && runningTask.value.isActive) {
                    runningTask.value.cancel()
                    taskQueue.removeCanceledTaskByTag(it)
                    taskIterator.remove()
    private fun prepareRun() {
        parallelRunningTasks.filter { it.value.isActive }.map { it.value.cancel() }
        parallelRunningTasks.clear()
        loopChannelJob?.cancel()
    private fun release() {
        isFinished = true
        releaseChannel.close()
        loopChannelJob?.cancel()

核心的取消流程是创建了一个loop,不停地去判断当前任务的运行态,若运行态是取消状态,则去cancel当前执行任务的job。 这样的话其实 WorkManager只开辟一次,内部细分成多个子task,这样我们的执行与取消逻辑由自己去控制。

分类:
Android
  •