本文提供了可在 Databricks 资产包中向 Azure Databricks 作业添加的各种类型任务的示例 。 请参阅 什么是 Databricks 资产捆绑包?

大多数作业任务类型在其支持的设置中都具有特定于任务的参数,但你也可以定义传递给任务的 作业参数 。 作业参数支持动态值引用,从而支持在任务之间传递特定于作业运行的值。 请参阅 什么是动态值引用?

可以替代作业任务设置。 请参阅 替代 Databricks 资产捆绑包中的作业任务设置

若要使用 Databricks CLI 快速生成现有作业的资源配置,可以使用 bundle generate job 命令。 请参阅 捆绑包命令

笔记本任务

使用此任务运行笔记本。

以下示例将笔记本任务添加到作业,并设置名为 my_job_run_id 的作业参数。 要部署的笔记本的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取笔记本。 (为简洁起见,用省略号表示省略的内容。)

# ...
resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      # ...
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: "{{job.run_id}}"
        # ...
# ...

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > notebook_task,以 YAML 格式表示。 请参阅作业的笔记本任务

Python 脚本任务

使用此任务运行 Python 文件。

以下示例向作业添加 Python 脚本任务。 要部署的 Python 文件的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取 Python 文件。 (为简洁起见,用省略号表示省略的内容。)

# ...
resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job
      # ...
      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py
          # ...
# ...

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > spark_python_task,以 YAML 格式表示。 另请参阅作业的 Python 脚本任务

Python wheel 任务

使用此任务可运行 Python wheel 文件。

以下示例向作业添加 Python wheel 任务。 要部署的 Python wheel 文件的路径是相对于声明了此任务的配置文件而言。 请参阅 Databricks 资产捆绑包的库依赖项。 (为简洁起见,用省略号表示省略的内容。)

# ...
resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      # ...
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl
          # ...
# ...

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > python_wheel_task,以 YAML 格式表示。 另请参阅使用 Databricks 资产捆绑包开发 Python Wheel 文件作业的 Python Wheel 任务

JAR 任务

使用此任务运行 JAR。 可以引用本地 JAR 库或工作区、Unity Catalog 卷或外部云存储位置中的 JAR 库。 请参阅 Databricks 资产捆绑包的库依赖项

以下示例向作业添加 JAR 任务。 JAR 的路径是相对于指定卷的位置而言。 (为简洁起见,用省略号表示省略的内容。)

# ...
resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      # ...
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
          # ...
# ...

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > spark_jar_task,以 YAML 格式表示。 请参阅作业的 JAR 任务

SQL 文件任务

使用此任务运行位于工作区或远程 Git 存储库中的 SQL 文件。

以下示例向作业添加 SQL 文件任务。 此 SQL 文件任务使用指定的 SQL 仓库来运行指定的 SQL 文件。 (为简洁起见,用省略号表示省略的内容。)

# ...
resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      # ...
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1
          # ...
# ...

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概览”选项卡上“名称”字段中仓库名称后面括号中的 ID。

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > sql_task > file,以 YAML 格式表示。 请参阅作业的 SQL 任务

增量实时表管道任务

使用此任务运行增量实时表管道。 请参阅什么是增量实时表?

以下示例向作业添加增量实时表管道任务。 此增量实时表管道任务运行指定的管道。 (为简洁起见,用省略号表示省略的内容。)

# ...
resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      # ...
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111
          # ...
# ...

可以通过以下方法来获取管道的 ID:在工作区中打开管道,并在管道设置页的“管道详细信息”选项卡上复制“管道 ID”值。

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > pipeline_task,以 YAML 格式表示。 请参阅作业的 Delta Live Tables 管道任务

dbt 任务

使用此任务运行一个或多个 dbt 命令。 请参阅连接到 dbt Cloud

以下示例向作业添加 dbt 任务。 此 dbt 任务使用指定的 SQL 仓库来运行指定的 dbt 命令。

# ...
resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      # ...
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - "dbt deps"
              - "dbt seed"
              - "dbt run"
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: "dbt-databricks>=1.0.0,<2.0.0"
          # ...
# ...

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概览”选项卡上“名称”字段中仓库名称后面括号中的 ID。

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > dbt_task,以 YAML 格式表示。 请参阅作业的 dbt 任务

Databricks 资产捆绑包还包含一个 dbt-sql 项目模板,可用于定义具有 dbt 任务的作业,以及已部署 dbt 作业的 dbt 配置文件。 有关 Databricks 资产捆绑包模板的信息,请参阅使用默认捆绑包模板

运行作业任务

使用此任务运行另一个作业。

以下示例在第二个作业中包含了运行第一个作业的运行作业任务。

# ...
resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: "13.3.x-scala2.12"
            node_type_id: "i3.xlarge"
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}
  # ...

此示例使用替换来检索要运行的作业的 ID。 若要从 UI 获取作业的 ID,请在工作区中打开该作业,并从作业设置页的“作业详细信息”选项卡中的“作业 ID”值复制 ID。

有关可为每个任务设置的额外映射,请参见创建作业操作请求负载(如 REST API 参考中的 POST /api/2.1/jobs/create 中所定义)中的 tasks > run_job_task,以 YAML 格式表示。