# Building the Model
#### model structure ####
import copy
import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn import preprocessing
from tqdm import tqdm

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=10000):  # input is 3-D; max_len must be >= the sequence length (first dimension) of the input
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # standard sinusoidal encoding; the step must be 2 (not 3) so sin/cos alternate across dimensions
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])  # the slice handles odd d_model (here d_model=3)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model), broadcast over the batch dimension
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]
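# A quick sanity check for the encoding above (a sketch, not part of the original
# model): the module leaves the (seq_len, batch, d_model) input shape unchanged.
_pe_check = PositionalEncoding(d_model=3)
assert _pe_check(torch.zeros(5, 1, 3)).shape == (5, 1, 3)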
class transformer_encode_gru(nn.Module):
    def __init__(self, feature_size=3, num_layers=2, dropout=0.1):  # feature_size must equal the per-token dimension fed to the transformer
        super(transformer_encode_gru, self).__init__()
        self.model_type = 'Transformer'
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(feature_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=1, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.linear = nn.Linear(6, 3)  # 6 = 2 * hidden_size of the bidirectional LSTM below
        self.init_weights()  # initialize the nn.Linear parameters
        # despite the attribute name, this is a bidirectional LSTM (hence the (h_n, c_n) state in forward)
        self.gru = nn.LSTM(3, 3, num_layers=1, bidirectional=True)
        self.relu = F.relu

    def init_weights(self):
        initrange = 0.1
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)
    def forward(self, src):
        # ----------------------------------------------------------------
        # src = self.conv1(src)
        # src = torch.squeeze(src, dim=2)
        # If the incoming data were 4-D, one could convolve first and then squeeze
        # out a singleton dimension to get 3-D data. The transformer encoder expects
        # 3-D input: (sequence length, batch size, per-token dimension). Here the
        # raw data is 2-D, so adding a batch dimension yields exactly 3-D input.
        # ----------------------------------------------------------------
        # Build the (batch, seq_len) key-padding mask; all-False means nothing is padded.
        src_key_padding_mask = torch.zeros(src.shape[1], src.shape[0], dtype=torch.bool, device=src.device)
        # ----------------------------------------------------------------
        src = self.pos_encoder(src)  # positional encoding
        output = self.transformer_encoder(src, self.src_mask, src_key_padding_mask)  # encoder stack
        output = self.relu(output)
        gru_out, (h_n, c_n) = self.gru(output, None)
        gru_out = gru_out.squeeze(1)  # drop the batch dimension (batch size is 1 in the training loop)
        output = self.relu(gru_out)
        output = self.linear(output)
        return output
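# Smoke test (a sketch under the defaults above, not part of the original code):
# with batch size 1, a (seq_len, 1, 3) input yields a (seq_len, 3) output because
# forward() squeezes the batch dimension before the final linear layer.
_smoke = transformer_encode_gru()
assert _smoke(torch.zeros(7, 1, 3)).shape == (7, 3)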
def mape(y_true, y_pred):
    # mean absolute percentage error; assumes y_true contains no zeros
    return np.mean(np.abs((y_pred - y_true) / y_true)) * 100
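# Example (a sketch): per-element errors of 10% and 10% average to a MAPE of 10.0.
assert np.isclose(mape(np.array([1.0, 2.0]), np.array([1.1, 1.8])), 10.0)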
def test(model):
    model.eval()  # disable dropout for evaluation
    with torch.no_grad():
        inputs = test_x
        targets = test_y
        # standardize (z-score) each column
        inputs = preprocessing.scale(inputs)
        targets = preprocessing.scale(targets)
        inputs = torch.tensor(inputs).to(device)
        targets = torch.tensor(targets).to(device)
        inputs = inputs.float()
        targets = targets.float()
        inputs = inputs.unsqueeze(0).transpose(0, 1)  # (seq_len, 1, feature)
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        mape_loss = mape(targets.cpu().detach().numpy(), outputs.cpu().detach().numpy())
    model.train()  # restore training mode
    return loss.item(), mape_loss
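# Usage sketch (assumes the globals test_x / test_y are (seq_len, 3) NumPy arrays,
# mirroring train_x[i] / train_y[i] in the loop below):
# test_mse, test_mape = test(model)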
# torch.set_default_tensor_type(torch.DoubleTensor)  # would make newly created tensors default to Double instead of float
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = transformer_encode_gru().to(device)
epochs = 50
# best_model = None
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)
criterion = torch.nn.MSELoss().to(device)
# torch.set_default_tensor_type(torch.FloatTensor)
val_mape_loss = []
val_mse_loss = []
train_loss = []
best_test_loss = 100000
for epoch in tqdm(range(epochs)):
    train_epoch_loss = []
    for i in range(len(train_x)):
        optimizer.zero_grad()  # must clear gradients each step, or they accumulate across samples
        inputs = train_x[i]
        targets = train_y[i]
        # standardize (z-score) each column
        inputs = preprocessing.scale(inputs)
        targets = preprocessing.scale(targets)
        inputs = torch.tensor(inputs).to(device)
        targets = torch.tensor(targets).to(device)
        inputs = inputs.float()
        targets = targets.float()
        inputs = inputs.unsqueeze(0).transpose(0, 1)  # (seq_len, 1, feature)
        outputs = model(inputs)
        loss = criterion(outputs.float(), targets.float())
        loss.backward()
        optimizer.step()
        train_epoch_loss.append(loss.item())
    test_mse_loss, test_mape_loss = test(model)
    val_mse_loss.append(test_mse_loss)
    val_mape_loss.append(test_mape_loss)
    train_loss.append(np.mean(np.array(train_epoch_loss)))
    if test_mse_loss < best_test_loss:
        best_test_loss = test_mse_loss
        print("best_test_loss", best_test_loss)
        best_model = copy.deepcopy(model)  # snapshot the weights; plain assignment would only alias the live model
    print("train_loss", np.mean(np.array(train_epoch_loss)), "test_mse_loss", test_mse_loss, "----------")
# torch.save(best_model.state_dict(), 'best_model.pth')
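# Persisting and reloading the best model (a sketch; the filename follows the
# commented-out save line above):
# torch.save(best_model.state_dict(), 'best_model.pth')
# restored = transformer_encode_gru().to(device)
# restored.load_state_dict(torch.load('best_model.pth', map_location=device))
# restored.eval()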