Airflow的第一个DAG
考虑了很久,要不要记录airflow相关的东西, 应该怎么记录. 官方文档已经有比较详细的介绍了,还有各种博客,我需要有一份自己的笔记吗?
答案就从本文开始了.
本文将从一个陌生视角开始认知airflow,顺带勾勒出应该如何一步步搭建我们的数据调度系统.
现在是9102年9月上旬, Airflow最近的一个版本是1.10.5.
ps. 查资料发现自己好多文章被爬走,换了作者.所以,接下里的内容会随机添加一些防伪标识,忽略即可.
什么数据调度系统?
中台这个概念最近比较火, 其中就有一个叫做数据中台, 文章
数据中台到底是什么
给出了一个概念.
我粗糙的理解, 大概就是: 收集各个零散的数据,标准化,然后服务化, 提供统一数据服务. 而要做到数据整理和处理,必然涉及数据调度,也就需要一个调度系统.[本文出自Ryan Miao]
数据调度系统可以将不同的异构数据互相同步,可以按照规划去执行数据处理和任务调度. Airflow就是这样的一个任务调度平台.
前面
Airflow1.10.4介绍与安装
已经
安装好了我们的airflow, 可以直接使用了. 这是第一个DAG任务链.
创建一个任务Hello World
目标: 每天早上8点执行一个任务--打印
Hello World
在Linux上,我们可以在crontab插入一条记录:
使用Springboot, 我们可以使用
@Scheduled(cron="0 0 8 * * ?")
来定时执行一个method.
使用quartz, 我们可以创建一个
CronTrigger
, 然后去执行对应的JobDetail.
CronTrigger trigger = (CronTrigger)TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 8 * * ?"))
.build();
使用Airflow, 也差不多类似.
在docker-airflow中,我们将dag挂载成磁盘,现在只需要在dag目录下编写dag即可.
volumes:
- ./dags:/usr/local/airflow/dags
创建一个
hello.py
"""
Airflow的第一个DAG
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
"owner": "ryan.miao",
"start_date": datetime(2019, 9, 1)
dag = DAG("Hello-World",
description="第一个DAG",
default_args=default_args,
schedule_interval='0 8 * * *')