备案 控制台
学习
实践
活动
专区
工具
TVP
写文章
专栏首页 月亮与二进制 Python设置定时任务
2 0

海报分享

Python设置定时任务

如果需要用Python实现定时任务,包括多长时间之后执行、每隔多久循环执行、每天的几点执行等,都算定时任务。实现定时任务的方法很多,python自身的库也有多种方式可以实现。其中最简单好用的一个库我感觉是threading库中的Timer。

比如每隔5秒执行一次:

import threading
def do_job():
    print('Just do it!')
    global timer
    timer = threading.Timer(5, do_job)
    timer.start()
timer = threading.Timer(1, do_job)
timer.start()

上面的代码中要循环执行的任务就是do_job函数,利用threading.Timer就可以循环执行,其中第一个参数是等待的时间,单位为秒s,可以是浮点数。第二个参数是要执行的函数。为了实现循环执行,只需要在每次执行任务时继续递归调用就可以了,但是timer记得设为global,节省内存。

但是如果要设置每天定时执行,那可能算时间算起来就比较麻烦,为了使用方便这里也提供一个简单的写好的函数:

import threading
def do_job():
    print('Just do it!')
    global timer
    timer = threading.Timer(86400, do_job) # 86400秒就是一天
    timer.start()
# 计算当前时间到明日某时间的秒数差
def get_interval_secs():
    tomorrow = (datetime.date.today() + datetime.timedelta(days=1)).strftime('%Y%m%d')
    tomorrow_time = tomorrow + "-09:00:00"
    tomorrow_time_date = datetime.datetime.strptime(tomorrow_time, '%Y%m%d-%H:%M:%S')
    now = datetime.datetime.now()
    interval = tomorrow_time_date - now
    secs = interval.total_seconds()
    return secs
# 测试计算到今日某时的时间差
def get_interval_secs_test():
    today = (datetime.date.today()).strftime('%Y%m%d')
    today_time = today + "-16:16:00"
    today_time_date = datetime.datetime.strptime(today_time, '%Y%m%d-%H:%M:%S')
    now = datetime.datetime.now()
    print(now)
    interval = today_time_date - now
    secs = interval.total_seconds()
    return secs
timer = threading.Timer(get_interval_secs(), do_report)
timer.start()

上面的代码中,首先把任务中的调用改成定时 成一天后再执行。然后在第一次调用时,我们利用get_interval_secs函数来计算当前时间点到明日某个要求的时间点的秒数差,作为第一次Timer的参数。在自己测试的过程中,可以使用第二个get_interval_secs_test函数来计算当前时间点到今日某个时间点的描述差来执行。

进一步优化可以改为一个函数:

import threading
def do_job():
    print('Just do it!')
    global timer
    timer = threading.Timer(86400, do_job) # 86400秒就是一天
    timer.start()
# 计算当前时间到指定时间点的描述差
def get_interval_secs(target_time):
    today = (datetime.date.today()).strftime('%Y%m%d')
    today_time = today + "-" + target_time
    today_time_date = datetime.datetime.strptime(today_time, '%Y%m%d-%H:%M:%S')
    now = datetime.datetime.now()
    interval = today_time_date - now
    secs = interval.total_seconds()
    if (secs > 0): return secs
    else: return secs + 86400
timer = threading.Timer(get_interval_secs("21:00:00"), do_report)
timer.start()

这个函数直接判断目标时间点是否还能在今天执行(与目前时间点算差值看是正是负),如果能今天执行(时间点还未到来),则到时候执行,如果来不及了,那就明天那时候再执行。


另外也可以用一个很方便且轻量的第三方库: schedule

按照其github( https://github.com/dbader/schedule )中的例子,使用起来很简单:

import schedule
import time
def job():
    print("I'm working...")
schedule.every(10).seconds.do(job) # 每10秒执行一次
schedule.every(10).minutes.do(job) # 每10分钟执行一次
schedule.every().hour.do(job) # 每小时执行一次
schedule.every().day.at("10:30").do(job) # 每天十点半执行
schedule.every(5).to(10).minutes.do(job) # 不理解
schedule.every().monday.do(job) # 每周一执行
schedule.every().wednesday.at("13:15").do(job) # 每周三13点15执行
schedule.every().minute.at(":17").do(job) # 不理解
while True:
    schedule.run_pending() # 运行所有可运行的任务
    time.sleep(1)

如果要执行带参数的任务,那只需要在函数名后跟参数即可,比如:

import schedule
import time
def job(job_name):
    print("I'm working on " + job_name)
schedule.every(10).seconds.do(job, job_name) # 每10秒执行一次
schedule.every(10).minutes.do(job, job_name) # 每10分钟执行一次
while True:
    schedule.run_pending() # 运行所有可运行的任务
    time.sleep(1)

这个库用起来实在太简单,以致于宁愿多折腾一点,比如如果你要在特殊环境里开发,不想每次都要安装这个库(比如在容器中),那也可以直接把这个库的代码包放到你的代码目录下,也就是github项目中的 schedule 文件夹,只需要这个就可以正常import调用了,无须安装。

参考文章: https://lz5z.com/Python%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1%E7%9A%84%E5%AE%9E%E7%8E%B0%E6%96%B9%E5%BC%8F/ https://blog.csdn.net/saltriver/article/details/52194915 https://blog.csdn.net/liao392781/article/details/80521194

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!
本文分享自作者个人站点/博客: https://www.jianshu.com/u/9ec19ab8c802 复制
如有侵权,请联系 cloudcommunity@tencent.com 删除。