本文介紹了如何開發 AnalyticDB for MySQL Spark Python作業,以及如何通過VirtualEnv技術打包Python作業的運行環境。
前提條件
-
叢集的產品系列為 企業版、基礎版或湖倉版 。
-
叢集與OSS儲存空間位於相同地區。
-
已在 企業版、基礎版或湖倉版 叢集中 建立Job型資源群組 。
-
已建立 AnalyticDB for MySQL 叢集的資料庫帳號。
-
如果是通過阿里雲帳號訪問,只需 建立高許可權帳號 。
-
如果是通過RAM使用者訪問,需要 建立高許可權帳號和普通帳號 並且將 RAM使用者綁定到普通帳號 上。
-
PySpark的基本用法
-
編寫如下樣本程式,並將樣本程式儲存為
example.py。from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show() -
將
example.py程式上傳到OSS中。具體操作,請參見 控制台上傳檔案 。 -
進入Spark開發編輯器。
-
登入 雲原生資料倉儲AnalyticDB MySQL控制台 ,在左上方選擇叢集所在地區。在左側導覽列,單擊 集群清單 ,然後單擊目的地組群ID。
-
在左側導覽列,單擊 作業開發 > Spark Jar 開發 。
-
-
在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。
-
在編輯器中執行以下作業內容。
{ "name": "Spark Python Test", "file": "oss://testBucketName/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" }參數說明請參見 參數說明 。
使用Python依賴
使用方法
如果您使用自行開發或第三方開發的依賴開發Python程式時,需將使用的依賴上傳至OSS中,並在提交Spark作業時配置
pyFiles
參數。
樣本
本文樣本以引入自訂Function Compute員工的稅後收入為例。
樣本將資料檔案
staff.csv
上傳至OSS中。
staff.csv
中的樣本資料如下:
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
-
開發依賴並上傳至OSS中。
-
建立名為
tools的檔案夾,並在該檔案夾下建立名為func.py的程式 。def tax(salary): convert string to int and cut 15% tax from the salary :param salary: The salary of staff worker :return: return 0.15 * int(salary) -
將
tools檔案夾壓縮後上傳至 OSS中。本文樣本為tools.zip。說明如果依賴多個Python檔案,建議您使用gz壓縮包進行壓縮。您可以在Python代碼中以module方式引用Python檔案。
-
-
編寫名為
example.py的 樣本程式。from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType import sys # import third party file from tools import func if __name__ == "__main__": # init pyspark context spark = SparkSession.builder.appName("Python Example").getOrCreate() # read csv from oss to a dataframe, show the table cvs_file = sys.argv[1] df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True) # print schema and data to the console df.printSchema() df.show() # create an udf taxCut = udf(lambda salary: func.tax(salary), FloatType()) # cut tax from salary and show result df.select("name", taxCut("salary").alias("final salary")).show() spark.stop() -
將
example.py程式上傳到OSS中。具體操作,請參見 控制台上傳檔案 。 -
進入Spark開發編輯器。
-
登入 雲原生資料倉儲AnalyticDB MySQL控制台 ,在左上方選擇叢集所在地區。在左側導覽列,單擊 集群清單 ,然後單擊目的地組群ID。
-
在左側導覽列,單擊 作業開發 > Spark Jar 開發 。
-
-
在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。
-
在編輯器中執行以下作業內容。
{ "name": "Spark Python", "file": "oss://testBucketName/example.py", "pyFiles": ["oss://testBucketName/tools.zip"], "args": [ "oss://testBucketName/staff.csv" "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" }參數說明:
-
file : Python 程式所在的OSS路徑。
-
pyFiles : PySpark依賴的Python檔案所在的OSS路徑,尾碼為zip。多個壓縮包使用英文逗號(,)分隔。
說明PySpark應用所依賴的所有Python檔案必須儲存在OSS中。
-
args :使用JAR包時需要使用的參數。本文為
staff.csv樣本資料所在的OSS路徑。
更多參數,請參見 參數說明 。
-
使用Virtual Environments打包依賴環境
開發Python作業時,如果您遇到複雜的依賴環境,可以通過Python的Virtual Environments技術進行環境管理和隔離。 AnalyticDB for MySQL Spark支援使用Virtual Environments將本地依賴的環境打包並上傳到OSS中。關於Virtual Environments的更多資訊,請參見 Python官方社區文檔 。
AnalyticDB for MySQL Spark使用的glibc-devel版本為2.28,若Virtual Environments不相容2.28版本,PySpark任務可能無法正常執行。
使用方法
使用Virtual Environments打包Python環境,需將壓縮包上傳至OSS中,並在提交Spark作業時配置相關參數,以指定Python環境壓縮包所在的OSS路徑和使用的Python解譯器的本地路徑。
-
指定Python環境壓縮包所在的OSS路徑:
-
若Python環境的壓縮包較小,您可配置
archives參數。 -
若Python環境的壓縮包較大,您可配置
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES和spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES參數。
-
-
指定使用的Python解譯器的本地路徑:
spark.pyspark.python參數。
樣本
-
準備Linux環境。
Virtual Environments需在Linux作業系統中打包Python環境,您可以通過以下三種方式準備Linux環境。本文以購買阿里雲ECS執行個體為例。
-
購買作業系統為Centos 7或AnolisOS 8的阿里雲ECS執行個體。具體操作,請參見 自訂購買執行個體 。
-
在本地安裝Centos 7或者AnolisOS 8以上版本的作業系統。
-
使用Centos或AnolisOS的官方Docker鏡像,在鏡像內部打包Python環境。
-
-
使用Virtual Environments打包Python運行環境,並將壓縮包上傳至OSS中。
使用Virtualenv或Conda打包專案依賴的Python環境, 打包時可自訂Python的版本。 此處以Virtualenv打包為例。
# Create directory venv at current path with python3 # MUST ADD --copies ! virtualenv --copies --download --python python3.7 venv # active environment source venv/bin/activate # install third party modules pip install scikit-spark==0.4.0 # check the result pip list # compress the environment tar -czvf venv.tar.gz venv說明如果您想通過 Conda 打包專案依賴,請參見 Conda管理虛擬環境 。
-
進入Spark開發編輯器。
-
登入 雲原生資料倉儲AnalyticDB MySQL控制台 ,在左上方選擇叢集所在地區。在左側導覽列,單擊 集群清單 ,然後單擊目的地組群ID。
-
在左側導覽列,單擊 作業開發 > Spark Jar 開發 。
-
-
在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。
-
在編輯器中執行以下作業內容。
{ "name": "venv example", "archives": [ "oss://testBucketname/venv.tar.gz#PY3" "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" "file": "oss://testBucketname/example.py" }或
說明Python環境的壓縮包過大時,可參考如下代碼。
{ "name": "venv example", "conf": { "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3", "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" "file": "oss://testBucketname/example.py" }參數說明:
-
archives :Python環境壓縮包所在的OSS路徑。本文樣本為
venv.tar.gz壓縮包所在的OSS路徑。 -
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES :Spark Executor節點參數,用於指定Python環境壓縮包所在的OSS路徑。
-
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES :Spark Driver節點參數,用於指定Python環境壓縮包所在的OSS路徑。
-
spark.pyspark.python :指定要使用的Python解譯器的本地路徑。
其他參數,請參見 參數說明 。
-