#使用dnn模型(k折交叉验证)
import copy

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch import optim
from torch.utils import data
from torch.utils.data import Dataset, DataLoader
# Define the neural network model.
# Default dropout probabilities applied after the first and second hidden layers.
dropout1, dropout2 = 0.3, 0.6


class SimpleNN(nn.Module):
    """Fully connected regression network with two hidden layers.

    Layer stack: Flatten -> Linear -> ReLU -> Dropout -> Linear -> ReLU ->
    Dropout -> Linear. Calling ``SimpleNN()`` with no arguments builds the
    154 -> 256 -> 128 -> 1 network.

    Args:
        in_features: Number of features per sample after flattening.
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        p1: Dropout probability after the first hidden layer. ``None`` uses
            the module-level ``dropout1`` as it is at construction time.
        p2: Dropout probability after the second hidden layer. ``None`` uses
            the module-level ``dropout2`` as it is at construction time.
        out_features: Number of values predicted per sample.
    """

    def __init__(self, in_features=154, hidden1=256, hidden2=128,
                 p1=None, p2=None, out_features=1):
        super().__init__()
        # Resolve the dropout defaults here rather than in the signature so
        # that later changes to the module-level constants are still honoured.
        p1 = dropout1 if p1 is None else p1
        p2 = dropout2 if p2 is None else p2
        # The attribute name and layer order determine the state_dict keys;
        # keep both stable so saved checkpoints continue to load.
        self.dense = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features, hidden1),
            nn.ReLU(),
            nn.Dropout(p1),
            nn.Linear(hidden1, hidden2),
            nn.ReLU(),
            nn.Dropout(p2),
            nn.Linear(hidden2, out_features),
        )

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Return the network output for the batch ``X``."""
        return self.dense(X)
# Create the loss criterion, the model and the optimizer.
loss = nn.MSELoss()  # mean-squared-error criterion
dnn_model = SimpleNN()
optimizer = optim.Adam(  # Adam over every parameter of the model
    params=dnn_model.parameters(),
    lr=1e-4,
    weight_decay=0,
)
# Select the training and validation sets for one fold of k-fold cross-validation.
def get_k_fold_data(k, i, X, y):
    """Split ``X`` and ``y`` into the training and validation parts of fold ``i``.

    Rows are cut into ``k`` contiguous folds of ``len(X) // k`` rows each.
    Rows left over when ``len(X)`` is not divisible by ``k`` are assigned to
    the last fold, so every row is used. Fold ``i`` becomes the validation
    set; all other rows, in their original order, become the training set.

    Args:
        k: Number of folds; must be greater than 1.
        i: Zero-based index of the validation fold, ``0 <= i < k``.
        X: Features; must support positional slicing through ``.iloc``
            (e.g. a pandas DataFrame).
        y: Targets aligned row-for-row with ``X``; must also support ``.iloc``.

    Returns:
        A tuple ``(X_train, y_train, X_valid, y_valid)``. The training parts
        carry a fresh ``0..n-1`` index; the validation parts keep the index
        labels of the rows they were sliced from.

    Raises:
        ValueError: If ``k <= 1``, ``i`` is outside ``[0, k)``, ``X`` and ``y``
            differ in length, or there are fewer rows than folds.
    """
    if k <= 1:
        raise ValueError(f"k must be greater than 1, got {k}")
    if not 0 <= i < k:
        raise ValueError(f"fold index i must satisfy 0 <= i < {k}, got {i}")
    if len(X) != len(y):
        raise ValueError(
            f"X and y must have the same length, got {len(X)} and {len(y)}"
        )

    fold_size = len(X) // k
    if fold_size == 0:
        raise ValueError(f"cannot split {len(X)} rows into {k} non-empty folds")

    start = i * fold_size
    # The last fold absorbs the remainder rows instead of discarding them.
    end = len(X) if i == k - 1 else start + fold_size

    def rows_outside_fold(frame):
        # Skip an empty leading/trailing piece so concat only sees real data.
        pieces = [
            part for part in (frame.iloc[:start], frame.iloc[end:]) if len(part)
        ]
        return pd.concat(pieces, ignore_index=True)

    X_valid, y_valid = X.iloc[start:end], y.iloc[start:end]
    X_train, y_train = rows_outside_fold(X), rows_outside_fold(y)
    return X_train, y_train, X_valid, y_valid
# Train and validate the model with k-fold cross-validation.
k = 5
batch_size = 64
num_epochs = 100
# Loss accumulators from the original script; nothing in this section reads them.
train_l_sum, valid_l_sum = 0, 0
# Final-epoch training / validation loss of each fold.
train_ls, valid_ls = [], []


def _to_float_tensor(frame):
    """Convert a pandas object to a float32 tensor via its NumPy values."""
    return torch.tensor(frame.astype(np.float32).values, dtype=torch.float32)


def _run_epoch(model, data_iter, loss_fn, opt=None):
    """Run one pass over ``data_iter`` and return the mean per-sample loss.

    With ``opt`` given, the model is put in training mode and updated after
    every batch. Without it, the model is put in evaluation mode (dropout
    disabled) and gradient tracking is switched off.

    Assumes ``loss_fn`` returns the mean over the batch, so that weighting by
    the batch size and dividing by the sample count gives a per-sample mean.

    Raises:
        ValueError: If ``data_iter`` yields no samples.
    """
    training = opt is not None
    model.train(training)
    total_loss, num_samples = 0.0, 0
    with torch.set_grad_enabled(training):
        for batch_x, batch_y in data_iter:
            # Targets as a column vector, as in the original per-batch reshape.
            targets = batch_y.reshape(-1, 1)
            if training:
                opt.zero_grad()  # clear gradients left from the previous step
            batch_loss = loss_fn(model(batch_x), targets)
            if training:
                batch_loss.backward()  # back-propagation
                opt.step()  # parameter update
            # Weight by batch size so a smaller last batch is not over-counted.
            total_loss += batch_loss.item() * len(batch_x)
            num_samples += len(batch_x)
    if num_samples == 0:
        raise ValueError("data_iter yielded no samples")
    return total_loss / num_samples


# Snapshot the model and optimizer as they are before cross-validation starts.
# Every fold is restored to this state; otherwise a fold would be validated on
# rows the model had already been trained on during an earlier fold.
_initial_model_state = copy.deepcopy(dnn_model.state_dict())
_initial_optimizer_state = copy.deepcopy(optimizer.state_dict())

for i in range(k):
    X_train, y_train, X_valid, y_valid = get_k_fold_data(k, i, X, y)
    print(f'FOLD {i}')
    print('--------------------------------')
    dnn_model.load_state_dict(_initial_model_state)
    optimizer.load_state_dict(copy.deepcopy(_initial_optimizer_state))
    # DataFrame -> NumPy array -> PyTorch tensor.
    X_train = _to_float_tensor(X_train)
    y_train = _to_float_tensor(y_train)
    X_valid = _to_float_tensor(X_valid)
    y_valid = _to_float_tensor(y_valid)
    # Wrap the tensors in datasets.
    train_dataset = data.TensorDataset(X_train, y_train)
    valid_dataset = data.TensorDataset(X_valid, y_valid)
    # Build the batch iterators. Only the training data needs shuffling.
    # NOTE(review): worker processes started at module level need an
    # `if __name__ == "__main__":` guard on platforms that spawn processes;
    # confirm the target platform.
    train_iter = DataLoader(dataset=train_dataset, batch_size=batch_size,
                            shuffle=True, num_workers=2)
    valid_iter = DataLoader(dataset=valid_dataset, batch_size=batch_size,
                            shuffle=False, num_workers=2)
    for epoch in range(num_epochs):
        # Training pass: performs the gradient updates.
        train_loss = _run_epoch(dnn_model, train_iter, loss, optimizer)
        if epoch % 20 == 0:
            print('Loss: {} Epoch:{}'.format(train_loss, epoch))
        # Validation pass: no updates, dropout disabled.
        valid_loss = _run_epoch(dnn_model, valid_iter, loss)
        if epoch % 20 == 0:
            print('Valid Loss: {} Epoch:{}'.format(valid_loss, epoch))
    # Record this fold's final-epoch losses.
    train_ls.append(train_loss)
    valid_ls.append(valid_loss)
print('Training Ended')
print('Train Average Loss: {} Valid Average Loss: {}'.format(np.mean(train_ls), np.mean(valid_ls)))