背景知识

LSTM原理介绍

pytorch doc

简单实例

1
2
3
4
5
6
# Minimal nn.LSTM shape demo.
# nn.LSTM(input_size, hidden_size, num_layers): 1 feature per step, 2 hidden units, 2 stacked layers.
lstm = nn.LSTM(1, 2, 2)
# Named `inputs` rather than `input` so the builtin is not shadowed.
inputs = torch.randn(4, 1, 1)  # (seq_len, batch, input_size)
h0 = torch.randn(2, 1, 2)  # (num_layers * num_directions, batch, hidden_size)
c0 = torch.randn(2, 1, 2)  # (num_layers * num_directions, batch, hidden_size)
output, (hn, cn) = lstm(inputs, (h0, c0))  # output: (seq_len, batch, num_directions * hidden_size)
# output = [h1, h2, ..., hn] of the LAST layer only. hn holds the final hidden
# state of every layer, so it is hn[-1] (not hn itself) that equals output[-1].

各个参数的理解

  1. input_size:一个输入中的特征的个数
  2. hidden_size: 隐藏层节点的个数,如下图
  3. num_layers: lstm层的个数,默认为1;如果设为2,则第一层的输出[h1,h2,…,hn]作为第二层的输入,再由第二层计算出最后的输出[h1,h2,…,hn]
  4. seq_len: 一个序列中输入的个数
  5. batch: 每批处理的样本个数

实例结构图

航班乘客人数预测

参考 https://stackabuse.com/time-series-prediction-using-lstm-with-pytorch-in-python/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import torch
import torch.nn as nn
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# https://stackabuse.com/time-series-prediction-using-lstm-with-pytorch-in-python/

def plot_data(data, pred=None):
    """Plot the passenger series and, optionally, a forecast over its tail.

    Args:
        data: Sequence of passenger counts, one value per month. Must
            support ``len()``.
        pred: Optional forecast values. They are drawn against the last
            ``len(pred)`` positions of ``data`` (for 144 months of data and
            a 12-month forecast this is positions 132..143).
    """
    plt.title('month vs passengers')
    plt.xlabel('months')
    plt.ylabel('total passengers')
    plt.plot(data)
    if pred is not None:
        # Derive the x positions from the inputs instead of hard-coding
        # 132..144, so other series lengths and horizons line up as well.
        x = np.arange(len(data) - len(pred), len(data))
        plt.plot(x, pred)
    plt.show()


def create_io_seq(data, tw):
    """Slice a series into sliding (window, next value) pairs.

    Args:
        data: Sliceable sequence (list, tensor, array, ...) of observations.
        tw: Window length, i.e. how many consecutive items form one input.

    Returns:
        A list of ``(seq, lab)`` tuples where ``seq`` is ``data[i:i + tw]``
        and ``lab`` is the length-1 slice holding the item right after it.
        The list is empty when ``data`` has ``tw`` items or fewer.
    """
    return [
        (data[start:start + tw], data[start + tw:start + tw + 1])
        for start in range(len(data) - tw)
    ]


class LSTM(nn.Module):
    """Single-layer LSTM followed by a linear layer, for one sequence at a time.

    The recurrent state lives in ``self.hidden_cell`` as an ``(h, c)`` tuple
    and is overwritten by every ``forward`` call. Callers that want each
    sequence to start from a clean state must reset it to zeros beforehand.

    Args:
        i_size: Number of features per time step.
        hid_size: Number of hidden units in the LSTM layer.
        o_size: Number of output values per prediction.
    """

    def __init__(self, i_size=1, hid_size=200, o_size=1):
        super().__init__()
        self.hidden_layer_size = hid_size
        self.lstm = nn.LSTM(i_size, self.hidden_layer_size)
        self.linear = nn.Linear(self.hidden_layer_size, o_size)
        # (h0, c0), each shaped (num_layers=1, batch=1, hidden_size).
        self.hidden_cell = (
            torch.zeros(1, 1, self.hidden_layer_size),
            torch.zeros(1, 1, self.hidden_layer_size),
        )

    def forward(self, input_seq):
        """Run the sequence through the network and return the last-step prediction.

        Args:
            input_seq: Tensor whose first dimension is the sequence length;
                it is reshaped to ``(seq_len, 1, -1)``, i.e. a batch of one.

        Returns:
            Tensor of shape ``(o_size,)``: the linear layer's output for the
            final time step.
        """
        steps = len(input_seq)
        # hidden_cell is a plain attribute, not a registered buffer, so
        # Module.to()/cuda()/cpu() never move it. Align it with the input's
        # device here; .to() returns the same tensor when already in place.
        state = tuple(s.to(input_seq.device) for s in self.hidden_cell)
        out, self.hidden_cell = self.lstm(input_seq.view(steps, 1, -1), state)
        pred = self.linear(out.view(steps, -1))
        return pred[-1]


def train(lstm, data, PATH='./cifar_net.pth', epochs=150):
    """Train the model one sequence at a time and save its weights.

    Args:
        lstm: Model to train. It must expose ``hidden_layer_size`` and accept
            a ``hidden_cell`` attribute, which is reset to zeros before every
            sequence.
        data: Re-iterable collection of ``(seq, label)`` tensor pairs; it is
            traversed once per epoch.
        PATH: File the final ``state_dict`` is written to.
        epochs: Number of passes over ``data``.
    """
    # Take the device from the model itself so the function does not depend
    # on a module-level `device` that only exists when run as a script.
    device = next(lstm.parameters()).device
    loss_func = nn.MSELoss()
    optimizer = torch.optim.Adam(lstm.parameters(), lr=0.001)
    print(lstm)
    for i in range(epochs):
        single_loss = None
        for seq, label in data:
            seq = seq.to(device)
            label = label.to(device)
            optimizer.zero_grad()
            # Fresh zero state so no history leaks between sequences.
            lstm.hidden_cell = (
                torch.zeros(1, 1, lstm.hidden_layer_size, device=device),
                torch.zeros(1, 1, lstm.hidden_layer_size, device=device),
            )
            y_pred = lstm(seq)
            single_loss = loss_func(y_pred, label)
            single_loss.backward()
            optimizer.step()
        # Report the loss of the epoch's last sequence every 25 epochs;
        # skipped when `data` produced nothing, so there is no loss to show.
        if i % 25 == 1 and single_loss is not None:
            print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')
    torch.save(lstm.state_dict(), PATH)


if __name__ == '__main__':
    # Script entry point: forecast the last 12 months of the seaborn
    # `flights` passenger series from the preceding months and plot the
    # forecast over the full series.
    import os

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    train_window = 12  # number of past months fed to the model per prediction
    fut_pred = 12  # number of months to forecast
    model_path = './cifar_net.pth'

    flight = sns.load_dataset('flights')
    all_data = flight['passengers'].values.astype(float)
    test_data_size = 12
    train_data = all_data[:-test_data_size]
    test_data = all_data[-test_data_size:]  # held-out months; not used below

    # Fit the scaler on the training months only, then flatten to a 1-D tensor.
    scaler = MinMaxScaler((0, 10))
    train_data_norm = scaler.fit_transform(train_data.reshape(-1, 1))
    train_data_norm = torch.FloatTensor(train_data_norm).view(-1)

    train_io_seq = create_io_seq(train_data_norm, train_window)

    lstm = LSTM()
    lstm.to(device)
    # Train only when no checkpoint exists yet; otherwise reuse the saved
    # weights instead of failing on a missing file or retraining every run.
    if not os.path.exists(model_path):
        train(lstm, train_io_seq, model_path)

    # map_location lets a checkpoint written on a GPU load on a CPU-only host.
    lstm.load_state_dict(torch.load(model_path, map_location='cpu'))
    lstm.cpu()

    # Autoregressive forecast: each prediction is appended to the history and
    # becomes part of the next input window.
    test_input = train_data_norm[-train_window:].tolist()
    for _ in range(fut_pred):
        seq = torch.FloatTensor(test_input[-train_window:])
        with torch.no_grad():
            lstm.hidden_cell = (torch.zeros(1, 1, lstm.hidden_layer_size),
                                torch.zeros(1, 1, lstm.hidden_layer_size))
            test_input.append(lstm(seq).item())

    # The last `fut_pred` entries are the predictions; undo the scaling.
    act_pred = scaler.inverse_transform(np.array(test_input[-fut_pred:]).reshape(-1, 1))
    plot_data(flight['passengers'], act_pred)

最终结果,蓝色为原始数据,橙红色是预测数据