from apscheduler.schedulers.background import BackgroundScheduler
import time
def job():
print('job 3s')
time.sleep(5)
if __name__=='__main__':
sched = BackgroundScheduler(timezone='MST')
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
运行这个程序,我们得到如下的输出:
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
Execution of job "job (trigger: interval[0:00:03], next run at: 2018-05-07 02:44:29 MST)" skipped: maximum number of running instances reached (1)
main 1s
main 1s
main 1s
job 3s
main 1s
可见,3s时间到达后,并不会“重新启动一个job线程”,而是会跳过该次调度,等到下一个周期(再等待3s),又重新调度job()。
为了能让多个job()同时运行,我们也可以配置调度器的参数max_instances
,如下例,我们允许2个job()同时运行:
from apscheduler.schedulers.background import BackgroundScheduler
import time
def job():
print('job 3s')
time.sleep(5)
if __name__=='__main__':
job_defaults = { 'max_instances': 2 }
sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults)
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
运行程序,我们得到如下的输出:
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
job 3s
每个job是怎么被调度的
通过上面的例子,我们发现,调度器是定时调度job()函数,来实现调度的。
那job()函数会被以进程的方式调度运行,还是以线程来运行呢?
为了弄清这个问题,我们写了如下程序:
from apscheduler.schedulers.background import BackgroundScheduler
import time,os,threading
def job():
print('job thread_id-{0}, process_id-{1}'.format(threading.get_ident(), os.getpid()))
time.sleep(50)
if __name__=='__main__':
job_defaults = { 'max_instances': 20 }
sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults)
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
运行程序,我们得到如下的输出:
main 1s
main 1s
main 1s
job thread_id-10644, process_id-8872
main 1s
main 1s
main 1s
job thread_id-3024, process_id-8872
main 1s
main 1s
main 1s
job thread_id-6728, process_id-8872
main 1s
main 1s
main 1s
job thread_id-11716, process_id-8872
可见,每个job()的进程ID都相同,但线程ID不同。所以,job()最终是以线程的方式被调度执行。
摘要本文介绍APScheduler最基本的用法“定时几秒后启动job”,解释其中两种调度器BackgroundScheduler和BlockingScheduler的区别,说明了如何做到“让job在start()后就开始运行”,详述“job执行时间大于定时调度时间”这种特殊情况的问题及解决方法,并说明了每个job都会以thread的方式被调度。基本的定时调度APScheduler是...
APScheduler是一个 Python 定时任务框架,提供了基于日期、固定时间间隔以及 crontab 类型的任务,并且可以持久化任务、并以 daemon 方式运行应用。
BlockingScheduler 是 APScheduler 中的调度器,APScheduler 中有两种常用的调度器, BlockingScheduler 和 Back
1、 线程睡眠函数 sleep() ——粗暴!一直占有 CPU 资源,导致后续操作无法执行
2、 threading.Timer(10, task, ()).start() # (间隔s,任务task, 函参)
3、 import sched
# 初始化 sched 模块的 scheduler 类
scheduler = sched.scheduler(time.time, time.sleep)
# 增加调度任务 enter(delay, priority, action, argument=(), kwargs={})
scheduler.enter(10, 1, ta
说到定时任务,你会想起 linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文介绍如何使用 apscheduler 实现你的定时任务。
apscheduler 使用起来十分方便。提供了基于 日期、固定时间间隔、c...
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from trigger_manage import Trig
不知你有没有遇到APScheduler定时任务部分未执行的情况,我反正是遇到了,任务随机性的不执行,一看官网文档,才知道是自己的使用方法不对。
当前网上的大多数文章都只说了基本的使用方法,没说注意事项。我在这里整理一下:
1. Executor 是默认大小为10的ThreadPoolExecutor。
from apscheduler.schedulers.background import
这篇文章主要介绍了python中BackgroundScheduler和BlockingScheduler的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
1、基本的定时调度
2、BlockingScheduler与BackgroundScheduler区别
APScheduler最基本的用法: “定时几秒后启动job”
两种调度器: BackgroundScheduler和BlockingScheduler的区别,
APScheduler有四个组件:
triggers: 触发器,用于设定触发任务的条件,触发器包含了调度的逻辑,每个任务都有自己的触发器决定该任务下次运行的时间。
job stores: 任务储存器,用于存放任务,把任务放在内存或者数据库中,一个
executors: 执行器,用于执行任务,可以设定执行模式为单线程或者线程池,任务完毕后,执...
#BlockingScheduler定时任务
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
首先看看周一到周五定时执行任务
# 输出时间
def job():
print(datetime.now().strtime("%Y-%m-%d %H:%M:%...
1、创建一个简单的定时任务
建一个每隔多少时间运行一次的任务进行说明
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def my_clock():
print(f"调用时间: {datetime.now()}")
scheduler = BlockingScheduler() # 实例化一个调度器
# 每隔2秒调用一次 my_clock()函数
# 定义Actor网络
class ActorNetwork:
def __init__(self, state_size, action_size, learning_rate, name='ActorNetwork'):
with tf.variable_scope(name):
# 输入层
self.inputs = tf.placeholder(tf.float32, [None, state_size], name='inputs')
# 第一个隐藏层,使用relu激活函数
self.fc1 = tf.layers.dense(self.inputs, 64, activation=tf.nn.relu)
# 第二个隐藏层,使用relu激活函数
self.fc2 = tf.layers.dense(self.fc1, 128, activation=tf.nn.relu)
# 输出层,使用softmax激活函数,输出每个动作的概率
self.outputs = tf.layers.dense(self.fc2, action_size, activation=tf.nn.softmax)
# 定义损失函数,使用交叉熵
self.actions = tf.placeholder(tf.float32, [None, action_size], name='actions')
self.discounted_rewards = tf.placeholder(tf.float32, [None, ], name='discounted_rewards')
self.cross_entropy = tf.reduce_mean(tf.multiply(tf.log(self.outputs), self.actions))
self.loss = -tf.reduce_mean(self.discounted_rewards * self.cross_entropy)
# 定义优化器
self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(self.loss)
# 定义Critic网络
class CriticNetwork:
def __init__(self, state_size, learning_rate, name='CriticNetwork'):
with tf.variable_scope(name):
# 输入层
self.inputs = tf.placeholder(tf.float32, [None, state_size], name='inputs')
# 第一个隐藏层,使用relu激活函数
Jonathan Star: