I have a simple BackgroundScheduler and a simple task. The BackgroundScheduler is configured to run only a single instance for that task:
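from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
# max_instances defaults to 1, so overlapping runs of the same job are skipped
scheduler.add_job(run_task, 'interval', seconds=10)
scheduler.start()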
When the task starts, it takes much longer than 10 seconds to complete, and I get this warning:
Execution of job "run_tasks (trigger: interval[0:00:10], next run at: 2020-06-17 18:25:32 BST)" skipped: maximum number of running instances reached (1)
This works as expected.
My problem is that I can't find a way to check if an instance of that task is currently running.
In the docs, there are many ways to get all and individual scheduled tasks, but I can't find a way to check if a task is currently running or not.
I would ideally want something like:
def job_in_progress():
    job = scheduler.get_job(job_id)
    # get_instances() is hypothetical; this is the kind of API I'm looking for
    instances = job.get_instances()
    return instances > 0
Any ideas?
This isn't great because it relies on a private attribute, but it's the only thing I could find.
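def job_in_progress():
    job = scheduler.get_job(job_id)
    if job is None:
        return False
    # In APScheduler 3.x the running-instance counter is a private attribute
    # of the executor, so this may break between releases.
    executor = scheduler._lookup_executor(job.executor)
    return executor._instances.get(job_id, 0) > 0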
If someone else has a better idea, use that instead of this.
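One possible alternative that avoids private attributes is to track the running state in the task itself rather than asking the scheduler. The sketch below is only an illustration under that assumption: `track_running` and `in_progress` are hypothetical names, not part of APScheduler, and it uses only the standard library.

```python
import functools
import threading


def track_running(func):
    """Wrap func so callers can ask whether any call is currently in flight.

    Hypothetical helper, not an APScheduler API.
    """
    lock = threading.Lock()
    state = {"count": 0}  # number of calls currently executing

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with lock:
            state["count"] += 1
        try:
            return func(*args, **kwargs)
        finally:
            # Decrement even if the task raises, so the flag never sticks.
            with lock:
                state["count"] -= 1

    def in_progress():
        with lock:
            return state["count"] > 0

    wrapper.in_progress = in_progress
    return wrapper
```

With this in place, you would decorate `run_task` with `@track_running`, pass it to `scheduler.add_job(run_task, 'interval', seconds=10)` as before, and call `run_task.in_progress()` from anywhere in the same process. It only reflects calls made through the wrapper, so it won't see work running in another process.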