1.APScheduler简介
APScheduler是一个Python库,可让您安排稍后执行的Python代码,是一套任务调度框架,可以用来做定时任务控制器,可以添加删除任务。如果将作业存储在数据库中,它们也将在调度程序重新启动后继续运行并保持其状态。重新启动调度程序后,它将运行它应该在脱机时运行的所有作业。
😁我这里先给出一个写调度api的模版吧
步骤一:写好自己运行的函数 def fuction()
步骤二:创建flask路由,在路由下定义一个需要调度的函数def job_function():,把之前的def fuction()函数放到里面
步骤三:创建调度任务函数,def task(),这里需要配置任务调度的方式等参数
步骤四:运行任务 task(),主要写在主函数外面
步骤五:主函数运行api接口,app.run()
from flask import Flask
from flask_apscheduler import APScheduler
import datetime
def fuction()
print("showmaker")
app = Flask(__name__)
@app.route("/test", methods=['GET','POST'])
def job_function():
fuction()
def task():
scheduler = APScheduler()
scheduler.init_app(app)
# 定时任务的格式
scheduler.add_job(func=job_function, trigger='interval',seconds=3600, id='my_job_id')
scheduler.start()
# 写在main里面,IIS不会运行
task()
if __name__ == "__main__":
app.run(debug=False, port="5005")
2.APScheduler调度方式
APScheduler有三种可以调度任务的方式:
我们的目的就是通过api接口,在终端一直运行这个接口进程,让它自己去调度我们的程序,这里的程序也就是任务。
-
可以选择调度的任务,开始和结束的时间
-
基于一个时间区间,间隔的执行任务
-
一次性任务执行
3.APScheduler组件
知道了我们的任务有哪些调度方式,现在了解一下APScheduler的组件:
-
triggers: 任务触发器组件,提供任务触发方式
-
job stores: 任务商店组件,提供任务保存方式
-
executors:任务调度组件,提供任务调度方式
-
schedulers: 任务调度组件,提供任务工作方式
-
3.1 triggers
triggers: 支持三种任务触发方式
date:
固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建
参数
|
说明
|
run_date
|
任务运行日期
|
timezone
|
时区
|
scheduler.add_job(func=job_function, trigger='date',run_date='2021-10-27 13:00:00', id='my_job_id')
interval:
时间间隔触发器,每个一定时间间隔执行一次。
参数
|
说明
|
weeks (int)
|
间隔几周
|
days (int)
|
间隔几天
|
hours (int)
|
间隔几小时
|
minutes (int)
|
间隔几分钟
|
seconds (int)
|
间隔多少秒
|
start_date (datetime 或 str)
|
开始日期
|
end_date (datetime 或 str)
|
结束日期
|
scheduler.add_job(func=job_function, trigger='interval',seconds=3600, id='my_job_id')
cron
cron风格的任务触发,这个不常用就不介绍了,我懒
memory:默认配置任务存在内存中
mongdb:支持文档数据库存储
sqlalchemy:支持关系数据库存储
redis:支持键值对数据库存储
这里如果你的数据库是mysql,我推荐使用sqlalchemy作为数据库,用过create_engine来连接数据库实现数据库操作,比如说删除数据记录,写入数据记录。这里是我自己的例子
from flask import Flask
from flask_apscheduler import APScheduler
import datetime
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import MetaData,Table
def delete():
engine = create_engine('mysql+pymysql://用户名:密码@ip:端口号/数据库?charset=gbk')
meta = MetaData(bind=engine)
tb_1 = Table('数据表名', meta, autoload=True, autoload_with=engine)
tb_2 = Table('数据表名', meta, autoload=True, autoload_with=engine)
dlt_1=tb_1.delete()
dlt_2=tb_2.delete()
conn = engine.connect()
# 执行delete操作
conn.execute(dlt_1)
conn.execute(dlt_2)
conn.close()
def write(t):
r1=pd.DataFrame()
r3=pd.DataFrame()
engine = create_engine('mysql+pymysql://用户名:密码@ip:端口号/数据库?charset=gbk')
r1.to_sql('数据表名', con=engine, if_exists='append', index=False)
r3.to_sql('数据表名', con=engine, if_exists='append', index=False)
def write_log(buf):
print(buf)
with open('test.txt', 'a') as f:
f.write(buf + "\n")
app = Flask(__name__)
@app.route("/test", methods=['GET','POST'])
def job_function():
now_time=datetime.datetime.now()
t=now_time.strftime('%Y-%m-%d')
write_log("update time:"+t)
delete()
write_log("delete data")
write(t)
write_log("write data")
def task():
scheduler = APScheduler()
scheduler.init_app(app)
# 定时任务,每隔10s执行1次
scheduler.add_job(func=job_function, trigger='interval',seconds=60, id='my_job_id')
scheduler.start()
# 写在main里面,IIS不会运行
task()
if __name__ == "__main__":
app.run(debug=False, port="8888")
最后可以在test.txt查看自己的调用的时间
调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用
BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
BackgroundScheduler: 当不运行其它框架 的时候使用,并使你的任务在 后台运行
AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
GeventScheduler: 和 gevent 框架配套使用
TornadoScheduler: 和 tornado 框架配套使用
TwistedScheduler: 和 Twisted 框架配套使用
QtScheduler: 开发 qt 应用的时候使用