Airflow 1.10安装
前言
本次安装Airflow版本为1.10,其需要依赖Python和DB,本次选择的DB为Mysql。
本次安装组件及版本如下:Airflow == 1.10.0
Python == 3.6.5
Mysql == 5.7
Python安装
略 详见: Python3安装(Linux环境)
安装mysql
略 详见: http:// note.youdao.com/notesha re?id=d9233511a08f557b9bf4cf939f42d0b2
建库、建用户
建库
库名为airflow
create database airflow;
建用户
用户名为airflow,并且设置所有ip均可以访问。
create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';
用户授权
这里为新建的airflow用户授予airflow库的所有权限
grant all on airflow.* to 'airflow'@'%';
flush privileges;
Airflow安装
环境变量设置如下代码配置环境变量,此环境变量仅需要设置成临时变量即可并不需要配置成永久变量。
export SLUGIFY_USES_TEXT_UNIDECODE=yes
接下来利用pip安装airflow。
pip install apache-airflow==1.10.0
此时airflow会被安装到Python下的第三方包中,请记住这个地址,路径一般为${PYTHON_HOME}/lib/python3.6/sit-packages/airflow。
修改AIRFLOW_HOME 环境变量
笔者此次安装仅把HOME变量设置成临时变量,并未设置永久变量。
export AIRFLOW_HOME=/servers/airflow
执行airflow命令
在${PYTHON_HOME}/lib/python3.6/sit-packages/airflow/bin目录下执行
./airflow
此步骤是将airflow安装到刚刚设置的AIRFLOW_HOME目录下。执行这个命令可能汇报一些错误,可以不用太关注,只要保证AIRFLOW_HOME目录下生成了文件即证明本次执行成功。
安装Mysql模块
pip install "apache-airflow[mysql]"
这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。
修改Airflow DB配置
修改${AIRFLOW_HOME}/airflow.cfg
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
参数的格式为mysql://帐号:密码@ip:port/db
初始化db
新建airflow依赖的表。
./airflow initdb
修改时区为东八区
Airflow默认的时间是GMT时间,比北京时间早8小时。这种设计是为了当Airflow集群分布在不同时区的时候时间依然是相同的,不会出现时间不同步的问题。
但是在笔者接触到的场景中为单节点服务,并且即使之后拓展也是在同一个时区,所有决定将时区修改为东八时区即北京时间。
修改时区分为以下几步:
- 修改:airflow.cfg文件。
default_timezone = Asia/Shanghai
这里修改的是schedule的调度时间,即在编写调度时间时可以直接写北京时间。
2. 修改webserver界面右上角当前时间
修改${PYTHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/admin/master.html。
将注释部分的代码修改成红框内的代码。注释内的代码即为将系统时间转为GMT时间,修改后的代码为直接取系统时间,不做时区转换。
修改后的效果如图红框处的效果。
3. 修改webserver lastRun时间
- 修改 ${PYTHON_HOME}/lib/python3.6/site-packages/airflow/models.py在get_last_dagrun方法上添加
def utc2local(self,utc):
import time
epoch = time.mktime(utc.timetuple())
offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
return utc + offset
图示如下:
- 修改${PYTHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/airflow/dags.html中 last_run.execution_date.strftime("%Y-%m-%d %H:%M")和last_run.start_date.strftime("%Y-%m-%d %H:%M")分别为:
dag.utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
dag.utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")
图示如下:
* (可选操作) 重启webserver
如果按照本文顺序安装则不需要执行此步骤。如果在修改时区前已经启动过airflow则需要重启webserver。启动方式在后文会写到,停服务的方式直接kill进程的方式即可。
修改后效果如图:
用户认证
本文采用的用户认证方式为password方式,其他方式如LDAP同样支持但是本文不会介绍。笔者在安装时实验过LDAP方式但是未成功过。
- 安装passsword组件
pip install "apache-airflow[password]"
2. 修改 airflow.cfg
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
3. 在python环境中执行如下代码以添加账户:
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin' # 用户名
user.email = 'emailExample@163.com' # 用户邮箱
user.password = 'password' # 用户密码
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
配置邮件服务
此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下:
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = mailExample@163.com
smtp_password = password
smtp_port = 25
smtp_mail_from = mailExample@163.com
接下来简单把dag的Python代码列出来,以供参考:
default_args = {
'owner': 'ownerExample',
'start_date': datetime(2018, 9, 18),
'email': ['mailReceiver@163.com'], # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。
'email_on_failure': ['mailReceiver@163.com'], # 任务失败且重试次数用完时发送Email。
'email_on_retry': True, # 任务重试时是否发送Email
'depends_on_past': False, # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。
'retries': 3,