相关文章推荐
豪爽的生菜  ·  C#使用 ...·  1 年前    · 
小眼睛的紫菜  ·  Uncaught TypeError: ...·  1 年前    · 
首发于 数据分析
python实现定时任务

python实现定时任务

有时希望每隔一段时间就执行一段程序,或者循环往复执行一个任务,比如最近比较火的给女朋友每天发送关爱信息,以及实时爬取某些内容等等。这里主要学习下常见的使用python方式实现定时任务,包括while True:+sleep()方式、threading.Timer定时器、调度模块schedule和任务框架APScheduler这四种。

先定义一个要完成的任务task,方便后续调用参考:

import datetime,time
def task():
    now=datetime.datetime.now()
    result=now.strftime("%Y-%m-%d %H:%M:%S")
    print(result)

1、while True:+sleep()

该方法缺点是只能实现同步任务,无法执行异步任务。

def loop():
    while True:
        task()
        #每五秒检查一次
        time.sleep(3)
#实际操作时为防止显示太多数据,可设定次数n来控制结果

2、threading.Timer定时器

threading.Timer是threading.Thread的派生类,是在指定时间n秒后执行一个函数功能。该方法的缺点是,运行次数过多会出现报错,虽然可以进行优化修改最大递归深度,但是运行达到最大CPU时python会直接销毁程序。

threading.Timer(interval,function,args=None,kwargs=None)
interval,秒为单位,指定该线程过多久启动
function,制定该线程调用什么函数,执行什么功能
args,适用于function的参数列表
kwargs,适用于function的参数字典
from threading import Timer
def timer():
    task()
    t=Timer(3,timer)
    t.start()

3、schedule调度模块

schedule是一个第三方轻量级的任务调度模块,可以按照秒seconds、分minutes、小时hour、日期day/month/wednesday或自定义事件执行时间,如果想执行多个任务也可添加多个task。但是他依然需要配合while True来使用,且占用CPU也比较多。

import schedule
#清空任务
 schedule.clear()
#创建一个按3秒间隔执行任务
schedule.every(3).seconds.do(task)
#创建一个按2秒间隔执行任务
schedule.every(2).seconds.do(task)
while True:
    schedule.run_pending()

4、APScheduler任务框架

(1)APScheduler

APScheduler是python的一个定时任务框架,用于执行周期或定时任务,可进行添加、删除定时任务,还可将任务存储到数据库中,实现任务的持久化,用起来非常方便。

它提供了基于日期、固定时间间隔以及crontab类型的任务,可持久化任务,可以很方便实现python定时任务系统。APScheduler包含了四种组件:

一是触发器triggers ,包含了调度逻辑,每个作业都有自己的触发器,用于确定下一次作业运行时间,触发器包括cron定时调度(某一刻执行)、interval间隔调度(多久执行一次)和date定时调度(只执行一次)三种。

二是作业存储job stores ,包含了预定的作业,默认将作业存储是将作业保存到内存中,其他作业存储在各种数据库中;作业数据在保存到持久化存储时被序列化,并在从它加载回来时反序列化;作业存储不会将数据保存在内存中,而是充当后端保存、加载、更新和搜索作业的中间人。作业存储器选择有两种,第一种是默认配置即内存,适合在程序崩溃重启整个程序时作业被重新添加到调度其中的环境;第二种是数据库,配合多种不同作业存储后端一起使用,包括memory、sqlalchemy、MongoDB、Redis等,适用于程序崩溃重启程序时作业需要从中断恢复正常运行的环境。

三是执行器executors ,包含了作业运行,通过将作业中指定的可调度对象提高给线程或进程池来完成操作,作业完成后执行程序通知调度程序,调度程序发出适当的事件。执行器默认THreadPoolExecutor,也可用ProcessPoolExecutor。

四是调度器schedulers ,调度程序;作业存储和执行器的配置是通过调度程序完成的,添加删除修改作业也是;调度器包括BlockingScheduler(适用调度程序是进程唯一运行的程序,调用start函数会阻塞当前线程,不能立即返回)、BackgroundScheduler(适用调度程序在应用程序的后台运行,调用start后主线程不会阻塞)、AsyclOScheduler(适用使用asyncio模块的应用程序)、GevenScheduler(适用使用gevent模块的应用程序)、TwistScheduler(适用构建Twisted的应用程序)、QtScheduler(适用构建Qt的应用程序)

from apscheduler.schedulers.blocking import BlockingScheduler
def APSche():
    #创建调度器
    scheduler=BlockingScheduler()
    #添加时间间隔为3秒的任务
    scheduler.add_job(task,"interval",seconds=3,id="test1")
    #添加时间间隔为5秒的任务
    scheduler.add_job(task,"interval",seconds=5,id="test1")
    #启动调度
    scheduler.start()

通过BlockingScheduler调度器,配合mysql数据库,可实现定期向数据库中插入数据内容。

#定义一个任务,向数据库中插入数据
def task():    
    import random    
    import pandas as pd    
    import numpy as np    
    import sqlalchemy    
    from sqlalchemy import create_engine    
    #在数据库中搭建用户存储数据的表python_mysql_test
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker,scoped_session
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column,String,Integer,Float
    #创建连接引擎和连接会话
    engine=create_engine("mysql+pymysql://root:123456@localhost:3306/mytest")
    session=scoped_session(sessionmaker(bind=engine))
    se=session()
    #创建模型类映射
    base=declarative_base()
    class Python_mysql_test(base):
        __tablename__="python_mysql_test"
        user_id=Column(Float,primary_key=True)
        name=Column(String(50))
        age=Column(Integer)
    base.metadata.create_all(engine)
    #随机生成5个整数,用作id;0~50随机生成5个不重复整数
    id1=np.random.randn(5)
    #随机生成5个整数,用作age;18~90随机生成5个不重复整数
    age1=random.sample(range(18,90),5)
    #随机生成一个字母,用作name,字母小写编码为97~122
    n=random.sample(range(96,122),5)
    name1=[]
    for i in n:
        name1.append(chr(i))
        for j in list(range(0,5)):
        test=Python_mysql_test(user_id=id1[j],name=name1[j],age=age1[j])
        se.add(test)
        se.commit()
#定义一个定时任务
def APSche1():
    from apscheduler.schedulers.blocking import BlockingScheduler
    #创建调度器
    scheduler=BlockingScheduler()
    #添加时间间隔为5秒的任务
    scheduler.add_job(task,"interval",seconds=3,id="jod_id",coalesce=True,replace_existing=True)
    #启动调度
    scheduler.start()
#运行任务
if __name__=="__main__":
    APSche1()

实际上,配置pandas的to_sql函数,直接使用调度器BlockingScheduler也可完成持续的数据存储。

def task():
    import random
    import pandas as pd
    import sqlalchemy
    from sqlalchemy import create_engine
    #随机生成5个整数,用作id;0~50随机生成5个不重复整数
    id=random.sample(range(0,50),5)
    #随机生成5个整数,用作age;18~90随机生成5个不重复整数
    age=random.sample(range(18,90),5)
    #随机生成一个字母,用作name,字母小写编码为97~122
    n=random.sample(range(96,122),5)
    name=[]
    for i in n:
        name.append(chr(i))
    label=pd.DataFrame({"id":id,"name":name,"age":age})
    en=create_engine("mysql+pymysql://root:123456@localhost:3306/mytest")
    label.to_sql("python_mysql_test",con=en,if_exists="append",index=False)
    print("保存完成!!!")
def APSche():
    from apscheduler.schedulers.blocking import BlockingScheduler
    #创建调度器
    scheduler=BlockingScheduler()
    #添加时间间隔为5秒的任务
    scheduler.add_job(task,"interval",seconds=3,id="jod_id",coalesce=True,replace_existing=True)
    #启动调度
    scheduler.start()
if __name__=="__main__":
    APSche()
#在代码环境不关闭的情况下,就可以持续运行代码,进行数据保存

(2)APScheduler+mysql实现定时任务持久化存储

这时,就要选择并配置调度器BackgroundScheduler,它不会阻塞主线程的运行;主要在框架程序(Django、flask等)后台使用。

from apscheduler.schedulers.background import BackgroundScheduler
#配置调度器
scheduler = BackgroundScheduler({
    #apscheduler.jobstores.default配置存储器
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy', #数据库类型
        'url': 'mysql+pymysql://root:123456@localhost:3306/mytest?charset=utf8',#存储器访问地址包括用户名密码地址端口等
        'tablename': 'python_mysql_test' #将定时任务数据存储的表名
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '10'
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '10',
    'apscheduler.timezone': 'UTC',