一、Airflow是什么

airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为由tasks组成的DAGs(有向无环图),调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。

二、Airflow的核心概念

  1. DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。
  2. Operators:airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令...同时,用户可以自定义Operator,这给用户提供了极大的便利性。可以理解为用户需要的一个操作,是Airflow提供的类
  3. Tasks:Task 是 Operator的一个实例
  4. Task Instance:由于Task会被重复调度,每次task的运行就是不同的task instance了。Task instance 有自己的状态,包括"running", "success", "failed", "skipped", "up for retry"等。
  5. Task Relationships:DAGs中的不同Tasks之间可以有依赖关系

三、使用AirFlow完成天级的任务调度

说了这么多抽象的概念,估计看官还是云里雾里,下面就直接举个例子来说明吧。

1. 安装airflow

Airflow可以约等于只支持linux和mac,Windows上极其难装,笔者放弃了.
安装也很简单,以下代码来自官方文档,使用了Python的pip管理:

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page

安装好了以后访问localhost:8080即可访问ui界面

2. 基本配置

  1. 需要创建 ~/airflow/dags 目录,这个目录是默认的存放DAG的地方,想修改的话可以修改 ~/airflow/airflow.cfg 文件
  2. 修改airflow的数据库
    airflow会使用sqlite作为默认的数据库,此情况下airflow进行调度的任务都只能单个的执行.在调度任务量不大的情况下,可以使用sqlite作为backend.如果想scale out的话,需要修改配置文件,官方推荐使用mysql或者postgresql作为backend数据库.

3. 使用PostgresOperator执行SQL完成ETL任务

通过搜集信息,了解到PostgresOperator能执行SQL,并且还支持传参数.能解决大多数ETL任务中的传参问题.传参使用的是Python的Jinjia模块.

  1. 创建DAG
    首先创建一个test_param_sql.py文件.内容如下:
from datetime import datetime, timedelta
import airflow
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable
args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 26), #start_date会决定这个DAG从哪天开始生效
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
# Variable是Airflow提供的用户自定义变量的功能,在UI界面的Admin -> Variable下可以进行增删改查,此处笔者定义了sql_path作为存放sql文件的地方
tmpl_search_path = Variable.get("sql_path")  
dag = airflow.DAG(
    'test_param_sql',
    schedule_interval=timedelta(days=1), # schedule_interval是调度的频率
    template_searchpath=tmpl_search_path, 
    default_args=args,
    max_active_runs=1)
test_param_sql = PostgresOperator(
    task_id='test_param_sql',
    postgres_conn_id='postgres_default',
    sql='param_sql.sql',
    dag=dag,
    params={'period': '201905'},
    pool='pricing_pool')
match_finish = DummyOperator(
    task_id='match_finish',
    dag=dag
test_param_sql >> match_finish
  1. 准备要执行的Sql文件
    创建test_sql.sql文件.
    SQL文件会被Jinjia解析,可以使用一些宏来实现时间的替换 例

{ { ds }} 会被转换为当天的 YYYY-MM-DD 格式的日期

{ { ds_nodash }} 会被转换为当天的 YYYYMMDD的格式的日期

在本例里则是通过{ {params.period}} 取到了 DAG上传入的参数,

insert into test.param_sql_test
select * from test.dm_input_loan_info_d
where period = {{params.period}};
  1. 整体的目录结构如下
    dags/
    test_param_sql.py
    sql/
    test_sql.sql

  2. 测试dag是否正确
    可以使用 airflow test dag_id task_id date 进行测试,测试会执行Operator,Operator指定的行为会进行调度. 但是不会将执行的行为记录到Airflow的数据库里

  3. 发布
    把文件放到~/airflow/dags目录下,sql文件不要放在dags目录下,可以找其他地方(比如同级目录),配置好上文说到的Variable,能找到即可.笔者的理解是,airflow会扫描dags目录下的内容,并尝试解析成dag,如果有不能成功解析的内容,ui界面上会有错误提示,导致dag显示不出来等问题.

其他有用的信息

  1. 如何在dag.py里引入其他的本地python模块
    需要把本地的python模块放到一个zip文件里,例如:
    my_dag1.py
    my_dag2.py
    package1/ init .py
    package1/functions.py
    然后把这个zip文件放到dags目录下,才能被正确解析

  2. pooling可以控制任务的并行度,如果给DAG指定了一个不存在的pooling,任务会一直处于scheduled的状态,不继续进行

转载于:https://www.cnblogs.com/yuuken/p/11270159.html

一、Airflow是什么airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为由tasks组成的DAGs(有向无环图),调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户... 1. 如何安装和配置 Airflow ? 2. 如何通过 Airflow UI 监控 data pipeline (管道)并对其进行故障排除 3. 什么是 Airflow Platform ? 4. Airflow 是如何进行数据分析,追踪数据,调试数据流的? 5. Airflow 命令行接口的基本操作有哪些? 一、基础概念 Airflow 是Airbnb内部发起并开源的一个 ETL 管理平台, 使用 Python 编写 实现 的任务管理、 调度 、监控工作流平台。这是其官方文档地址:Apache Airflow (incubating) Documentation ,关于 airflow 产品的 使用 ,里面有详细的介绍。
Airflow 是 Apache 基金会的一套用于创建、管理和监控工作流程的开源平台,是一套非常优秀的任务 调度 工具。截至2022年7月,在GitHub上已经拥有近27k的star。 本文主要介绍一下 Airflow 2.3.2版本,各个功能模块,以及如何 使用 。只浅浅提了一下 Airflow 的功能点,具体内容还是链接到了官方文档。......
最近工作任务需要把原来 使用 Kettle的 ETL 流程迁移到Hadoop平台上,就需要找一个替代Kettle工作流部分的工具。在大数据环境下,常用的无非是Oozie, Airflow 或者Azkaban。经过简单的评估之后,我们选择了轻量化的 Airflow 作为我们的工作流工具。 Airflow 是一个工作流分配管理系统,通过有向非循环图的方式管理任务流程,设置任务依赖关系和时间 调度 Airflow ... 真的是想不通, Airflow 不论社区活跃度还是Github的star数都是远胜于Azkaban还有EasyScheduler的,但是为何却连一个完备的安装 教程 都没有呢?是我的需求太高?真的是心累不已,整整把搜索引擎还有youtube翻来覆去也没让我感到满足……不过好在,一步一坑一脚印的最终搭建连通好了环境以及Operator。好了,废话不多说,开始 Airflow 今日份安装 教程 。 1.安...
Airflow 作为一款开源分布式任务 调度 框架,已经在业内广泛应用。本文总结了 Freewheel Transformer 团队近两年 使用 Airflow 作为 调度 器,编排各种批处理场景下 ETL Data Pipelines 的经验,希望能为正在探索 Airflow 的技术团队提供一些参考价值。 为什么选择 Airflow ? FreeWheel 的批数据处理 使用 场景主要分成两种,一种是固定时间 调度 ETL pipelines, 比如 hourly、daily、weekly 等 pipel...
很幸运,能参加公司产品组的数据中台项目研发,在离线业务中,针对 ETL 任务的 调度 ,以及拖拽脚本, 使用 的是Apache公司开源的 调度 平台 Airflow 进行封装。 文章目录前言一、 Airflow 介绍二、 Airflow 的 Web 界面1. 选中其中一个DAG三、 Airflow 中的作业和任务1. DAG2. Task四、 Airflow 调度 时间五、 Airflow 调度 方式1. 调度 方式六、作业操作框七、 Airflow 的 菜单栏界面1. 池2.连接3.XComs4.变量总结 随着离线业务的逐渐发展,定时 调度
Airflow 能做什么 Airflow 是一个工作流分配管理系统,通过有向非循环图的方式管理任务流程,设置任务依赖关系和时间 调度 Airflow 独立于我们要运行的任务,只需要把任务的名字和运行方式提供给 Airflow 作为一个task就可以。安装和 使用 最简单安装在Linux终端运行如下命令 (需要已安装好 python 2.x和pip):pip install airflow pip install "air
from airflow import DAG from airflow .operators.bash_operator import BashOperator from datetime import datetime, timedelta # 默认参数 default_args = { 'owner': 'me', 'start_date': datetime(2022, 1, 1), 'depends_on_past': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # DAG 定义 dag = DAG( 'my_dag_id', default_args=default_args, schedule_interval=timedelta(hours=1), # 任务定义 task = BashOperator( task_id='my_task_id', bash_command='hive -f my_hive_script.hql', dag=dag, 希望这些代码对您有所帮助!如果您有其他问题,请随时告诉我。