Airflow是比较流行的开源调度工具,可以实现各类工作负载的DAG编排与调度。您可以通过 AnalyticDB MySQL Spark Airflow Operator、Spark-Submit命令行工具来实现Airflow调度Spark任务。本文介绍如何通过Airflow调度 AnalyticDB MySQL Spark作业。
注意事项
-
AnalyticDB MySQL Spark支持的配置参数,请参见 Conf配置参数 。
-
如果您使用的是Apache Livy的调度方式, AnalyticDB MySQL Spark Livy Proxy相关工具会在近期发布,可与维护团队联系申请邀测使用。
Spark Airflow Operator命令行工具
准备工作
-
安装Airflow服务并启动。具体操作,请参见 Airflow社区文档 。
-
安装Airflow Spark插件。执行如下命令:
pip install https://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
操作步骤
-
准备Connection,示例如下。具体操作,请参见 创建Connection 。
{ "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" } -
创建DAG声明Spark工作流,本文的DAG声明文件为
spark_dags.py。from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark. import AnalyticDBSparkBatchOperator, AnalyticDBSparkSQLOperator with DAG( dag_id=DAG_ID, start_date=datetime(2021, 1, 1), default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"}, max_active_runs=1, catchup=False, ) as dag: spark_batch = AnalyticDBSparkBatchOperator( task_id="task1", file="oss://<bucket_name>/tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi" spark_sql = AnalyticDBSparkSQLOperator( task_id="task2", sql="SHOW DATABASES;" spark_batch >> spark_sql参数说明如下。
AnalyticDBSparkBatchOperator 支持配置的参数。
参数
是否必填
说明
file
是
Spark应用主文件的存储路径,文件路径需为绝对路径。主文件是入口类所在的JAR包或者Python的入口执行文件。
重要Spark应用主文件目前只支持存储在OSS中。
OSS Bucket与AnalyticDB MySQL集群需要在同一地域。
class_name
是
-
Java或Scala程序入口类名称,必填参数。
-
Python不需要指定入口类,非必填参数。
args
否
Spark应用参数。
conf
否
与开源Spark中的配置项基本一致,参数格式为key: value形式。与开源Spark用法不一致的配置参数及AnalyticDB MySQL特有的配置参数,请参见 Conf配置参数 。
jars
否
Spark应用依赖的JAR包。需填写JAR包文件的绝对路径。JAR包在运行时会被加入到Driver和Executor JVM的ClassPath里面。
重要Spark应用所依赖的所有JAR包必须存储在OSS中。
OSS Bucket与AnalyticDB MySQL集群需要在同一地域。
py_files
否
PySpark依赖的Python文件,后缀可以是ZIP、PY和EGG。如果依赖多个Python文件,建议使用ZIP或者EGG压缩包。您可以在Python代码中以module方式引用Python文件。
重要Spark应用所依赖的所有Python文件须存储在OSS中。
files
否
Spark应用依赖的文件资源,文件会被下载到Driver和Executor进程的当前执行目录下。
支持配置文件别名,例如oss://<testBucketName>/test/test1.txt#test1,test1为文件别名,您可以使用./test1或者./test1.txt访问文件。
说明files中包含名为log4j.properties的文件时,Spark会使用该log4j.properties文件作为日志配置。
Spark应用所依赖的所有文件须存储在OSS中。
driver_resource_spec
否
Spark driver的资源规格。默认值为medium。
不同型号的取值对应不同的规格,详情请参见 Spark资源规格列表 的型号列。
说明spark.driver.resourceSpec 与 spark.executor.resourceSpec 参数取值相同。
仅提交Spark离线应用时,可使用开源Spark参数,且取值需为 Spark资源规格列表 中的核数和内存。
executor_resource_spec
否
Spark executor的资源规格。默认值为medium。
不同型号的取值对应不同的规格,详情请参见 Spark资源规格列表 的型号列。
num_executors
否
Spark Executor个数。默认值为3。
archives
否
Spark应用依赖的压缩包资源,目前支持.TAR.GZ后缀。压缩包会被解压到当前Spark进程的当前目录下。
支持配置文件别名,例如oss://testBucketName/test/test1.tar.gz#test1,test1为文件别名。假设test2.txt是test1.tar.gz压缩包中的文件,您可以使用./test1/test2.txt或者./test1.tar.gz/test2.txt访问解压后的文件。
说明Spark应用所依赖的所有压缩包须存储在OSS中。压缩包解压缩失败,任务会失败。
name
否
Spark应用名称。
cluster_id
是
AnalyticDB MySQL 湖仓版(3.0) 集群ID。
rg_name
是
AnalyticDB MySQL 湖仓版(3.0) 集群的Job型资源组名称。
adb_spark_conn_id
是
AnalyticDB MySQL Spark Airflow Connection ID。默认值为
adb_spark_default。region
是
AnalyticDB MySQL 湖仓版(3.0) 集群所属地域ID。
polling_interval
否
扫描Spark应用状态周期。
AnalyticDBSparkSQLOperator 支持配置的参数。
参数
是否必填
说明
SQL
是
Spark SQL语句。
conf
否
与开源Spark中的配置项基本一致,参数格式为key: value形式。与开源Spark用法不一致的配置参数及AnalyticDB MySQL特有的配置参数,请参见 Conf配置参数 。
driver_resource_spec
否
Spark driver的资源规格。默认值为medium。
不同型号的取值对应不同的规格,详情请参见 Spark资源规格列表 的型号列。
说明spark.driver.resourceSpec 与 spark.executor.resourceSpec 参数取值相同。
仅提交Spark离线应用时,可使用开源Spark参数,且取值需为 Spark资源规格列表 中的核数和内存。
executor_resource_spec
否
Spark executor的资源规格。默认值为medium。
不同型号的取值对应不同的规格,详情请参见 Spark资源规格列表 的型号列。
num_executors
否
Spark Executor个数。默认值为3。
name
否
Spark应用名称。
cluster_id
是
AnalyticDB MySQL 湖仓版(3.0) 集群ID。
rg_name
是
AnalyticDB MySQL 湖仓版(3.0) 集群的Job型资源组名称。
adb_spark_conn_id
是
AnalyticDB MySQL Spark Airflow Connection ID。默认值为
adb_spark_default。region
是
AnalyticDB MySQL 湖仓版(3.0) 集群所属地域ID。
polling_interval
否
扫描Spark应用状态周期。
-
-
将
spark_dags.py文件存放至Airflow Configuration声明dags_folder所在的文件夹中。 -
执行DAG。具体操作请参见 Airflow社区文档 。
Spark-Submit命令行工具
对于
AnalyticDB MySQL
特有的配置项,例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在
AnalyticDB MySQL
Spark工具包的配置文件
conf/spark-defaults.conf
中进行配置,也可以通过Airflow参数来配置。详情请参见
参数配置
。
准备工作
准备工作一:安装Airflow服务
-
安装Airflow服务并启动。具体操作请参见 Airflow社区文档 。
-
安装Airflow Spark插件。执行如下命令:
pip3 install apache-airflow-providers-apache-spark重要-
您需要使用Python3来安装Airflow Spark插件。
-
安装apache-airflow-providers-apache-spark会默认安装社区版Pyspark,需要执行如下命令将pyspark卸载。
pip3 uninstall pyspark
-
准备工作二:下载并配置Spark-Submit命令行工具
-
下载Spark-Submit命令行工具包并进行配置。具体操作请参见 AnalyticDB MySQL Spark-Submit命令行工具 。
-
配置PATH路径。执行以下命令,将Spark-Submit命令行工具的地址加入Airflow执行地址。
export PATH=PATH:</your/adb/spark/path/bin>重要在启动Airflow之前需要将Spark-Submit加入到PATH中,否则调度任务可能会找不到Spark-Submit命令。
操作步骤
-
准备DAG声明文件。本文以创建Airflow DAG的demo.py文件为例。
from airflow.models import DAG from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago args = { 'owner': 'Aliyun ADB Spark', with DAG( dag_id='example_spark_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: adb_spark_conf = { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium" # [START howto_operator_spark_submit] submit_job = SparkSubmitOperator( conf = adb_spark_conf, application="oss://<bucket_name>/jar/pi.py", task_id="submit_job", verbose=True # [END howto_operator_spark_submit] # [START howto_operator_spark_sql] sql_job = SparkSqlOperator( conn_id="spark_default", sql="SELECT * FROM yourdb.yourtable", conf=",".join([k+"="+v for k,v in adb_spark_conf.items()]), task_id="sql_job", verbose=True # [END howto_operator_spark_sql] submit_job >> sql_job -
将编辑完成的demo.py文件放至Airflow安装目录的dags目录下。
-
执行DAG。具体操作请参见 Airflow社区文档 。