文章目录
-
1 周期性执行任务【schedule_interual】
-
1.1 cron使用
-
1.2 常见用例
1 周期性执行任务【schedule_interual】
使用的参数为
schedule_interual
,也就是定时任务执行的时间。
需要注意的是
Airflow使用的是UTC
,即世界时间,比中国北京时间慢8小时。
需要注意的是任务开始执行的时间为
start_date
加上
schedule_interual
的第二个周期开始执行,例如
start_date
为11月3日,
schedule_interual
为
0 3 * * *
,那么任务将会在11月4日凌晨3点开始执行。
一般使用的格式是
cron格式
,格式如下图。
1.1 cron使用
cron格式如下:
格式含义:
参数范围:
标点含义:
中文解释:
-
逗号(,):逗号用于分隔列表中的项目。例如,在第五个字段(星期几)中使用“ MON,WED,FRI”表示星期一,星期三和星期五。
-
破折号(-):破折号定义范围。例如,2000–2010表示2000年至2010年(含)之间的每一年。
-
百分比(%):除非使用反斜杠(\)进行转义,否则命令中的百分号(%)会更改为换行符,并且将第一个%之后的所有数据作为标准输入发送至命令。
-
非标准字符:以下是非标准字符,仅在某些cron实现中存在,例如Quartz Java Scheduler。
-
L:“ L”代表“最后”。在“星期几”字段中使用时,它允许指定给定月份的结构,例如“最后一个星期五”(“ 5L ”)。在“每月的日期”字段中,它指定该月的最后一天。
-
W:“月”字段中允许使用“ W”字符。此字符用于指定最接近给定日期的工作日(星期一至星期五)。例如,如果将“ 15W ”指定为“月日”字段的值,则含义是:“离该月15日最近的工作日”。因此,如果15号是星期六,那么触发器将在14号星期五触发。如果15日是星期日,则触发器将在16日星期一触发。如果15号是星期二,那么它将在15号星期二触发。但是,如果将“ 1W”指定为月份的值,而第1个是星期六,则触发器将在第3个星期一触发,因为它不会“跳过”一个月日的边界。仅当月份中的某天是一天而不是范围或天数列表时,才可以指定“ W”字符。
-
哈希(#):星期几字段中允许使用“#”,并且必须在其后跟一个介于1到5之间的数字。它允许指定结构,例如给定月份的“第二个星期五”。例如,在“星期几”字段中输入“ 5#3”对应于每月的第三个星期五。
-
问号(?):在某些实现中,使用“ * ”代替“ * ”来保留月日或星期几为空。其他cron实现替代“?” cron守护程序的启动时间,因此它将更新为cron在上午8:25启动,并且每天每天该时间运行一次,直到再次重新启动为止。? ? * * * *25 8 * * * *
-
斜线(/):在vixie-cron中,可以将斜线与范围组合以指定步长值。例如,分钟字段中的* / 5表示每5分钟一次(有关频率,请参见下面的注释)。它是更详细的POSIX格式的缩写,形式为5,10,15,20,25,30,35,40,45,50,55,00。POSIX没有定义斜杠的用法;它的基本原理(在BSD扩展上做注释)指出该定义基于System V格式,但不排除扩展的可能性。
-
H:在詹金斯(Jenkins)连续积分系统中使用“ H”表示替换为“哈希”值。因此,代替表示诸如“ 20 * * * *”的固定数字(表示在每小时的每小时之后20分钟),“ H * * * *”表示每小时在每个任务的未指定但不变的时间执行任务。这允许随着时间的推移分散任务,而不是让所有任务同时开始并争夺资源。
请注意,通常无法表达频率;只有平均分配其范围的步长值才能表示准确的频率(对于分钟和秒,分别为/ 2,/ 3,/ 4,/ 5,/ 6,/ 10,/ 12,/ 15,/ 20和/ 30, 因为60为被这些数字整除的数小时;分别是/ 2,/ 3,/ 4,/ 6,/ 8和/ 12); 所有其他可能的“步骤”和所有其他字段在“重置”至下一分钟,第二天或一天之前,在时间单位末尾产生不一致的“短”时间段;例如,根据月份和leap年的不同,在天字段中输入* / 5有时会在1天,2天或3天之后执行;这是因为cron是无状态的(它不记得上一次执行的时间,也不计算精确频率计数所需要的时间与现在之间的差,而cron仅仅是模式匹配器)。
1.2 常见用例
-
schedule_interval=‘15 00 * * *’
-
每天08:01,09:01,10:01 到 22:01
-
schedule_interval=‘01 08-22/1 * * *’
-
schedule_interval=‘45 23 * * 6’
-
每天01:00, 01:05, 01:10, 直到 03:55
-
schedule_interval=‘*/5 1,2,3 * * *’
-
'*/5 * * * * ’
以上案例参考网址
2 任务超时后会怎么处理
2.1 实验
这里使用如下代码进行测试,
实验一【单个DGA超时,剩余的task怎么处理?】:通过运行两个py文件,两个文件都是60s【每10s写一次】向文件中写入6个
Test!
,t2.py文件的执行依赖于t1.py。通过控制
dagrun_timeout_sec
参数,来控制DAG执行的执行,分别设置为20s,80s,来看执行结果。
实验一结果:只要某个task开始执行,那么无论是否超时,都会把该任务执行完,而后续依赖于该任务执行的任务都不会被执行。也就是20s会把t1.py执行完,80s会把t1.py和t2.py都执行完。
实验二【在开始下一个周期的DGA任务执行,上个DGA执行超时,上一个DAG怎么执行?】:通过运行两个py文件,两个文件都是700s【700s写四次】向文件中写入4个
Test!
,t2.py文件的执行依赖于t1.py。通过控制
dagrun_timeout_sec
参数,来控制DAG执行的执行,分别设置为3600s,来看执行结果。设置
schedule_interval
为
0 * * * *
,也就是每小时执行一次,
实验二结果:当在一个执行周期开始时,上一个周期的任务无论执行完没有,都会直接开始第二个周期的任务。
部分执行结果如下:
执行方法:
nohup airflow webserver --port 10008 &
airflow scheduler
python3 template.py
airflow dags unpause a_test_dag548
template.py:
from airflow import DAG
import dagfactory
import sys
dag_factory = dagfactory.DagFactory("/root/airflow/dags/template.yml")
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
template.yml:
a_test_dag548:
default_args:
owner: 'jk_owner'
start_date: 2021-11-03
end_date: 2021-11-04
retries
: 1
retry_delay_sec: 20
schedule_interval: '59 2 * * *'
concurrency: 2
max_active_runs: 2
dagrun_timeout_sec: 20
default_view: 'tree'
orientation: 'LR'
description: 'this is an python test!'
on_success_callback_name: write_log
on_success_callback_file: /root/airflow/dags/write_log.py
on_failure_callback_name: write_log
on_failure_callback_file: /root/airflow/dags/write_log.py
tasks:
task_1:
operator: airflow.operators.bash.BashOperator
bash_command: 'python3 /root/airflow/dags/t1.py'
task_2:
operator: airflow.operators.bash.BashOperator
bash_command: 'python3 /root/airflow/dags/t2.py'
dependencies: [task_1]
关键参数解读:
-
start_date::开始日期;
-
end_date:结束日期;
-
retries:重试次数;
-
retry_delay_sec:再次尝试的时间;
-
concurrency:一个dag的同一时间最大可以运行的task数量;
-
max_active_runs:同一时间最多可以运行的dag runs的数量;
-
dagrun_timeout_sec:dag最长的执行时长;
-
default_view:视图;
-
orientation:方向;
-
on_success_callback_name:成功回调文件中的函数方法名;
-
on_success_callback_file:成功回调的文件绝对路径;
-
dependencies:依赖;
-
operator:执行的操作类型;
-
bash_command:执行的操作命令;
t1.py:
from threading import Timer
import time
import datetime
def writeText():
with open('/home/1.txt', 'a+') as f:
f.write("Test!\n")
def loop_func(func, second, num):
for i in range(num):
timer = Timer(second, func)
timer.start()
timer.join()
loop_func(writeText, 10, 6)
t2.py:
cat t2.py
from threading import Timer
import time
def writeText():
with open('/home/2.txt', 'a+') as f:
f.write("Test!\n")
def loop_func(func, second, num):
for i in range(num):
timer = Timer(second, func)
timer.start()
timer.join()
loop_func(writeText, 10, 6)
3 附录
AirFlow常见问题汇总
Airflow 中文文档
Airflow 中文文档2
AIrflow2.2.1英文官网