备案 控制台
学习
实践
活动
专区
工具
TVP
写文章
专栏首页 Ryan Miao 调度系统Airflow的第一个DAG
3 0

海报分享

调度系统Airflow的第一个DAG

Airflow的第一个DAG

考虑了很久,要不要记录airflow相关的东西, 应该怎么记录. 官方文档已经有比较详细的介绍了,还有各种博客,我需要有一份自己的笔记吗?

答案就从本文开始了.

本文将从一个陌生视角开始认知airflow,顺带勾勒出应该如何一步步搭建我们的数据调度系统.

现在是9102年9月上旬, Airflow最近的一个版本是1.10.5.

ps. 查资料发现自己好多文章被爬走,换了作者.所以,接下里的内容会随机添加一些防伪标识,忽略即可.

什么数据调度系统?

中台这个概念最近比较火, 其中就有一个叫做数据中台, 文章 数据中台到底是什么 给出了一个概念. 我粗糙的理解, 大概就是: 收集各个零散的数据,标准化,然后服务化, 提供统一数据服务. 而要做到数据整理和处理,必然涉及数据调度,也就需要一个调度系统.[本文出自Ryan Miao] 数据调度系统可以将不同的异构数据互相同步,可以按照规划去执行数据处理和任务调度. Airflow就是这样的一个任务调度平台.

前面 Airflow1.10.4介绍与安装 已经 安装好了我们的airflow, 可以直接使用了. 这是第一个DAG任务链.

创建一个任务Hello World

目标: 每天早上8点执行一个任务--打印 Hello World

在Linux上,我们可以在crontab插入一条记录:

使用Springboot, 我们可以使用 @Scheduled(cron="0 0 8 * * ?") 来定时执行一个method.

使用quartz, 我们可以创建一个 CronTrigger , 然后去执行对应的JobDetail.

 CronTrigger trigger = (CronTrigger)TriggerBuilder.newTrigger()
            .withIdentity("trigger1", "group1")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 8 * * ?"))
            .build();

使用Airflow, 也差不多类似.

在docker-airflow中,我们将dag挂载成磁盘,现在只需要在dag目录下编写dag即可.

 volumes:
            - ./dags:/usr/local/airflow/dags

创建一个 hello.py

"""
Airflow的第一个DAG
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
    "owner": "ryan.miao",
    "start_date": datetime(2019, 9, 1)
dag = DAG("Hello-World", 
        description="第一个DAG",
        default_args=default_args, 
        schedule_interval='0 8 * * *')