How can I run inference with a custom PyTorch model (nn.Module) on a PySpark DataFrame? I have a class that uses PyTorch models.
import torch
import torch.nn as nn
# Import assumption: the forward() call below passes (input_ids, token_type_ids,
# attention_mask) positionally and unpacks a (sequence_output, pooled_output)
# tuple, which matches the pre-2.0 pytorch_transformers API for BertModel.
from pytorch_transformers import BertModel, BertTokenizer

def get_model_obj(model):
    # Unwrap nn.DataParallel / DistributedDataParallel wrappers if present.
    model = model.module if hasattr(model, "module") else model
    return model

class BertEncoder(nn.Module):
    def __init__(self, bert_model, output_dim, layer_pulled=-1, add_linear=None):
        super(BertEncoder, self).__init__()
        self.layer_pulled = layer_pulled
        bert_output_dim = bert_model.embeddings.word_embeddings.weight.size(1)
        self.bert_model = bert_model
        if add_linear:
            self.additional_linear = nn.Linear(bert_output_dim, output_dim)
            self.dropout = nn.Dropout(0.1)
        else:
            self.additional_linear = None

    def forward(self, token_ids, segment_ids, attention_mask):
        output_bert, output_pooler = self.bert_model(
            token_ids, segment_ids, attention_mask
        )
        # Get the embedding of the [CLS] token.
        if self.additional_linear is not None:
            embeddings = output_pooler
        else:
            embeddings = output_bert[:, 0, :]
        # In case of dimensionality reduction, project through the extra layer.
        if self.additional_linear is not None:
            result = self.additional_linear(self.dropout(embeddings))
        else:
            result = embeddings
        return result
class BiEncoderModule(torch.nn.Module):
    def __init__(self, params):
        super(BiEncoderModule, self).__init__()
        bert_model_path = "bert-base-cased"
        ctxt_bert = BertModel.from_pretrained(bert_model_path)
        cand_bert = BertModel.from_pretrained(bert_model_path)
        self.context_encoder = BertEncoder(
            ctxt_bert,
            params["out_dim"],
            layer_pulled=params["pull_from_layer"],
            add_linear=params["add_linear"],
        )
        self.cand_encoder = BertEncoder(
            cand_bert,
            params["out_dim"],
            layer_pulled=params["pull_from_layer"],
            add_linear=params["add_linear"],
        )
        self.config = ctxt_bert.config

    def forward(
        self,
        token_idx_ctxt,
        segment_idx_ctxt,
        mask_ctxt,
        token_idx_cands,
        segment_idx_cands,
        mask_cands,
    ):
        embedding_ctxt = None
        if token_idx_ctxt is not None:
            embedding_ctxt = self.context_encoder(
                token_idx_ctxt, segment_idx_ctxt, mask_ctxt
            )
        embedding_cands = None
        if token_idx_cands is not None:
            embedding_cands = self.cand_encoder(
                token_idx_cands, segment_idx_cands, mask_cands
            )
        return embedding_ctxt, embedding_cands
I can use it as a plain PyTorch model:
tokenizer = BertTokenizer.from_pretrained("bert-base-cased")

def token_to_id(x, max_seq_length):
    cand_tokens = tokenizer.tokenize(x)
    input_ids = tokenizer.convert_tokens_to_ids(cand_tokens)
    # Zero-pad up to max_seq_length.
    padding = [0] * (max_seq_length - len(input_ids))
    input_ids += padding
    return input_ids

config = {
    "out_dim": 100,
    "pull_from_layer": -1,
    "add_linear": False,
}
torch_model = BiEncoderModule(config)

ctx_token = token_to_id("[CLS] My name is [unused0] John [unused1] ! [SEP]", 32)
ctx_segment = [0] * len(ctx_token)
ctx_mask = [1 if i != 0 else 0 for i in ctx_token]
cnd_token = token_to_id("[CLS] John [unused3] male given name [SEP]", 512)
cnd_segment = [0] * len(cnd_token)
cnd_mask = [1 if i != 0 else 0 for i in cnd_token]

embedding_ctxt, embedding_cands = torch_model(
    torch.as_tensor(ctx_token)[None, :],
    torch.as_tensor(ctx_segment)[None, :],
    torch.as_tensor(ctx_mask)[None, :],
    torch.as_tensor(cnd_token)[None, :],
    torch.as_tensor(cnd_segment)[None, :],
    torch.as_tensor(cnd_mask)[None, :],
)
# Score = dot product of the context and candidate embeddings.
embedding_ctxt = embedding_ctxt.unsqueeze(1)
embedding_cands = embedding_cands.unsqueeze(2)
scores = torch.bmm(embedding_ctxt, embedding_cands)
scores = torch.squeeze(scores)
print(scores)
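The Spark attempt below calls a run_model helper that is not defined anywhere above; the sketch here is my assumption of what it does, simply wrapping the single-pair scoring logic just shown (the name and signature are hypothetical):

def run_model(ctx_ids, cnd_ids):
    # Hypothetical helper: rebuild segment/mask inputs and score one pair.
    ctx_segment = [0] * len(ctx_ids)
    ctx_mask = [1 if i != 0 else 0 for i in ctx_ids]
    cnd_segment = [0] * len(cnd_ids)
    cnd_mask = [1 if i != 0 else 0 for i in cnd_ids]
    with torch.no_grad():
        embedding_ctxt, embedding_cands = torch_model(
            torch.as_tensor(ctx_ids)[None, :],
            torch.as_tensor(ctx_segment)[None, :],
            torch.as_tensor(ctx_mask)[None, :],
            torch.as_tensor(cnd_ids)[None, :],
            torch.as_tensor(cnd_segment)[None, :],
            torch.as_tensor(cnd_mask)[None, :],
        )
    score = torch.bmm(
        embedding_ctxt.unsqueeze(1), embedding_cands.unsqueeze(2)
    ).squeeze()
    return float(score)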
But how can I plug this module into a PySpark DataFrame? I tried the code below, but I get the error "'list' object has no attribute 'toArray'".
data=[("[CLS] My name is [unused0] John [unused1] ! [SEP]","[CLS] John [unused3] male given name [SEP]")]
spark = sparknlp.start()
df=spark.createDataFrame(data).toDF("context","candidate")
df =df.withColumn("ctx_idx", udf(lambda x : [i for i in token_to_id(x,32)], ArrayType(IntegerType()))("context"))
df =df.withColumn("ctx_segment", udf(lambda x : [0]*len(x), ArrayType(IntegerType()))("ctx_idx"))
df =df.withColumn("ctx_mask", udf(lambda x :[1 if i !=0 else 0 for i in x], ArrayType(IntegerType()))("ctx_idx"))
df =df.withColumn("cnd_idx", udf(lambda x : [i for i in token_to_id(x,512)], ArrayType(IntegerType()))("candidate"))
df =df.withColumn("score", udf(lambda ctx_ids, cnd_ids : run_model(ctx_ids, cnd_ids), ArrayType(IntegerType()))("context","candidate"))
df =df.withColumn("cnd_segment", udf(lambda x : [0]*len(x), ArrayType(IntegerType()))("cnd_idx"))
df =df.withColumn("cnd_mask", udf(lambda x : [1 if i !=0 else 0 for i in x], ArrayType(IntegerType()))("cnd_idx"))
df =df.withColumn("ctx_idx", udf(lambda x : [x], ArrayType(ArrayType(IntegerType())))("ctx_idx"))
df =df.withColumn("ctx_segment", udf(lambda x : [x], ArrayType(ArrayType(IntegerType())))("ctx_segment"))
df =df.withColumn("ctx_mask", udf(lambda x : [x], ArrayType(ArrayType(IntegerType())))("ctx_mask"))
df =df.withColumn("cnd_idx", udf(lambda x : [x], ArrayType(ArrayType(IntegerType())))("cnd_idx"))
df =df.withColumn("cnd_segment", udf(lambda x : [x], ArrayType(ArrayType(IntegerType())))("cnd_segment"))
df =df.withColumn("cnd_mask", udf(lambda x : [x], ArrayType(ArrayType(IntegerType())))("cnd_mask"))
df = df.withColumn("features", concat(df.ctx_idx,df.ctx_segment, df.ctx_mask, df.cnd_idx, df.cnd_segment,df.cnd_mask))
spark_torch_model = create_spark_torch_model(
torch_model,
inputCol='features',
predictionCol='predictions'
pipeline = Pipeline(stages=[spark_torch_model])
result = pipeline.fit(df).transform(df)
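My guess is that the "'list' object has no attribute 'toArray'" error comes from whatever consumes the features column calling .toArray() on each value: that method exists on pyspark.ml.linalg vectors but not on the plain Python lists an ArrayType column yields. A minimal sketch of a conversion, assuming the transformer wants one flat Vector per row (the nested array-of-arrays features column would have to be flattened first):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# Assumption: downstream code calls features.toArray(), so convert the
# nested array-of-arrays column into one flat DenseVector per row.
to_vector = udf(lambda rows: Vectors.dense([v for row in rows for v in row]), VectorUDT())
df = df.withColumn("features", to_vector("features"))

Even with a Vector column, though, BiEncoderModule.forward expects six separate tensors, while a generic single-input transformer would feed the model one flat tensor, so the UDF-based score column computed above via the hypothetical run_model helper may be the more direct way to run this particular model over the DataFrame.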