将pytorch 模型嵌入到spark中进行大规模预测

虽然深度学习日益盛行,但目前spark还不支持深度学习算法。虽然也有相关库 sparktorch 能够将spark和pytorch结合起来,但是使用发现并非那么好用,而且此库目前活跃度较低,不方便debug。因此,本地训练深度学习模型并部署到spark中是一种有效的利用深度学习进行大规模预测的方法。

将pytorch模型嵌入部署到spark中进行大规模预测主要包括三步:

  • 利用spark进行特征工程预处理,以保证训练集和测试集特征处理一致;
  • 将训练集转化为pandas(或collect)来进行本地单机模型训练;
  • 将本地训练好的模型分发(broadcast)到集群的各个worker中来进行大规模预测。
  • 第一二步都比较简单,这里省去。主要对第三步进行说明。

    模型分发(broadcast)分两种情况,第一种是简单可通过 nn.Sequential 定义的模型。对于这种情况可以,模型可以直接用。如下:

    # 生成测试数据
    from sklearn.datasets import make_classification
    X, y = make_classification(n_samples=50000, n_features=100, random_state=0)
    df = pd.DataFrame(X)
    df['label'] = np.random.randint(2,size=(50000))
    df1 = spark.createDataFrame(df)
    df1 = df1.withColumn('features', Func.array([col(f"{i}") for i in range(0, 100)])).repartition(1000)
    # 创建模型并进行预测
    %spark2_1.pyspark
    import torch.nn as nn
    network = nn.Sequential(
        nn.Linear(100, 2560),
        nn.ReLU(),
        nn.Linear(2560, 2560),
        nn.ReLU(),
        nn.Linear(2560, 2)
        #nn.Softmax(dim=1)
    class network(nn.Module):
        def __init__(self):
            super(network, self).__init__()
            self.l1 = nn.Linear(100, 2560)
            self.l2 = nn.Linear(2560, 2560)
            self.l3 = nn.Linear(2560, 2)
        def forward(self, x):
            x = self.l1(x)
            x = self.l2(x)
            x = self.l3(x)
            return x
    net = network()
    bc_model_state = spark.sparkContext.broadcast(net.state_dict())
    def get_model_for_eval():
      # Broadcast the model state_dict
      net.load_state_dict(bc_model_state.value)
      net.eval()
      return net
    def one_row_predict(x):
        model = get_model_for_eval()
        t = torch.tensor(x, dtype=torch.float32)
        t = model(t).cpu().detach().numpy()
        #prediction = model(t).cpu().detach().item()
        # return prediction
        return list([float(i) for i in t])
    one_row_udf = udf(one_row_predict, ArrayType(FloatType()))
    df1 = df1.withColumn('pred_one_row', one_row_udf(col('features')))
    

    在上面我们定义了一个简单模型,然后将其直接分发进行预测(这里省去了模型训练过程)。

    但是当我们想使用一个比较复杂的模型来进行预测时(简单来讲就是不能使用nn.Sequential改写),使用上面的方法则会报错。

    这时候需要将模型写入一个文件中,假设模型文件的路径为/export/models/item2vec.py, 使用pyspark中的addFile对其进行分发,然后import导入模型。
    假设我们的模型文件/export/models/item2vec.py如下:

    class Item2vec(nn.Module):
        def __init__(self, cv_dict, csr_cols):
            super(Item2vec, self).__init__()
        def forward(self, x):
        def predict(self, x):
    

    假设模型已经训练好,现在要使用训练好的模型进行大规模预测:

    from pyspark import SparkFiles
    sc.addFile('/export/models/item2vec.py')
    import sys
    sys.path.append('/export/models/')
    from item2vec import Item2vec
    # model 表示训练好的模型
    bc_model_state = sc.broadcast(model.state_dict())
    net = Item2vec(cv_dict, csr_cols)
    def get_model_for_eval_demo():
      # Broadcast the model state_dict
      net.load_state_dict(bc_model_state.value)
      net.eval()
      return net
    

    上面的操作已经将模型分发(broadcast)出去,接下来就可以进行预测了。预测这里介绍两种方式:一种是使用udf + withColumn, 另一种则是使用 rdd + mapPartitions。由于这里使用的是pyspark 2.1,还没有pandas udf,因此使用udf + withColumn时只能一行一行的预测,运行速度上来说是比不上rdd + mapPartitions。对于pyspark 2.3以后的版本多了pandas udf后则可以使用batch predict了,具体可以参考https://docs.databricks.com/_static/notebooks/deep-learning/pytorch-images.html

    udf + withColumn 的方式
    # udf + withColumn 的方式
    def one_row_predict_demo(x)
        x = torch.tensor(x, dtype=torch.float)
        _, prob = bc_model.predict(x)
        return round(float(prob[0]), 4)
    one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())
    one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())
    df = demo.withColumn('demo_prob', one_row_predict_demo_udf('features'))
    
    rdd + map 方式
    def one_row_predict_map(rdds):
        bc_model = get_model_for_eval_demo()
        for row in rdds:
            x = torch.tensor(row.x, dtype=torch.float)
            _, prob = bc_model.predict(x)
            yield (row['id'], round(float(prob[0]), 4))
    df = demo.rdd.mapPartitions(one_row_predict_map).toDF(['id', 'pred_prob'])
    

    效率优化(1)——mapPartition

    上面的方法已经可以使得我们将训练好的深度学习模型部署到spark进行大规模预测了,但是其速度是非常慢的。通过在mapPartitions中进行一些处理,我们可以对预测进行加速:

    # 代码源自 https://github.com/SaeedNajafi/infer-pytorch-pyspark
    def basic_row_handler(row):
        return row
    def predict_map(index, partition, ml_task,
                    batch_size=16,
                    row_preprocessor=basic_row_handler,
                    row_postprocessor=basic_row_handler):
        # local model loading within each executor
        model = LocalPredictor(ml_task=ml_task, batch_size=batch_size,
                               partition_index=index)
        batch = []
        count = 0
        for row in partition:
            row_dict = row.asDict()
            # apply preprocessor on each row.
            row_dict_prep = row_preprocessor(row_dict)
            batch.append(row_dict_prep)
            count += 1
            if count == batch_size:
                # predict the ml and apply the postprocessor.
                for ret_row in model.predict(batch):  # ml prediction
                    ret_row_post = row_postprocessor(ret_row)
                    if ret_row_post is not None:
                        yield Row(**ret_row_post)
                batch = []
                count = 0
        # Flush remaining rows in the batches.
        if count != 0:
            for ret_row in model.predict(batch):  # ml prediction
                ret_row_post = row_postprocessor(ret_row)
                if ret_row_post is not None:
                    yield Row(**ret_row_post)
            batch = []
            count = 0
    

    上面的代码可以看作是在mapPartitions中进行了“延迟”预测——即先将一个partition中的多行数据进行处理然后合并为一个batch进行一起预测,这样能大大的提升运行效率。一个比较极端的情况是每个partition仅进行一次预测。

    效率优化(2)——pandas_udf

    pandas_udf在udf的基础上进行了进一步的优化,利用pandas_udf程序运行效率更高。在这里我们可以借助于pandas_udf提升我们程序的运行效率:

    # Enable Arrow support.
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")
    sc.addFile('get_model.py')
    from get_model import get_model
    model_path = '/path/to/model.pt'
    data_path = '/path/to/data'
    # model 表示训练好的模型
    model = torch.load(model_path)
    bc_model_state = sc.broadcast(model.state_dict())
    def get_model_for_eval():
      # Broadcast the model state_dict  
      model = get_model()
      model.load_state_dict(bc_model_state.value)
      model.eval()
      return model
    # model = torch.load(model_path)
    # model = sc.broadcast(model)
    @pandas_udf(FloatType())
    def predict_batch_udf(arr: pd.Series) -> pd.Series:
        model = get_model_for_eval()
        # model.to(device)
        arr = np.vstack(arr.map(lambda x: eval(x)).values)
        arr = torch.tensor(arr).long()
        with torch.no_grad():
            predictions = list(model(arr).cpu().numpy())
        return pd.Series(predictions)
    data = data.withColumn('predictions', predict_batch_udf('features'))