python实现定时任务
有时希望每隔一段时间就执行一段程序,或者循环往复执行一个任务,比如最近比较火的给女朋友每天发送关爱信息,以及实时爬取某些内容等等。这里主要学习下常见的使用python方式实现定时任务,包括while True:+sleep()方式、threading.Timer定时器、调度模块schedule和任务框架APScheduler这四种。
先定义一个要完成的任务task,方便后续调用参考:
import datetime,time
def task():
now=datetime.datetime.now()
result=now.strftime("%Y-%m-%d %H:%M:%S")
print(result)
1、while True:+sleep()
该方法缺点是只能实现同步任务,无法执行异步任务。
def loop():
while True:
task()
#每五秒检查一次
time.sleep(3)
#实际操作时为防止显示太多数据,可设定次数n来控制结果
2、threading.Timer定时器
threading.Timer是threading.Thread的派生类,是在指定时间n秒后执行一个函数功能。该方法的缺点是,运行次数过多会出现报错,虽然可以进行优化修改最大递归深度,但是运行达到最大CPU时python会直接销毁程序。
threading.Timer(interval,function,args=None,kwargs=None)
interval,秒为单位,指定该线程过多久启动
function,制定该线程调用什么函数,执行什么功能
args,适用于function的参数列表
kwargs,适用于function的参数字典
from threading import Timer
def timer():
task()
t=Timer(3,timer)
t.start()
3、schedule调度模块
schedule是一个第三方轻量级的任务调度模块,可以按照秒seconds、分minutes、小时hour、日期day/month/wednesday或自定义事件执行时间,如果想执行多个任务也可添加多个task。但是他依然需要配合while True来使用,且占用CPU也比较多。
import schedule
#清空任务
schedule.clear()
#创建一个按3秒间隔执行任务
schedule.every(3).seconds.do(task)
#创建一个按2秒间隔执行任务
schedule.every(2).seconds.do(task)
while True:
schedule.run_pending()
4、APScheduler任务框架
(1)APScheduler
APScheduler是python的一个定时任务框架,用于执行周期或定时任务,可进行添加、删除定时任务,还可将任务存储到数据库中,实现任务的持久化,用起来非常方便。
它提供了基于日期、固定时间间隔以及crontab类型的任务,可持久化任务,可以很方便实现python定时任务系统。APScheduler包含了四种组件:
一是触发器triggers ,包含了调度逻辑,每个作业都有自己的触发器,用于确定下一次作业运行时间,触发器包括cron定时调度(某一刻执行)、interval间隔调度(多久执行一次)和date定时调度(只执行一次)三种。
二是作业存储job stores ,包含了预定的作业,默认将作业存储是将作业保存到内存中,其他作业存储在各种数据库中;作业数据在保存到持久化存储时被序列化,并在从它加载回来时反序列化;作业存储不会将数据保存在内存中,而是充当后端保存、加载、更新和搜索作业的中间人。作业存储器选择有两种,第一种是默认配置即内存,适合在程序崩溃重启整个程序时作业被重新添加到调度其中的环境;第二种是数据库,配合多种不同作业存储后端一起使用,包括memory、sqlalchemy、MongoDB、Redis等,适用于程序崩溃重启程序时作业需要从中断恢复正常运行的环境。
三是执行器executors ,包含了作业运行,通过将作业中指定的可调度对象提高给线程或进程池来完成操作,作业完成后执行程序通知调度程序,调度程序发出适当的事件。执行器默认THreadPoolExecutor,也可用ProcessPoolExecutor。
四是调度器schedulers ,调度程序;作业存储和执行器的配置是通过调度程序完成的,添加删除修改作业也是;调度器包括BlockingScheduler(适用调度程序是进程唯一运行的程序,调用start函数会阻塞当前线程,不能立即返回)、BackgroundScheduler(适用调度程序在应用程序的后台运行,调用start后主线程不会阻塞)、AsyclOScheduler(适用使用asyncio模块的应用程序)、GevenScheduler(适用使用gevent模块的应用程序)、TwistScheduler(适用构建Twisted的应用程序)、QtScheduler(适用构建Qt的应用程序)
from apscheduler.schedulers.blocking import BlockingScheduler
def APSche():
#创建调度器
scheduler=BlockingScheduler()
#添加时间间隔为3秒的任务
scheduler.add_job(task,"interval",seconds=3,id="test1")
#添加时间间隔为5秒的任务
scheduler.add_job(task,"interval",seconds=5,id="test1")
#启动调度
scheduler.start()
通过BlockingScheduler调度器,配合mysql数据库,可实现定期向数据库中插入数据内容。
#定义一个任务,向数据库中插入数据
def task():
import random
import pandas as pd
import numpy as np
import sqlalchemy
from sqlalchemy import create_engine
#在数据库中搭建用户存储数据的表python_mysql_test
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker,scoped_session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,String,Integer,Float
#创建连接引擎和连接会话
engine=create_engine("mysql+pymysql://root:123456@localhost:3306/mytest")
session=scoped_session(sessionmaker(bind=engine))
se=session()
#创建模型类映射
base=declarative_base()
class Python_mysql_test(base):
__tablename__="python_mysql_test"
user_id=Column(Float,primary_key=True)
name=Column(String(50))
age=Column(Integer)
base.metadata.create_all(engine)
#随机生成5个整数,用作id;0~50随机生成5个不重复整数
id1=np.random.randn(5)
#随机生成5个整数,用作age;18~90随机生成5个不重复整数
age1=random.sample(range(18,90),5)
#随机生成一个字母,用作name,字母小写编码为97~122
n=random.sample(range(96,122),5)
name1=[]
for i in n:
name1.append(chr(i))
for j in list(range(0,5)):
test=Python_mysql_test(user_id=id1[j],name=name1[j],age=age1[j])
se.add(test)
se.commit()
#定义一个定时任务
def APSche1():
from apscheduler.schedulers.blocking import BlockingScheduler
#创建调度器
scheduler=BlockingScheduler()
#添加时间间隔为5秒的任务
scheduler.add_job(task,"interval",seconds=3,id="jod_id",coalesce=True,replace_existing=True)
#启动调度
scheduler.start()
#运行任务
if __name__=="__main__":
APSche1()
实际上,配置pandas的to_sql函数,直接使用调度器BlockingScheduler也可完成持续的数据存储。
def task():
import random
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
#随机生成5个整数,用作id;0~50随机生成5个不重复整数
id=random.sample(range(0,50),5)
#随机生成5个整数,用作age;18~90随机生成5个不重复整数
age=random.sample(range(18,90),5)
#随机生成一个字母,用作name,字母小写编码为97~122
n=random.sample(range(96,122),5)
name=[]
for i in n:
name.append(chr(i))
label=pd.DataFrame({"id":id,"name":name,"age":age})
en=create_engine("mysql+pymysql://root:123456@localhost:3306/mytest")
label.to_sql("python_mysql_test",con=en,if_exists="append",index=False)
print("保存完成!!!")
def APSche():
from apscheduler.schedulers.blocking import BlockingScheduler
#创建调度器
scheduler=BlockingScheduler()
#添加时间间隔为5秒的任务
scheduler.add_job(task,"interval",seconds=3,id="jod_id",coalesce=True,replace_existing=True)
#启动调度
scheduler.start()
if __name__=="__main__":
APSche()
#在代码环境不关闭的情况下,就可以持续运行代码,进行数据保存
(2)APScheduler+mysql实现定时任务持久化存储
这时,就要选择并配置调度器BackgroundScheduler,它不会阻塞主线程的运行;主要在框架程序(Django、flask等)后台使用。
from apscheduler.schedulers.background import BackgroundScheduler
#配置调度器
scheduler = BackgroundScheduler({
#apscheduler.jobstores.default配置存储器
'apscheduler.jobstores.default': {
'type': 'sqlalchemy', #数据库类型
'url': 'mysql+pymysql://root:123456@localhost:3306/mytest?charset=utf8',#存储器访问地址包括用户名密码地址端口等
'tablename': 'python_mysql_test' #将定时任务数据存储的表名
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '10'
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '10',
'apscheduler.timezone': 'UTC',