本文介紹了如何開發 AnalyticDB for MySQL Spark Python作業,以及如何通過VirtualEnv技術打包Python作業的運行環境。

前提條件

PySpark的基本用法

  1. 編寫如下樣本程式,並將樣本程式儲存為 example.py

    from pyspark.sql import SparkSession
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. example.py 程式上傳到OSS中。具體操作,請參見 控制台上傳檔案

  3. 進入Spark開發編輯器。

    1. 登入 雲原生資料倉儲AnalyticDB MySQL控制台 ,在左上方選擇叢集所在地區。在左側導覽列,單擊 集群清單 ,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊 作業開發 > Spark Jar 開發

  4. 在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。

  5. 在編輯器中執行以下作業內容。

    {
     "name": "Spark Python Test",
     "file": "oss://testBucketName/example.py",
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.executor.resourceSpec": "small"
    }

    參數說明請參見 參數說明

使用Python依賴

使用方法

如果您使用自行開發或第三方開發的依賴開發Python程式時,需將使用的依賴上傳至OSS中,並在提交Spark作業時配置 pyFiles 參數。

樣本

本文樣本以引入自訂Function Compute員工的稅後收入為例。 樣本將資料檔案 staff.csv 上傳至OSS中。 staff.csv 中的樣本資料如下:

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
  1. 開發依賴並上傳至OSS中。

    1. 建立名為 tools 的檔案夾,並在該檔案夾下建立名為 func.py 的程式

      def tax(salary):
          convert string to int and cut 15% tax from the salary
          :param salary: The salary of staff worker
          :return:
          return 0.15 * int(salary)
      
    2. tools 檔案夾壓縮後上傳至 OSS中。本文樣本為 tools.zip

      說明

      如果依賴多個Python檔案,建議您使用gz壓縮包進行壓縮。您可以在Python代碼中以module方式引用Python檔案。

  2. 編寫名為 example.py 樣本程式。

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    import sys
    # import third party file
    from tools import func
    if __name__ == "__main__":
        # init pyspark context
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # read csv from oss to a dataframe, show the table
        cvs_file = sys.argv[1]
        df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # print schema and data to the console
        df.printSchema()
        df.show()
        # create an udf
        taxCut = udf(lambda salary: func.tax(salary), FloatType())
        # cut tax from salary and show result
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. example.py 程式上傳到OSS中。具體操作,請參見 控制台上傳檔案

  4. 進入Spark開發編輯器。

    1. 登入 雲原生資料倉儲AnalyticDB MySQL控制台 ,在左上方選擇叢集所在地區。在左側導覽列,單擊 集群清單 ,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊 作業開發 > Spark Jar 開發

  5. 在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。

  6. 在編輯器中執行以下作業內容。

    {
     "name": "Spark Python",
     "file": "oss://testBucketName/example.py",
     "pyFiles": ["oss://testBucketName/tools.zip"],
     "args": [
     "oss://testBucketName/staff.csv"
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 2,
     "spark.executor.resourceSpec": "small"
    }

    參數說明:

    • file Python 程式所在的OSS路徑。

    • pyFiles PySpark依賴的Python檔案所在的OSS路徑,尾碼為zip。多個壓縮包使用英文逗號(,)分隔。

      說明

      PySpark應用所依賴的所有Python檔案必須儲存在OSS中。

    • args :使用JAR包時需要使用的參數。本文為 staff.csv 樣本資料所在的OSS路徑。

    更多參數,請參見 參數說明

使用Virtual Environments打包依賴環境

開發Python作業時,如果您遇到複雜的依賴環境,可以通過Python的Virtual Environments技術進行環境管理和隔離。 AnalyticDB for MySQL Spark支援使用Virtual Environments將本地依賴的環境打包並上傳到OSS中。關於Virtual Environments的更多資訊,請參見 Python官方社區文檔

重要

AnalyticDB for MySQL Spark使用的glibc-devel版本為2.28,若Virtual Environments不相容2.28版本,PySpark任務可能無法正常執行。

使用方法

使用Virtual Environments打包Python環境,需將壓縮包上傳至OSS中,並在提交Spark作業時配置相關參數,以指定Python環境壓縮包所在的OSS路徑和使用的Python解譯器的本地路徑。

  • 指定Python環境壓縮包所在的OSS路徑:

    • 若Python環境的壓縮包較小,您可配置 archives 參數。

    • 若Python環境的壓縮包較大,您可配置 spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES 參數。

  • 指定使用的Python解譯器的本地路徑: spark.pyspark.python 參數。

樣本

  1. 準備Linux環境。

    Virtual Environments需在Linux作業系統中打包Python環境,您可以通過以下三種方式準備Linux環境。本文以購買阿里雲ECS執行個體為例。

    • 購買作業系統為Centos 7或AnolisOS 8的阿里雲ECS執行個體。具體操作,請參見 自訂購買執行個體

    • 在本地安裝Centos 7或者AnolisOS 8以上版本的作業系統。

    • 使用Centos或AnolisOS的官方Docker鏡像,在鏡像內部打包Python環境。

  2. 使用Virtual Environments打包Python運行環境,並將壓縮包上傳至OSS中。

    使用Virtualenv或Conda打包專案依賴的Python環境, 打包時可自訂Python的版本。 此處以Virtualenv打包為例。

    # Create directory venv at current path with python3
    # MUST ADD --copies !
    virtualenv --copies --download --python python3.7 venv
    # active environment
    source venv/bin/activate
    # install third party modules
    pip install scikit-spark==0.4.0
    # check the result
    pip list
    # compress the environment
    tar -czvf venv.tar.gz venv
    說明

    如果您想通過 Conda 打包專案依賴,請參見 Conda管理虛擬環境

  3. 進入Spark開發編輯器。

    1. 登入 雲原生資料倉儲AnalyticDB MySQL控制台 ,在左上方選擇叢集所在地區。在左側導覽列,單擊 集群清單 ,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊 作業開發 > Spark Jar 開發

  4. 在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。

  5. 在編輯器中執行以下作業內容。

    {
     "name": "venv example",
     "archives": [
     "oss://testBucketname/venv.tar.gz#PY3"
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     "file": "oss://testBucketname/example.py"
    }

    說明

    Python環境的壓縮包過大時,可參考如下代碼。

    {
     "name": "venv example",
     "conf": {
     "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,",
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     "file": "oss://testBucketname/example.py"
    }

    參數說明:

    • archives :Python環境壓縮包所在的OSS路徑。本文樣本為 venv.tar.gz 壓縮包所在的OSS路徑。

    • spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES :Spark Executor節點參數,用於指定Python環境壓縮包所在的OSS路徑。

    • spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES :Spark Driver節點參數,用於指定Python環境壓縮包所在的OSS路徑。

    • spark.pyspark.python :指定要使用的Python解譯器的本地路徑。

    其他參數,請參見 參數說明