了解如何训练机器学习模型,以根据各种因素(例如上车和下车地点、距离、日期、时间、乘客数量和费率代码)预测纽约市黄色出租车行程的总行程持续时间 (du) ration。 训练模型后,可以注册已训练的模型,并使用 Fabric 与 MLflow 框架的本机集成来记录使用的超参数和评估指标。
MLflow 是一个开源平台,用于管理端到端机器学习生命周期,其中包含用于跟踪试验、打包 ML 模型和项以及模型注册表的功能。 有关详细信息,请参阅
MLflow
。
在第一步中,我们导入 MLflow 库并创建名为
nyctaxi_tripduration
的试验,以记录作为训练过程的一部分生成的运行和模型。
# Create Experiment to Track and register model with mlflow
import mlflow
print(f"mlflow lbrary version: {mlflow.__version__}")
EXPERIMENT_NAME = "nyctaxi_tripduration"
mlflow.set_experiment(EXPERIMENT_NAME)
从 lakehouse delta 表 nyctaxi_prep读取已清理和准备的数据,并从数据 (创建小数随机样本,以减少本教程) 计算时间。
SEED = 1234
# note: From the perspective of the tutorial, we are sampling training data to speed up the execution.
training_df = spark.read.format("delta").load("Tables/nyctaxi_prep").sample(fraction = 0.5, seed = SEED)
机器学习中的种子是一个值,用于确定伪随机数生成器的初始状态。 种子用于确保机器学习试验的结果可重现。 通过使用相同的种子,可以获取相同的数字序列,从而获得相同的结果,用于数据拆分、模型训练和涉及随机性的其他任务。
执行随机拆分,通过执行以下一组命令来获取训练和测试数据集,并定义分类和数字特征。 我们还缓存训练和测试数据帧,以提高下游进程的速度。
TRAIN_TEST_SPLIT = [0.75, 0.25]
train_df, test_df = training_df.randomSplit(TRAIN_TEST_SPLIT, seed=SEED)
# Cache the dataframes to improve the speed of repeatable reads
train_df.cache()
test_df.cache()
print(f"train set count:{train_df.count()}")
print(f"test set count:{test_df.count()}")
categorical_features = ["storeAndFwdFlag","timeBins","vendorID","weekDayName","pickupHour","rateCodeId","paymentType"]
numeric_features = ['passengerCount', "tripDistance"]
Apache Spark 缓存是一项功能,可用于将中间数据存储在内存或磁盘中,并将其重复用于多个查询或操作。 缓存可以避免重新处理经常访问的数据,从而提高 Spark 应用程序的性能和效率。 可以根据需求和资源使用不同的方法和存储级别来缓存数据。 缓存对于需要重复访问相同数据的迭代算法或交互式分析特别有用。
在此步骤中,我们将定义使用 Spark ML 管道和 Microsoft SynapseML 库执行更多特征工程和训练模型的步骤。 本教程使用的算法 LightGBM 是基于决策树算法的快速分布式高性能梯度提升框架。 它是 Microsoft 开发的开源项目,支持回归、分类和许多其他机器学习方案。 其main优点包括更快的训练速度、更低的内存使用量、更好的准确性和支持分布式学习。
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from synapse.ml.core.platform import *
from synapse.ml.lightgbm import LightGBMRegressor
# Define a pipeline steps for training a LightGBMRegressor regressor model
def lgbm_pipeline(categorical_features,numeric_features, hyperparameters):
# String indexer
stri = StringIndexer(inputCols=categorical_features,
outputCols=[f"{feat}Idx" for feat in categorical_features]).setHandleInvalid("keep")
# encode categorical/indexed columns
ohe = OneHotEncoder(inputCols= stri.getOutputCols(),
outputCols=[f"{feat}Enc" for feat in categorical_features])
# convert all feature columns into a vector
featurizer = VectorAssembler(inputCols=ohe.getOutputCols() + numeric_features, outputCol="features")
# Define the LightGBM regressor
lgr = LightGBMRegressor(
objective = hyperparameters["objective"],
alpha = hyperparameters["alpha"],
learningRate = hyperparameters["learning_rate"],
numLeaves = hyperparameters["num_leaves"],
labelCol="tripDuration",
numIterations = hyperparameters["iterations"],
# Define the steps and sequence of the SPark ML pipeline
ml_pipeline = Pipeline(stages=[stri, ohe, featurizer, lgr])
return ml_pipeline
通过执行以下单元格,将训练超参数定义为初始运行 lightgbm 模型的 python 字典。
# Default hyperparameters for LightGBM Model
LGBM_PARAMS = {"objective":"regression",
"alpha":0.9,
"learning_rate":0.1,
"num_leaves":31,
"iterations":100}
超参数是可以更改的参数,用于控制机器学习模型的训练方式。 超参数可能会影响模型的速度、质量和准确性。 查找最佳超参数的一些常见方法是测试不同的值、使用网格或随机搜索,或者使用更高级的优化技术。 本教程中 LightGBM 模型的超参数已使用分布式网格搜索进行预优化, (本教程未介绍) 使用 hyperopt 库运行。
接下来,我们使用 MLflow 在定义的试验中创建一个新运行,并在训练数据帧上拟合定义的管道。 然后,我们使用以下一组命令对测试数据集生成预测。
if mlflow.active_run() is None:
mlflow.start_run()
run = mlflow.active_run()
print(f"Active experiment run_id: {run.info.run_id}")
lg_pipeline = lgbm_pipeline(categorical_features,numeric_features,LGBM_PARAMS)
lg_model = lg_pipeline.fit(train_df)
# Get Predictions
lg_predictions = lg_model.transform(test_df)
## Caching predictions to run model evaluation faster
lg_predictions.cache()
print(f"Prediction run for {lg_predictions.count()} samples")
训练模型并在测试集上生成预测后,即可使用 SynapseML 库实用工具 ComputeModelStatistics 计算模型统计信息,以评估训练的 LightGBMRegressor 模型的性能,这有助于基于算法评估各种类型的模型。 生成指标后,我们还将其转换为 python 字典对象以进行日志记录。 评估回归模型的指标包括 MSE (均方误差) 、RMSE (均方根误差) 、R^2 和 MAE (均方误差) 。
from synapse.ml.train import ComputeModelStatistics
lg_metrics = ComputeModelStatistics(
evaluationMetric="regression", labelCol="tripDuration", scoresCol="prediction"
).transform(lg_predictions)
display(lg_metrics)
输出值类似于下表:
mean_squared_error
root_mean_squared_error
mean_absolute_error
接下来,我们定义一个常规函数,以使用 MLflow 在创建的试验下使用默认超参数注册已训练的 LightGBMRegressor 模型。 我们还记录在试验运行中用于模型评估的相关超参数和指标,最后终止 MLflow 试验中的运行。
from mlflow.models.signature import ModelSignature
from mlflow.types.utils import _infer_schema
# Define a function to register a spark model
def register_spark_model(run, model, model_name,signature,metrics, hyperparameters):
# log the model, parameters and metrics
mlflow.spark.log_model(model, artifact_path = model_name, signature=signature, registered_model_name = model_name, dfs_tmpdir="Files/tmp/mlflow")
mlflow.log_params(hyperparameters)
mlflow.log_metrics(metrics)
model_uri = f"runs:/{run.info.run_id}/{model_name}"
print(f"Model saved in run{run.info.run_id}")
print(f"Model URI: {model_uri}")
return model_uri
# Define Signature object
sig = ModelSignature(inputs=_infer_schema(train_df.select(categorical_features + numeric_features)),
outputs=_infer_schema(train_df.select("tripDuration")))
ALGORITHM = "lightgbm"
model_name = f"{EXPERIMENT_NAME}_{ALGORITHM}"
# Create a 'dict' object that contains values of metrics
lg_metrics_dict = json.loads(lg_metrics.toJSON().first())
# Call model register function
model_uri = register_spark_model(run = run,
model = lg_model,
model_name = model_name,
signature = sig,
metrics = lg_metrics_dict,
hyperparameters = LGBM_PARAMS)
mlflow.end_run()
训练并注册默认模型后,我们将定义优化超参数 (本教程中未涵盖的优化超参数) 并删除 paymentType 分类功能。 由于 paymentType 通常是在行程结束时选择的,因此我们假设预测行程持续时间应该没有用。
# Tuned hyperparameters for LightGBM Model
TUNED_LGBM_PARAMS = {"objective":"regression",
"alpha":0.08373361416254149,
"learning_rate":0.0801709918703746,
"num_leaves":92,
"iterations":200}
# Remove paymentType
categorical_features.remove("paymentType")
定义新的超参数并更新功能列表后,我们在训练数据帧上调整了优化超参数的 LightGBM 管道,并针对测试数据集生成预测。
if mlflow.active_run() is None:
mlflow.start_run()
run = mlflow.active_run()
print(f"Active experiment run_id: {run.info.run_id}")
lg_pipeline_tn = lgbm_pipeline(categorical_features,numeric_features,TUNED_LGBM_PARAMS)
lg_model_tn = lg_pipeline_tn.fit(train_df)
# Get Predictions
lg_predictions_tn = lg_model_tn.transform(test_df)
## Caching predictions to run model evaluation faster
lg_predictions_tn.cache()
print(f"Prediction run for {lg_predictions_tn.count()} samples")
使用优化的超参数和更新的功能为新的 LightGBM 回归模型生成模型评估指标。
lg_metrics_tn = ComputeModelStatistics(
evaluationMetric="regression", labelCol="tripDuration", scoresCol="prediction"
).transform(lg_predictions_tn)
display(lg_metrics_tn)
在最后一步中,我们将注册第二个 LightGBM 回归模型,并将指标和超参数记录到 MLflow 试验并结束运行。
# Define Signature object
sig_tn = ModelSignature(inputs=_infer_schema(train_df.select(categorical_features + numeric_features)),
outputs=_infer_schema(train_df.select("tripDuration")))
# Create a 'dict' object that contains values of metrics
lg_metricstn_dict = json.loads(lg_metrics_tn.toJSON().first())
model_uri = register_spark_model(run = run,
model = lg_model_tn,
model_name = model_name,
signature = sig_tn,
metrics = lg_metricstn_dict,
hyperparameters = TUNED_LGBM_PARAMS)
mlflow.end_run()
输出值类似于下表:
mean_squared_error
root_mean_squared_error
mean_absolute_error
在本教程结束时,我们在 MLflow 模型注册表中训练并注册了两个 lightgbm 回归模型运行,该模型在工作区中也作为 Fabric 模型项提供。
若要在 UI 中查看模型,请执行以下操作:
导航到当前处于活动状态的 Fabric 工作区。
选择名为 nyctaxi_tripduration_lightgbm 的模型项以打开模型 UI。
如果在列表中看不到模型项,请刷新浏览器。
在模型 UI 上,可以查看给定运行的属性和指标、比较各种运行的性能,以及下载与已训练的模型关联的各种文件项。
下图显示了 Fabric 工作区中模型 UI 中各种功能的布局。