How do I run inference with a custom PyTorch model (module) on a PySpark DataFrame?


I have a class that wraps a PyTorch model:

import torch
import torch.nn as nn
# The positional (token_ids, segment_ids, attention_mask) call and the
# (sequence_output, pooled_output) tuple below match the pytorch_transformers API.
from pytorch_transformers import BertModel, BertTokenizer

def get_model_obj(model):
    # Unwrap a DataParallel/DistributedDataParallel wrapper if present.
    model = model.module if hasattr(model, "module") else model
    return model
class BertEncoder(nn.Module):
    def __init__(
        self, bert_model, output_dim, layer_pulled=-1, add_linear=None):
        super(BertEncoder, self).__init__()
        self.layer_pulled = layer_pulled
        bert_output_dim = bert_model.embeddings.word_embeddings.weight.size(1)
        self.bert_model = bert_model
        if add_linear:
            self.additional_linear = nn.Linear(bert_output_dim, output_dim)
            self.dropout = nn.Dropout(0.1)
        else:
            self.additional_linear = None
    def forward(self, token_ids, segment_ids, attention_mask):
        output_bert, output_pooler = self.bert_model(
            token_ids, segment_ids, attention_mask
        )
        # get embedding of [CLS] token
        if self.additional_linear is not None:
            embeddings = output_pooler
        else:
            embeddings = output_bert[:, 0, :]
        # in case of dimensionality reduction
        if self.additional_linear is not None:
            result = self.additional_linear(self.dropout(embeddings))
        else:
            result = embeddings
        return result
class BiEncoderModule(torch.nn.Module):
    def __init__(self, params):
        super(BiEncoderModule, self).__init__()
        bert_model_path = "bert-base-cased"
        ctxt_bert = BertModel.from_pretrained(bert_model_path)
        cand_bert = BertModel.from_pretrained(bert_model_path)
        self.context_encoder = BertEncoder(
            ctxt_bert,
            params["out_dim"],
            layer_pulled=params["pull_from_layer"],
            add_linear=params["add_linear"],
        )
        self.cand_encoder = BertEncoder(
            cand_bert,
            params["out_dim"],
            layer_pulled=params["pull_from_layer"],
            add_linear=params["add_linear"],
        )
        self.config = ctxt_bert.config
    def forward(
            self,
            token_idx_ctxt,
            segment_idx_ctxt,
            mask_ctxt,
            token_idx_cands,
            segment_idx_cands,
            mask_cands,
    ):
        # Each side is encoded independently; either input may be None.
        embedding_ctxt = None
        if token_idx_ctxt is not None:
            embedding_ctxt = self.context_encoder(
                token_idx_ctxt, segment_idx_ctxt, mask_ctxt
            )
        embedding_cands = None
        if token_idx_cands is not None:
            embedding_cands = self.cand_encoder(
                token_idx_cands, segment_idx_cands, mask_cands
            )
        return embedding_ctxt, embedding_cands

I can use it as a plain PyTorch model:

tokenizer = BertTokenizer.from_pretrained("bert-base-cased")

def token_to_id(x, max_seq_length):
    # Tokenize and right-pad the id sequence with zeros up to max_seq_length.
    cand_tokens = tokenizer.tokenize(x)
    input_ids = tokenizer.convert_tokens_to_ids(cand_tokens)
    padding = [0] * (max_seq_length - len(input_ids))
    input_ids += padding
    return input_ids

config = {
    "out_dim": 100,
    "pull_from_layer": -1,
    "add_linear": False,
}
torch_model = BiEncoderModule(config)

ctx_token = token_to_id("[CLS] My name is [unused0] John [unused1] ! [SEP]", 32)
ctx_segment = [0] * len(ctx_token)
ctx_mask = [1 if i != 0 else 0 for i in ctx_token]
cnd_token = token_to_id("[CLS] John [unused3] male given name [SEP]", 512)
cnd_segment = [0] * len(cnd_token)
cnd_mask = [1 if i != 0 else 0 for i in cnd_token]

embedding_ctxt, embedding_cands = torch_model(
    torch.as_tensor(ctx_token)[None, :],
    torch.as_tensor(ctx_segment)[None, :],
    torch.as_tensor(ctx_mask)[None, :],
    torch.as_tensor(cnd_token)[None, :],
    torch.as_tensor(cnd_segment)[None, :],
    torch.as_tensor(cnd_mask)[None, :],
)
embedding_ctxt = embedding_ctxt.unsqueeze(1)    # (batch, 1, dim)
embedding_cands = embedding_cands.unsqueeze(2)  # (batch, dim, 1)
scores = torch.bmm(embedding_ctxt, embedding_cands)  # batched dot product
scores = torch.squeeze(scores)
print(scores)

But how can I plug this module into a PySpark DataFrame? I tried the code below, but I get the error "'list' object has no attribute 'toArray'".

import sparknlp
from pyspark.ml import Pipeline
from pyspark.sql.functions import concat, udf
from pyspark.sql.types import ArrayType, IntegerType
from sparktorch import create_spark_torch_model

data = [("[CLS] My name is [unused0] John [unused1] ! [SEP]",
         "[CLS] John [unused3] male given name [SEP]")]
spark = sparknlp.start()
df = spark.createDataFrame(data).toDF("context", "candidate")
# Token ids, segment ids, and attention masks for both sides.
df = df.withColumn("ctx_idx", udf(lambda x: token_to_id(x, 32), ArrayType(IntegerType()))("context"))
df = df.withColumn("ctx_segment", udf(lambda x: [0] * len(x), ArrayType(IntegerType()))("ctx_idx"))
df = df.withColumn("ctx_mask", udf(lambda x: [1 if i != 0 else 0 for i in x], ArrayType(IntegerType()))("ctx_idx"))
df = df.withColumn("cnd_idx", udf(lambda x: token_to_id(x, 512), ArrayType(IntegerType()))("candidate"))
df = df.withColumn("cnd_segment", udf(lambda x: [0] * len(x), ArrayType(IntegerType()))("cnd_idx"))
df = df.withColumn("cnd_mask", udf(lambda x: [1 if i != 0 else 0 for i in x], ArrayType(IntegerType()))("cnd_idx"))
# run_model is not defined in this snippet, so this earlier attempt is commented out:
# df = df.withColumn("score", udf(lambda ctx_ids, cnd_ids: run_model(ctx_ids, cnd_ids), ArrayType(IntegerType()))("context", "candidate"))
# Wrap each flat array in an outer list (a batch dimension of 1).
df = df.withColumn("ctx_idx", udf(lambda x: [x], ArrayType(ArrayType(IntegerType())))("ctx_idx"))
df = df.withColumn("ctx_segment", udf(lambda x: [x], ArrayType(ArrayType(IntegerType())))("ctx_segment"))
df = df.withColumn("ctx_mask", udf(lambda x: [x], ArrayType(ArrayType(IntegerType())))("ctx_mask"))
df = df.withColumn("cnd_idx", udf(lambda x: [x], ArrayType(ArrayType(IntegerType())))("cnd_idx"))
df = df.withColumn("cnd_segment", udf(lambda x: [x], ArrayType(ArrayType(IntegerType())))("cnd_segment"))
df = df.withColumn("cnd_mask", udf(lambda x: [x], ArrayType(ArrayType(IntegerType())))("cnd_mask"))
df = df.withColumn("features", concat(df.ctx_idx, df.ctx_segment, df.ctx_mask,
                                      df.cnd_idx, df.cnd_segment, df.cnd_mask))
spark_torch_model = create_spark_torch_model(
    torch_model,
    inputCol='features',
    predictionCol='predictions',
)
pipeline = Pipeline(stages=[spark_torch_model])
result = pipeline.fit(df).transform(df)
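
From the traceback I suspect that create_spark_torch_model calls .toArray() on each value of the features column, i.e. it expects a Spark ML Vector, while an ArrayType column arrives in the worker as a plain Python list. Below is a minimal, untested sketch of the conversion I assume is needed, built from the flat id/segment/mask columns (skipping the nested-array wrapping step above):

from pyspark.ml.linalg import Vectors, VectorUDT

# A DenseVector exposes .toArray(); a plain ArrayType column does not.
to_vector = udf(lambda xs: Vectors.dense([float(x) for x in xs]), VectorUDT())
df = df.withColumn(
    "features",
    to_vector(concat(df.ctx_idx, df.ctx_segment, df.ctx_mask,
                     df.cnd_idx, df.cnd_segment, df.cnd_mask)),
)

Even with that, BiEncoderModule.forward takes six separate tensors, so a single flattened features vector would presumably also need a thin wrapper nn.Module that slices it back into the token/segment/mask pieces (cast to long) before calling the bi-encoder. Is this vector conversion the right direction, or is there a cleaner way to run this model over a PySpark DataFrame?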