相关文章推荐
欢快的四季豆  ·  python提示框用法-掘金·  1 年前    · 
耍酷的马克杯  ·  asp.net - ...·  1 年前    · 

MaxCompute支持您使用Apache Airflow通过Python接口实现作业调度。本文为您介绍如何使用Apache Airflow的Python Operator调度MaxCompute作业。

Apache Airflow是Airbnb开源的、基于Python编写的调度工具,基于有向无环图(DAG),可以定义一组有依赖的作业,并按照依赖顺序依次执行作业。还支持通过Python定义子作业,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。更多Apache Airflow信息,请参见 Apache Airflow

在执行操作前,请确认您已满足如下条件:
  • 已安装PyODPS。

    更多安装PyODPS操作,请参见 安装PyODPS

  • 已安装并启动Apache Airflow。

    更多安装及启动Apache Airflow操作,请参见 Apache Airflow快速入门

    本文中的Apache Airflow示例版本为1.10.7。

    步骤一:在Apache Airflow家目录编写调度Python脚本

    编写作业调度Python脚本并保存为.py文件,脚本文件中会呈现完整的调度逻辑及对应的调度作业名称。假设Python脚本名称为Airiflow_MC.py,脚本内容示例如下:
    # -*- coding: UTF-8 -*-
    import sys
    import os
    from odps import ODPS
    from odps import options
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime, timedelta
    from configparser import ConfigParser
    import time
    reload(sys)
    sys.setdefaultencoding('utf8')
    #修改系统默认编码
    #MaxCompute参数设置
    options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
    cfg = ConfigParser()
    cfg.read("odps.ini")
    print(cfg.items())
    odps = ODPS(cfg.get("odps","<access_id>"),cfg.get("odps","<secret_access_key>"),cfg.get("odps","project"),cfg.get("odps","endpoint"))
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retry_delay': timedelta(minutes=5),
        'start_date':datetime(2020,1,15)
        # 'email': ['airflow@example.com'],
        # 'email_on_failure': False,
        # 'email_on_retry': False,
        # 'retries': 1,
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    #调度流程
    dag = DAG(
        'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
    def read_sql(sqlfile):
        with io.open(sqlfile, encoding='utf-8', mode='r') as f:
            sql=f.read()
        f.closed
        return sql
    #调度作业
    def get_time():
        print '当前时间是{}'.format(time.time())
        return time.time()
    #调度作业
    def mc_job ():
        project = odps.get_project()  # 取到默认项目。
        instance=odps.run_sql("select * from long_chinese;")
        print(instance.get_logview_address())
        instance.wait_for_success()
        with instance.open_reader() as reader:
            count = reader.count
        print("查询表数据条数:{}".format(count))
        for record in reader:
            print record
        return count
    t1 = PythonOperator (
        task_id = 'get_time' ,
        provide_context = False ,
        python_callable = get_time,
        dag = dag )
    t2 = PythonOperator (
        task_id = 'mc_job' ,
        provide_context = False ,
        python_callable = mc_job ,
        dag = dag )
    t2.set_upstream(t1)

    步骤二:提交调度脚本

  • 在系统的命令行窗口执行如下命令提交 步骤一 中编写的调度作业Python脚本。
    python Airiflow_MC.py
  • 在系统的命令行窗口执行如下命令生成调度流程并测试调度作业。
    # print the list of active DAGs
    airflow list_dags
    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks Airiflow_MC
    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks Airiflow_MC --tree
    #测试task
    airflow test Airiflow_MC get_time 2010-01-16
    airflow test Airiflow_MC mc_job 2010-01-16
  •