1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
def plot_data(data, pred=None):
    """Plot the passenger series and, optionally, a forecast over its tail.

    Args:
        data: Sequence of passenger counts, handed to ``plt.plot`` as-is.
        pred: Optional sequence of predicted values. When given, it is drawn
            at x positions covering the last ``len(pred)`` points of ``data``.
    """
    plt.title('month vs passengers')
    plt.xlabel('months')
    plt.ylabel('total passengers')
    plt.plot(data)
    if pred is not None:
        # Align the forecast with the end of the series. The previous
        # hard-coded range 132..143 was only right for 144 data points and a
        # 12-step forecast; this yields the same x values in that case.
        stop = len(data)
        x = np.arange(stop - len(pred), stop)
        plt.plot(x, pred)
    plt.show()
def create_io_seq(data, tw):
    """Cut ``data`` into sliding (window, next value) pairs.

    Args:
        data: Sliceable sequence (list, tensor, array, ...).
        tw: Window length, i.e. how many consecutive items form one input.

    Returns:
        A list of ``(seq, lab)`` tuples where ``seq`` is ``data[i:i + tw]``
        and ``lab`` is the one-element slice that immediately follows it.
        The list is empty when ``data`` has no more than ``tw`` items.
    """
    n_pairs = len(data) - tw
    return [
        (data[start:start + tw], data[start + tw:start + tw + 1])
        for start in range(n_pairs)
    ]
class LSTM(nn.Module):
    """Single-layer LSTM with a linear head, fed one sequence at a time.

    The recurrent state lives in ``self.hidden_cell`` as an ``(h, c)`` pair
    of shape ``(1, 1, hid_size)`` and is overwritten by every forward pass.
    Callers that want independent sequences must reset it between calls.

    Args:
        i_size: Number of features per time step.
        hid_size: Size of the LSTM hidden state.
        o_size: Number of values predicted per time step.
    """

    def __init__(self, i_size=1, hid_size=200, o_size=1):
        super().__init__()
        self.hidden_layer_size = hid_size
        self.lstm = nn.LSTM(i_size, self.hidden_layer_size)
        self.linear = nn.Linear(self.hidden_layer_size, o_size)
        # Plain tensors, not parameters or buffers: they are not part of
        # state_dict() and are not moved by Module.to()/cuda()/cpu().
        self.hidden_cell = (torch.zeros(1, 1, self.hidden_layer_size),
                            torch.zeros(1, 1, self.hidden_layer_size))

    def forward(self, input_seq):
        """Run one sequence through the network.

        Args:
            input_seq: Tensor whose first dimension is the time axis; it is
                reshaped to ``(len(input_seq), 1, -1)`` (batch size 1).

        Returns:
            The linear head's output for the last time step.
        """
        steps = len(input_seq)
        # hidden_cell is not moved with the module, so align it with the
        # input's device here; .to() is a no-op when it already matches.
        state = tuple(s.to(input_seq.device) for s in self.hidden_cell)
        out, self.hidden_cell = self.lstm(input_seq.view(steps, 1, -1), state)
        pred = self.linear(out.view(steps, -1))
        return pred[-1]
def train(lstm, data, PATH='./cifar_net.pth', epochs=150, lr=0.001):
    """Fit ``lstm`` on (sequence, label) pairs and save its weights to PATH.

    Each pair is one optimisation step (Adam on MSE loss), and the model's
    ``hidden_cell`` is reset to zeros before every sequence.

    Args:
        lstm: Model exposing ``hidden_layer_size`` and a writable
            ``hidden_cell`` attribute; trained in place.
        data: Iterable of ``(seq, label)`` tensor pairs. It is iterated once
            per epoch, so it must be re-iterable (e.g. a list).
        PATH: File that receives ``lstm.state_dict()`` after the last epoch.
        epochs: Number of passes over ``data``.
        lr: Adam learning rate.
    """
    optimizer = torch.optim.Adam(lstm.parameters(), lr=lr)
    # Use the model's own device instead of a module-level `device` global,
    # which only exists when this file is executed as a script.
    device = next(lstm.parameters()).device
    loss_func = nn.MSELoss().to(device)
    print(lstm)

    state_shape = (1, 1, lstm.hidden_layer_size)
    for i in range(epochs):
        single_loss = None  # stays None if `data` yields nothing
        for seq, label in data:
            seq = seq.to(device)
            label = label.to(device)
            optimizer.zero_grad()
            # Fresh recurrent state so sequences do not leak into each other.
            lstm.hidden_cell = (torch.zeros(state_shape, device=device),
                                torch.zeros(state_shape, device=device))
            y_pred = lstm(seq)
            single_loss = loss_func(y_pred, label)
            single_loss.backward()
            optimizer.step()

        # Periodic progress line: loss of the epoch's last sequence.
        if i % 25 == 1 and single_loss is not None:
            print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')

    torch.save(lstm.state_dict(), PATH)
if __name__ == '__main__':
    # Forecast the last year of the seaborn "flights" series with the LSTM
    # and plot the forecast over the full series.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    checkpoint = './cifar_net.pth'
    train_window = 12  # input length, in time steps, fed to the model
    test_data_size = 12
    fut_pred = 12  # number of steps to forecast

    flight = sns.load_dataset('flights')
    all_data = flight['passengers'].values.astype(float)
    train_data = all_data[:-test_data_size]
    # Held-out tail; not used in any computation below (the plot draws it as
    # part of the full series).
    test_data = all_data[-test_data_size:]

    # The scaler is fitted on the training part only.
    scaler = MinMaxScaler((0, 10))
    train_data_norm = scaler.fit_transform(train_data.reshape(-1, 1))
    train_data_norm = torch.FloatTensor(train_data_norm).view(-1)

    train_io_seq = create_io_seq(train_data_norm, train_window)

    lstm = LSTM()
    lstm.to(device)

    # Train only when no saved weights exist; previously the script loaded
    # the checkpoint unconditionally and failed on a fresh checkout.
    if not Path(checkpoint).exists():
        train(lstm, train_io_seq, checkpoint)

    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    lstm.load_state_dict(torch.load(checkpoint, map_location='cpu'))
    lstm.cpu()
    lstm.eval()

    # Autoregressive forecast: each prediction is appended and becomes part
    # of the next input window.
    test_input = train_data_norm[-train_window:].tolist()
    with torch.no_grad():
        for _ in range(fut_pred):
            seq = torch.FloatTensor(test_input[-train_window:])
            lstm.hidden_cell = (torch.zeros(1, 1, lstm.hidden_layer_size),
                                torch.zeros(1, 1, lstm.hidden_layer_size))
            test_input.append(lstm(seq).item())

    # Map the predicted values back to passenger counts.
    act_pred = scaler.inverse_transform(
        np.array(test_input[-fut_pred:]).reshape(-1, 1))
    plot_data(flight['passengers'], act_pred)
|