n = 10
LR = 0.0001
EPOCH = 100
train_end = -500
df, df_all, df_index = readData('收盘价', n=n, train_end=train_end)
df_all = np.array(df_all.tolist())
plt.plot(df_index, df_all, label='real-data')
df_numpy = np.array(df)
df_numpy_mean = np.mean(df_numpy)
df_numpy_std = np.std(df_numpy)
df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
df_tensor = torch.Tensor(df_numpy)
trainset = TrainSet(df_tensor)
trainloader = DataLoader(trainset, batch_size=16, shuffle=True)
rnn = RNN(n)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)
loss_func = nn.MSELoss()
for step in range(EPOCH):
for tx, ty in trainloader:
output = rnn(torch.unsqueeze(tx, dim=0))
loss = loss_func(torch.squeeze(output), ty)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(step, loss)
if step % 10:
torch.save(rnn, 'rnn.pkl')
torch.save(rnn, 'rnn.pkl')
generate_data_train = []
generate_data_test = []
test_index = len(df_all) + train_end
df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
df_all_normal_tensor = torch.Tensor(df_all_normal)
for i in range(n, len(df_all)):
x = df_all_normal_tensor[i - n:i]
x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0)
y = rnn(x)
if i < test_index:
generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
else:
generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
plt.plot(df_index[train_end:], generate_data_test, label='generate_test')
plt.legend()
plt.show()
plt.cla()
plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data')
plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test')
plt.legend()
plt.show()
本实例数据集使用上证的收盘价,构建时序数据集,使用GRU对数据预测,从结果上来看,走势有一定的滞后性,由于只是用了价格预测价格,还是太简单了,可以考虑加入更多的特征去优化这一算法。
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
def generate_df_affect_by_n_days(series, n, index=False):
if len(series) <= n:
raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n))
df = pd.DataFrame()
for i in range(n):
df['c%d' % i] = series.tolist()[i:-(n - i)]
df['y'] = series.tolist()[n:]
if index:
df.index = series.index[n:]
return df
def readData(column='收盘价', n=30, all_too=True, index=False, train_end=-300):
df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8')
df.fillna(0, inplace=True)
df.replace(to_replace="None", value=0)
del df["股票代码"]
del df["名称"]
df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index))
df_column = df[column].copy()
df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:]
df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index)
print(df_generate_from_df_column_train)
if all_too:
return df_generate_from_df_column_train, df_column, df.index.tolist()
return df_generate_from_df_column_train
class RNN(nn.Module):
def __init__(self, input_size):
super(RNN, self).__init__()
self.rnn = nn.GRU(
input_size=input_size,
hidden_size=64,
num_layers=1,
batch_first=True,
self.out = nn.Sequential(
nn.Linear(64, 1),
self.hidden = None
def forward(self, x):
r_out, self.hidden = self.rnn(x)
out = self.out(r_out)
return out
class TrainSet(Dataset):
def __init__(self, data):
self.data, self.label = data[:, :-1].float(), data[:, -1].float()
def __getitem__(self, index):
return self.data[index], self.label[index]
def __len__(self):
return len(self.data)
n = 10
LR = 0.0001
EPOCH = 100
train_end = -500
df, df_all, df_index = readData('收盘价', n=n, train_end=train_end)
df_all = np.array(df_all.tolist())
plt.plot(df_index, df_all, label='real-data')
df_numpy = np.array(df)
df_numpy_mean = np.mean(df_numpy)
df_numpy_std = np.std(df_numpy)
df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
df_tensor = torch.Tensor(df_numpy)
trainset = TrainSet(df_tensor)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
rnn = RNN(n)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)
loss_func = nn.MSELoss()
for step in range(EPOCH):
for tx, ty in trainloader:
output = rnn(torch.unsqueeze(tx, dim=0))
loss = loss_func(torch.squeeze(output), ty)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(step, loss)
if step % 10:
torch.save(rnn, 'rnn.pkl')
torch.save(rnn, 'rnn.pkl')
generate_data_train = []
generate_data_test = []
test_index = len(df_all) + train_end
df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
df_all_normal_tensor = torch.Tensor(df_all_normal)
for i in range(n, len(df_all)):
x = df_all_normal_tensor[i - n:i]
x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0)
y = rnn(x)
if i < test_index:
generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
else:
generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
plt.plot(df_index[train_end:], generate_data_test, label='generate_test')
plt.legend()
plt.show()
plt.cla()
plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data')
plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test')
plt.legend()
plt.show()