相关文章推荐
乖乖的砖头  ·  2021-12-07 ...·  6 月前    · 
憨厚的钥匙  ·  0028 ...·  1 年前    · 

了解如何训练机器学习模型,以根据各种因素(例如上车和下车地点、距离、日期、时间、乘客数量和费率代码)预测纽约市黄色出租车行程的总行程持续时间 (du) ration。 训练模型后,可以注册已训练的模型,并使用 Fabric 与 MLflow 框架的本机集成来记录使用的超参数和评估指标。

Microsoft Fabric 目前为 预览版

MLflow 是一个开源平台,用于管理端到端机器学习生命周期,其中包含用于跟踪试验、打包 ML 模型和项以及模型注册表的功能。 有关详细信息,请参阅 MLflow

在本教程中,你将从 lakehouse delta 表加载已清理和准备的数据,并使用它来训练回归模型来预测 tripDuration 变量。 你还将使用 Fabric MLflow 集成来创建和跟踪试验,并注册已训练的模型、模型超参数和指标。

  • Microsoft Fabric 订阅 。 或者注册免费的 Microsoft Fabric (预览版) 试用版

  • 登录到 Microsoft Fabric

  • 完成 第 1 部分:使用 Apache Spark 将数据引入 Microsoft Fabric lakehouse

  • (可选)完成 第 2 部分:使用 Microsoft Fabric 笔记本浏览和可视化数据 ,了解有关数据的详细信息。

  • 完成 第 3 部分:使用 Apache Spark 执行数据清理和准备

    在笔记本中继续操作

    04-train-and-track-machine-learning-models.ipynb 是本教程随附的笔记本。

    若要打开本教程随附的笔记本,请按照 为数据科学准备系统 中的说明操作,将教程笔记本导入工作区。

    或者,如果想要复制/粘贴此页面中的代码,请 创建新的笔记本

    在开始运行代码之前 ,请务必将 lakehouse 附加到笔记本

    训练和注册模型

  • 在第一步中,我们导入 MLflow 库并创建名为 nyctaxi_tripduration 的试验,以记录作为训练过程的一部分生成的运行和模型。

    # Create Experiment to Track and register model with mlflow
    import mlflow
    print(f"mlflow lbrary version: {mlflow.__version__}")
    EXPERIMENT_NAME = "nyctaxi_tripduration"
    mlflow.set_experiment(EXPERIMENT_NAME)
    
  • 从 lakehouse delta 表 nyctaxi_prep读取已清理和准备的数据,并从数据 (创建小数随机样本,以减少本教程) 计算时间。

    SEED = 1234
    # note: From the perspective of the tutorial, we are sampling training data to speed up the execution.
    training_df = spark.read.format("delta").load("Tables/nyctaxi_prep").sample(fraction = 0.5, seed = SEED)
    

    机器学习中的种子是一个值,用于确定伪随机数生成器的初始状态。 种子用于确保机器学习试验的结果可重现。 通过使用相同的种子,可以获取相同的数字序列,从而获得相同的结果,用于数据拆分、模型训练和涉及随机性的其他任务。

  • 执行随机拆分,通过执行以下一组命令来获取训练和测试数据集,并定义分类和数字特征。 我们还缓存训练和测试数据帧,以提高下游进程的速度。

    TRAIN_TEST_SPLIT = [0.75, 0.25]
    train_df, test_df = training_df.randomSplit(TRAIN_TEST_SPLIT, seed=SEED)
    # Cache the dataframes to improve the speed of repeatable reads
    train_df.cache()
    test_df.cache()
    print(f"train set count:{train_df.count()}")
    print(f"test set count:{test_df.count()}")
    categorical_features = ["storeAndFwdFlag","timeBins","vendorID","weekDayName","pickupHour","rateCodeId","paymentType"]
    numeric_features = ['passengerCount', "tripDistance"]
    

    Apache Spark 缓存是一项功能,可用于将中间数据存储在内存或磁盘中,并将其重复用于多个查询或操作。 缓存可以避免重新处理经常访问的数据,从而提高 Spark 应用程序的性能和效率。 可以根据需求和资源使用不同的方法和存储级别来缓存数据。 缓存对于需要重复访问相同数据的迭代算法或交互式分析特别有用。

  • 在此步骤中,我们将定义使用 Spark ML 管道和 Microsoft SynapseML 库执行更多特征工程和训练模型的步骤。 本教程使用的算法 LightGBM 是基于决策树算法的快速分布式高性能梯度提升框架。 它是 Microsoft 开发的开源项目,支持回归、分类和许多其他机器学习方案。 其main优点包括更快的训练速度、更低的内存使用量、更好的准确性和支持分布式学习。

    from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
    from pyspark.ml import Pipeline
    from synapse.ml.core.platform import *
    from synapse.ml.lightgbm import LightGBMRegressor
    # Define a pipeline steps for training a LightGBMRegressor regressor model
    def lgbm_pipeline(categorical_features,numeric_features, hyperparameters):
       # String indexer
       stri = StringIndexer(inputCols=categorical_features, 
                            outputCols=[f"{feat}Idx" for feat in categorical_features]).setHandleInvalid("keep")
       # encode categorical/indexed columns
       ohe = OneHotEncoder(inputCols= stri.getOutputCols(),  
                            outputCols=[f"{feat}Enc" for feat in categorical_features])
       # convert all feature columns into a vector
       featurizer = VectorAssembler(inputCols=ohe.getOutputCols() + numeric_features, outputCol="features")
       # Define the LightGBM regressor
       lgr = LightGBMRegressor(
          objective = hyperparameters["objective"],
          alpha = hyperparameters["alpha"],
          learningRate = hyperparameters["learning_rate"],
          numLeaves = hyperparameters["num_leaves"],
          labelCol="tripDuration",
          numIterations = hyperparameters["iterations"],
       # Define the steps and sequence of the SPark ML pipeline
       ml_pipeline = Pipeline(stages=[stri, ohe, featurizer, lgr])
       return ml_pipeline
    
  • 通过执行以下单元格,将训练超参数定义为初始运行 lightgbm 模型的 python 字典。

    # Default hyperparameters for LightGBM Model
    LGBM_PARAMS = {"objective":"regression",
       "alpha":0.9,
       "learning_rate":0.1,
       "num_leaves":31,
       "iterations":100}
    

    超参数是可以更改的参数,用于控制机器学习模型的训练方式。 超参数可能会影响模型的速度、质量和准确性。 查找最佳超参数的一些常见方法是测试不同的值、使用网格或随机搜索,或者使用更高级的优化技术。 本教程中 LightGBM 模型的超参数已使用分布式网格搜索进行预优化, (本教程未介绍) 使用 hyperopt 库运行。

  • 接下来,我们使用 MLflow 在定义的试验中创建一个新运行,并在训练数据帧上拟合定义的管道。 然后,我们使用以下一组命令对测试数据集生成预测。

    if mlflow.active_run() is None:
       mlflow.start_run()
    run = mlflow.active_run()
    print(f"Active experiment run_id: {run.info.run_id}")
    lg_pipeline = lgbm_pipeline(categorical_features,numeric_features,LGBM_PARAMS)
    lg_model = lg_pipeline.fit(train_df)
    # Get Predictions
    lg_predictions = lg_model.transform(test_df)
    ## Caching predictions to run model evaluation faster
    lg_predictions.cache()
    print(f"Prediction run for {lg_predictions.count()} samples")
    
  • 训练模型并在测试集上生成预测后,即可使用 SynapseML 库实用工具 ComputeModelStatistics 计算模型统计信息,以评估训练的 LightGBMRegressor 模型的性能,这有助于基于算法评估各种类型的模型。 生成指标后,我们还将其转换为 python 字典对象以进行日志记录。 评估回归模型的指标包括 MSE (均方误差) 、RMSE (均方根误差) 、R^2 和 MAE (均方误差) 。

    from synapse.ml.train import ComputeModelStatistics
    lg_metrics = ComputeModelStatistics(
       evaluationMetric="regression", labelCol="tripDuration", scoresCol="prediction"
    ).transform(lg_predictions) 
    display(lg_metrics)
    

    输出值类似于下表:

    mean_squared_error root_mean_squared_error mean_absolute_error
  • 接下来,我们定义一个常规函数,以使用 MLflow 在创建的试验下使用默认超参数注册已训练的 LightGBMRegressor 模型。 我们还记录在试验运行中用于模型评估的相关超参数和指标,最后终止 MLflow 试验中的运行。

    from mlflow.models.signature import ModelSignature 
    from mlflow.types.utils import _infer_schema 
    # Define a function to register a spark model
    def register_spark_model(run, model, model_name,signature,metrics, hyperparameters):
          # log the model, parameters and metrics
          mlflow.spark.log_model(model, artifact_path = model_name, signature=signature, registered_model_name = model_name, dfs_tmpdir="Files/tmp/mlflow") 
          mlflow.log_params(hyperparameters) 
          mlflow.log_metrics(metrics) 
          model_uri = f"runs:/{run.info.run_id}/{model_name}" 
          print(f"Model saved in run{run.info.run_id}") 
          print(f"Model URI: {model_uri}")
          return model_uri
    # Define Signature object 
    sig = ModelSignature(inputs=_infer_schema(train_df.select(categorical_features + numeric_features)), 
                         outputs=_infer_schema(train_df.select("tripDuration"))) 
    ALGORITHM = "lightgbm" 
    model_name = f"{EXPERIMENT_NAME}_{ALGORITHM}"
    # Create a 'dict' object that contains values of metrics
    lg_metrics_dict = json.loads(lg_metrics.toJSON().first())
    # Call model register function
    model_uri = register_spark_model(run = run,
                                  model = lg_model, 
                                  model_name = model_name, 
                                  signature = sig, 
                                  metrics = lg_metrics_dict, 
                                  hyperparameters = LGBM_PARAMS)
    mlflow.end_run()
    
  • 训练并注册默认模型后,我们将定义优化超参数 (本教程中未涵盖的优化超参数) 并删除 paymentType 分类功能。 由于 paymentType 通常是在行程结束时选择的,因此我们假设预测行程持续时间应该没有用。

    # Tuned hyperparameters for LightGBM Model
    TUNED_LGBM_PARAMS = {"objective":"regression",
       "alpha":0.08373361416254149,
       "learning_rate":0.0801709918703746,
       "num_leaves":92,
       "iterations":200}
    # Remove paymentType
    categorical_features.remove("paymentType") 
    
  • 定义新的超参数并更新功能列表后,我们在训练数据帧上调整了优化超参数的 LightGBM 管道,并针对测试数据集生成预测。

    if mlflow.active_run() is None:
       mlflow.start_run()
    run = mlflow.active_run()
    print(f"Active experiment run_id: {run.info.run_id}")
    lg_pipeline_tn = lgbm_pipeline(categorical_features,numeric_features,TUNED_LGBM_PARAMS)
    lg_model_tn = lg_pipeline_tn.fit(train_df)
    # Get Predictions
    lg_predictions_tn = lg_model_tn.transform(test_df)
    ## Caching predictions to run model evaluation faster
    lg_predictions_tn.cache()
    print(f"Prediction run for {lg_predictions_tn.count()} samples")
    
  • 使用优化的超参数和更新的功能为新的 LightGBM 回归模型生成模型评估指标。

    lg_metrics_tn = ComputeModelStatistics(
       evaluationMetric="regression", labelCol="tripDuration", scoresCol="prediction"
    ).transform(lg_predictions_tn)
    display(lg_metrics_tn)
    
  • 在最后一步中,我们将注册第二个 LightGBM 回归模型,并将指标和超参数记录到 MLflow 试验并结束运行。

    # Define Signature object 
    sig_tn = ModelSignature(inputs=_infer_schema(train_df.select(categorical_features + numeric_features)), 
                         outputs=_infer_schema(train_df.select("tripDuration")))
    # Create a 'dict' object that contains values of metrics
    lg_metricstn_dict = json.loads(lg_metrics_tn.toJSON().first())
    model_uri = register_spark_model(run = run,
                                  model = lg_model_tn, 
                                  model_name = model_name, 
                                  signature = sig_tn, 
                                  metrics = lg_metricstn_dict, 
                                  hyperparameters = TUNED_LGBM_PARAMS)
    mlflow.end_run()
    

    输出值类似于下表:

    mean_squared_error root_mean_squared_error mean_absolute_error

    在本教程结束时,我们在 MLflow 模型注册表中训练并注册了两个 lightgbm 回归模型运行,该模型在工作区中也作为 Fabric 模型项提供。

    若要在 UI 中查看模型,请执行以下操作:

  • 导航到当前处于活动状态的 Fabric 工作区。

  • 选择名为 nyctaxi_tripduration_lightgbm 的模型项以打开模型 UI。

    如果在列表中看不到模型项,请刷新浏览器。

  • 在模型 UI 上,可以查看给定运行的属性和指标、比较各种运行的性能,以及下载与已训练的模型关联的各种文件项。

    下图显示了 Fabric 工作区中模型 UI 中各种功能的布局。

  •