相关文章推荐
寂寞的手术刀  ·  198 道 K8s / Docker / ...·  1 年前    · 
销魂的风衣  ·  还在用log.isDebugEnabled( ...·  1 年前    · 
逆袭的大象  ·  Mac ...·  1 年前    · 
Airflow 1.10安装

Airflow 1.10安装

前言

本次安装Airflow版本为1.10,其需要依赖Python和DB,本次选择的DB为Mysql。

本次安装组件及版本如下:Airflow == 1.10.0
Python == 3.6.5
Mysql == 5.7

Python安装

略 详见: Python3安装(Linux环境)

安装mysql

略 详见: note.youdao.com/notesha

建库、建用户

建库

库名为airflow

create database airflow;

建用户

用户名为airflow,并且设置所有ip均可以访问。

create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';

用户授权

这里为新建的airflow用户授予airflow库的所有权限

grant all on airflow.* to 'airflow'@'%';
flush privileges;

Airflow安装

环境变量设置如下代码配置环境变量,此环境变量仅需要设置成临时变量即可并不需要配置成永久变量。

export SLUGIFY_USES_TEXT_UNIDECODE=yes

接下来利用pip安装airflow。

pip install apache-airflow==1.10.0

此时airflow会被安装到Python下的第三方包中,请记住这个地址,路径一般为${PYTHON_HOME}/lib/python3.6/sit-packages/airflow。

修改AIRFLOW_HOME 环境变量

笔者此次安装仅把HOME变量设置成临时变量,并未设置永久变量。

export AIRFLOW_HOME=/servers/airflow

执行airflow命令

在${PYTHON_HOME}/lib/python3.6/sit-packages/airflow/bin目录下执行

./airflow

此步骤是将airflow安装到刚刚设置的AIRFLOW_HOME目录下。执行这个命令可能汇报一些错误,可以不用太关注,只要保证AIRFLOW_HOME目录下生成了文件即证明本次执行成功。

安装Mysql模块

pip install "apache-airflow[mysql]"

这里可以简单说下,airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。

修改Airflow DB配置

修改${AIRFLOW_HOME}/airflow.cfg

sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

参数的格式为mysql://帐号:密码@ip:port/db

初始化db

新建airflow依赖的表。

./airflow initdb

修改时区为东八区

Airflow默认的时间是GMT时间,比北京时间早8小时。这种设计是为了当Airflow集群分布在不同时区的时候时间依然是相同的,不会出现时间不同步的问题。

但是在笔者接触到的场景中为单节点服务,并且即使之后拓展也是在同一个时区,所有决定将时区修改为东八时区即北京时间。

修改时区分为以下几步:

  1. 修改:airflow.cfg文件。
default_timezone = Asia/Shanghai

这里修改的是schedule的调度时间,即在编写调度时间时可以直接写北京时间。

2. 修改webserver界面右上角当前时间

修改${PYTHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/admin/master.html。

将注释部分的代码修改成红框内的代码。注释内的代码即为将系统时间转为GMT时间,修改后的代码为直接取系统时间,不做时区转换。

修改后的效果如图红框处的效果。

3. 修改webserver lastRun时间

  • 修改 ${PYTHON_HOME}/lib/python3.6/site-packages/airflow/models.py在get_last_dagrun方法上添加
def utc2local(self,utc):
        import time
        epoch = time.mktime(utc.timetuple())
        offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
        return utc + offset

图示如下:

  • 修改${PYTHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/airflow/dags.html中 last_run.execution_date.strftime("%Y-%m-%d %H:%M")和last_run.start_date.strftime("%Y-%m-%d %H:%M")分别为:
dag.utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
dag.utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")

图示如下:

* (可选操作) 重启webserver

如果按照本文顺序安装则不需要执行此步骤。如果在修改时区前已经启动过airflow则需要重启webserver。启动方式在后文会写到,停服务的方式直接kill进程的方式即可。

修改后效果如图:

用户认证

本文采用的用户认证方式为password方式,其他方式如LDAP同样支持但是本文不会介绍。笔者在安装时实验过LDAP方式但是未成功过。

  1. 安装passsword组件
pip install "apache-airflow[password]"

2. 修改 airflow.cfg

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

3. 在python环境中执行如下代码以添加账户:

import airflow  
from airflow import models, settings  
from airflow.contrib.auth.backends.password_auth import PasswordUser  
user = PasswordUser(models.User())  
user.username = 'admin'  # 用户名
user.email = 'emailExample@163.com' # 用户邮箱  
user.password = 'password'   # 用户密码
session = settings.Session()  
session.add(user)  
session.commit()  
session.close()  
exit() 

配置邮件服务

此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = mailExample@163.com
smtp_password = password
smtp_port = 25
smtp_mail_from = mailExample@163.com

接下来简单把dag的Python代码列出来,以供参考:

default_args = {
  'owner': 'ownerExample',
  'start_date': datetime(2018, 9, 18),
  'email': ['mailReceiver@163.com'], # 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。
  'email_on_failure': ['mailReceiver@163.com'], # 任务失败且重试次数用完时发送Email。
  'email_on_retry': True, # 任务重试时是否发送Email
  'depends_on_past': False, # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。
  'retries': 3,