publicinterfaceDependencyDao {
* Attempts to insert a {@link Dependency} into the database.
* @param dependency The {@link Dependency}s to insert
@Insert(onConflict = IGNORE)voidinsertDependency(Dependency dependency);
* Determines if a {@link WorkSpec} has completed all prerequisites.
* @param id The identifier for the {@link WorkSpec}
* @return {@code true} if the {@link WorkSpec} has no pending prerequisites.
@Query("SELECT COUNT(*)=0 FROM dependency WHERE work_spec_id=:id AND prerequisite_id IN "
+ "(SELECT id FROM workspec WHERE state!="
+ WorkTypeConverters.StateIds.SUCCEEDED + ")")booleanhasCompletedAllPrerequisites(String id);
* Gets all the direct prerequisites for a particular {@link WorkSpec}.
* @param id The {@link WorkSpec} identifier
* @return A list of all prerequisites for {@code id}
@Query("SELECT prerequisite_id FROM dependency WHERE work_spec_id=:id")
List<String> getPrerequisites(String id);
* Gets all {@link WorkSpec} id's dependent on a given id
* @param id A {@link WorkSpec} identifier
* @return A list of all identifiers that depend on the input
@Query("SELECT work_spec_id FROM dependency WHERE prerequisite_id=:id")
List<String> getDependentWorkIds(String id);
* Determines if a {@link WorkSpec} has any dependents.
* @param id A {@link WorkSpec} identifier
* @return {@code true} if the {@link WorkSpec} has WorkSpecs that depend on it
@Query("SELECT COUNT(*)>0 FROM dependency WHERE prerequisite_id=:id")booleanhasDependents(String id);
// Queue the same cache task eight times; every entry uses the same task
// instance, the same retryTimes value and the same tag.
taskHelper.apply {
    repeat(8) {
        addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    }
    // ....
}

// start task
workerHelper.startOnce()
Cancel Work
// Presumably cancels the task(s) registered under CityInfoCacheTask.TAG — confirm against TaskHelper.
taskHelper.cancelTaskByTag(CityInfoCacheTask.TAG)
// NOTE(author): cancelAllTasksByTag() is probably not needed; when working with WorkManager, callers are expected to cancel specific tasks by tag instead.
taskHelper.cancelAllTasksByTag()
就这么简单。
/**
 * Runs the tasks returned by [getTask] one at a time.
 *
 * While [run] is active, a companion coroutine ([cancelTaskLooper]) watches the task that is
 * currently executing and cancels its job once [taskQueue] reports its tag as cancelled.
 *
 * @param taskQueue queue consulted for cancellation requests (and handed to the base strategy).
 * @param coDispatcherProvider dispatcher provider; not referenced by the code in this class.
 */
internal class SingleRunnableStrategy(
    private val taskQueue: TaskQueue,
    val coDispatcherProvider: CoDispatcherProvider
) : BaseRunnableStrategy(taskQueue) {

    /** Job running [cancelTaskLooper] for the current [run] invocation. */
    private var cancelLoopJob: Job? = null

    /** The job of the task currently being executed, paired with that task's tag. */
    @VisibleForTesting
    internal var runningJobInfo = Pair<Job?, String?>(null, null)

    /**
     * Executes tasks sequentially until [getTask] returns `null`, then stops the
     * cancellation watcher.
     */
    override suspend fun run() {
        supervisorScope {
            cancelLoopJob = async { cancelTaskLooper() }
            do {
                val loopTask = getTask()
                if (loopTask != null) {
                    val runningJob = async {
                        val taskResult = loopTask.task.call()
                        handleResult(loopTask, taskResult)
                    }
                    // Publish the job so the watcher can cancel it, then wait for it to end
                    // (normally or by cancellation) before fetching the next task.
                    runningJobInfo = runningJob to loopTask.tag
                    runningJob.join()
                }
            } while (loopTask != null)
            cancelLoopJob?.cancel()
        }
    }

    /**
     * Polls [runningJobInfo] forever and cancels the running job when its tag has been marked
     * as cancelled in [taskQueue]. Terminates only through coroutine cancellation.
     */
    @VisibleForTesting
    internal suspend fun cancelTaskLooper() {
        while (true) {
            // Suspension point: lets other coroutines run and makes the loop observe
            // cancellation. Without it the loop never suspends, so cancelLoopJob.cancel()
            // cannot stop it and the enclosing supervisorScope would never complete.
            kotlinx.coroutines.yield()

            // Take a single snapshot so the job and tag always belong to the same task.
            val (runningJob, runningTag) = runningJobInfo
            if (runningJob == null || runningTag == null) continue

            if (taskQueue.isCanceledTask(runningTag) && runningJob.isActive) {
                taskQueue.removeCanceledTaskByTag(runningTag)
                runningJob.cancel()
            }
        }
    }
}
/**
 * Runs the tasks returned by [getTask] concurrently, keeping at most [taskThreshold]
 * (available processors + 1) tasks in flight.
 *
 * When the threshold is reached, [run] parks on [blockingJob] until the bookkeeping loop
 * ([loopChannel]) sees a running task finish or get cancelled and cancels that job.
 *
 * @param taskQueue queue consulted for cancellation requests (and handed to the base strategy).
 * @param coDispatcherProvider dispatcher provider; not referenced by the code in this class.
 */
internal class ParallelRunnableStrategy(
    val taskQueue: TaskQueue,
    val coDispatcherProvider: CoDispatcherProvider
) : BaseRunnableStrategy(taskQueue) {

    /** Set by [release]; makes [loopChannel] stop. */
    @VisibleForTesting
    var isFinished = false

    /** Jobs of the tasks currently in flight, keyed by task tag. */
    @VisibleForTesting
    var parallelRunningTasks = ConcurrentHashMap<String?, Job>()

    /** Each task sends itself here after its result has been handled. */
    @VisibleForTesting
    var releaseChannel = Channel<TaskEntity>()

    /** Gate job that [run] joins while the threshold is reached; cancelling it resumes [run]. */
    @VisibleForTesting
    var blockingJob: Job? = null

    private var loopChannelJob: Job? = null
    private val lock = Mutex()

    // available cpu core + 1 = parallel threshold
    private val taskThreshold = Runtime.getRuntime().availableProcessors() + 1

    /**
     * Starts the bookkeeping loop, launches every task returned by [getTask] until it returns
     * `null`, and finally calls [release].
     */
    override suspend fun run() {
        prepareRun()
        supervisorScope {
            loopChannelJob = async { loopChannel() }
            do {
                val loopTask = getTask()
                loopTask?.let { currentTask ->
                    // The gate is created under the same lock that loopChannel() holds while
                    // shrinking the map, so a task finishing between the size check and the
                    // gate assignment cannot be missed.
                    val gate = lock.withLock {
                        // NOTE(review): ConcurrentHashMap rejects null keys, so a task with a
                        // null tag would throw here — confirm tags are effectively non-null.
                        parallelRunningTasks[currentTask.tag] = async { runTask(currentTask) }
                        if (taskThreshold <= parallelRunningTasks.size) {
                            async { delay(Long.MAX_VALUE) }.also { blockingJob = it }
                        } else {
                            null
                        }
                    }
                    // Joined outside the lock: loopChannel() needs the lock to open the gate.
                    gate?.join()
                }
            } while (loopTask != null)
            release()
        }
    }

    /** Executes one task, handles its result and announces completion on [releaseChannel]. */
    private suspend fun runTask(currentTask: TaskEntity) {
        val taskResult = currentTask.task.call()
        handleResult(currentTask, taskResult)
        // task finished send a message, to check blocking and whether need to await.
        releaseChannel.send(currentTask)
    }

    /**
     * Bookkeeping loop: until [isFinished] is set (or the coroutine is cancelled) it cancels
     * tasks flagged in [taskQueue], drops finished tasks announced on [releaseChannel], and
     * opens the gate whenever fewer than [taskThreshold] tasks remain in flight.
     */
    @VisibleForTesting
    internal suspend fun loopChannel() {
        while (!isFinished) {
            // Suspension point so the polling loop cooperates with cancellation and does not
            // monopolise its thread.
            kotlinx.coroutines.yield()
            lock.withLock {
                verifyCancelTask()
                releaseChannel.tryReceive().getOrNull()?.let { finished ->
                    parallelRunningTasks.remove(finished.tag)
                }
                // Checked on every pass (not only after a receive) so that tasks removed by
                // verifyCancelTask() also release a waiting run().
                if (parallelRunningTasks.size < taskThreshold && blockingJob?.isActive == true) {
                    blockingJob?.cancel()
                }
            }
        }
    }

    /**
     * Cancels and forgets every in-flight task whose tag is marked as cancelled in
     * [taskQueue]. Entries with a `null` key are skipped.
     */
    @VisibleForTesting
    internal fun verifyCancelTask() {
        // check cancel.
        val taskIterator = parallelRunningTasks.iterator()
        while (taskIterator.hasNext()) {
            val runningTask = taskIterator.next()
            runningTask.key?.let { tag ->
                if (taskQueue.isCanceledTask(tag) && runningTask.value.isActive) {
                    runningTask.value.cancel()
                    taskQueue.removeCanceledTaskByTag(tag)
                    taskIterator.remove()
                }
            }
        }
    }

    /** Clears leftovers of a previous [run] so the strategy starts from a clean state. */
    private fun prepareRun() {
        parallelRunningTasks.values.filter { it.isActive }.forEach { it.cancel() }
        parallelRunningTasks.clear()
        loopChannelJob?.cancel()
        if (isFinished) {
            // A previous run() ended with release(), which closed the channel and set the
            // flag; without this reset a second run() could never drain finished tasks.
            releaseChannel = Channel()
            isFinished = false
        }
    }

    /** Marks the run as finished and shuts down the bookkeeping loop and its channel. */
    private fun release() {
        isFinished = true
        releaseChannel.close()
        loopChannelJob?.cancel()
    }
}