以下程序将 csv 文件里的 FER2013 图片数据存储为 h5py 格式文件。
# Create data and label arrays for FER2013 and store them in one HDF5 file.
# labels: 0=Angry, 1=Disgust, 2=Fear, 3=Happy, 4=Sad, 5=Surprise, 6=Neutral
import csv
import os
import numpy as np
import h5py

file = r'Dataset/FER2013/fer2013.csv'

# One (pixel_rows, labels) pair of lists per dataset split.  The keys match
# the csv's 'Usage' column, so the header row and any unknown split are
# skipped automatically (same effective behavior as the original chain of ifs).
splits = {
    'Training': ([], []),
    'PublicTest': ([], []),
    'PrivateTest': ([], []),
}

datapath = os.path.join('H5File', 'FER2013.h5')
if not os.path.exists(os.path.dirname(datapath)):
    os.makedirs(os.path.dirname(datapath))

with open(file, 'r') as csvin:
    for row in csv.reader(csvin):
        usage = row[-1]
        if usage in splits:
            pixels, labels = splits[usage]
            # row[0] is the emotion label, row[1] the space-separated
            # 48x48 grey-scale pixel values.
            labels.append(int(row[0]))
            pixels.append([int(pixel) for pixel in row[1].split()])

Training_x, Training_y = splits['Training']
PublicTest_x, PublicTest_y = splits['PublicTest']
PrivateTest_x, PrivateTest_y = splits['PrivateTest']

print(np.shape(Training_x))
print(np.shape(PublicTest_x))
print(np.shape(PrivateTest_x))

# Context manager guarantees the HDF5 file is closed even on error.
with h5py.File(datapath, 'w') as datafile:
    datafile.create_dataset("Training_pixel", dtype='uint8', data=Training_x)
    datafile.create_dataset("Training_label", dtype='int64', data=Training_y)
    datafile.create_dataset("PublicTest_pixel", dtype='uint8', data=PublicTest_x)
    datafile.create_dataset("PublicTest_label", dtype='int64', data=PublicTest_y)
    datafile.create_dataset("PrivateTest_pixel", dtype='uint8', data=PrivateTest_x)
    datafile.create_dataset("PrivateTest_label", dtype='int64', data=PrivateTest_y)
print("Save data finish!!")
以下 FER2013.py 程序用 torch.utils.data.Dataset 类对 FER2013 数据集进行封装。
from __future__ import print_function
from PIL import Image
import numpy as np
import h5py
import torch.utils.data as data
class FER2013(data.Dataset):
    """FER2013 facial-expression dataset read from 'H5File/FER2013.h5'.

    Args:
        split (str): which split to load: 'Training', 'PublicTest' or
            'PrivateTest' (any other value falls back to 'PrivateTest').
        transform (callable, optional): a function/transform that takes in a
            PIL image and returns a transformed version.
    """

    def __init__(self, split='Training', transform=None):
        self.transform = transform
        self.split = split  # training set or test set
        self.data = h5py.File('H5File/FER2013.h5', 'r', driver='core')
        # Load the pixel arrays for the requested split and reshape each
        # flat 2304-value row to a 48x48 image.  -1 lets numpy infer the
        # sample count instead of hard-coding 28709/3589.
        if self.split == 'Training':
            self.train_labels = self.data['Training_label']
            self.train_data = np.asarray(self.data['Training_pixel']).reshape((-1, 48, 48))
        elif self.split == 'PublicTest':
            self.PublicTest_labels = self.data['PublicTest_label']
            # BUGFIX: the original reshaped self.train_data, which is never
            # set in this branch (AttributeError for PublicTest loads).
            self.PublicTest_data = np.asarray(self.data['PublicTest_pixel']).reshape((-1, 48, 48))
        else:
            self.PrivateTest_labels = self.data['PrivateTest_label']
            # BUGFIX: same wrong self.train_data reference as above.
            self.PrivateTest_data = np.asarray(self.data['PrivateTest_pixel']).reshape((-1, 48, 48))

    def __getitem__(self, index):
        """Return ``(image, target)`` for ``index``.

        Args:
            index (int): sample index within the configured split.
        Returns:
            tuple: (image, target) where target is the emotion class index.
        """
        if self.split == 'Training':
            img, target = self.train_data[index], self.train_labels[index]
        elif self.split == 'PublicTest':
            # BUGFIX: attribute is PublicTest_labels (original read the
            # nonexistent PublicTest_label).
            img, target = self.PublicTest_data[index], self.PublicTest_labels[index]
        else:
            img, target = self.PrivateTest_data[index], self.PrivateTest_labels[index]
        # Replicate the single grey channel three times so the sample is
        # consistent with RGB datasets, then hand back a PIL image.
        img = img[:, :, np.newaxis]
        img = np.concatenate((img, img, img), axis=2)
        img = Image.fromarray(img)
        if self.transform is not None:
            img = self.transform(img)
        return img, target

    def __len__(self):
        """Return the number of samples in the configured split."""
        if self.split == 'Training':
            return len(self.train_data)
        elif self.split == 'PublicTest':
            return len(self.PublicTest_data)
        else:
            return len(self.PrivateTest_data)
if __name__ == '__main__':
    # Smoke test: load a few training samples and print them.
    # Pass e.g. transforms.Compose([transforms.ToTensor()]) here to also
    # exercise the transform path.
    transform_train = None
    # Renamed from 'data' to avoid shadowing the torch.utils.data module
    # imported above as 'data'.
    trainset = FER2013(split='Training', transform=transform_train)
    for i in range(3):
        print(trainset[i])
    print(len(trainset))
正常训练以及对抗训练的程序文件为 adversarial_training.py,如下所示:
import torch
from torch.autograd import Variable
import utils
from attack_method import *
import torch.nn as nn
import torch.optim as optim
from time import clock
import os
import torch.nn.functional as F
class Adversarial_Trainings(object):
    """Standard and adversarial (PGD / fast / free) training loops.

    Each ``*_Training`` method trains ``net`` in place for ``total_epoch``
    epochs, prints per-batch progress, saves the final ``state_dict`` to a
    method-specific ``.pt`` file and returns the trained network.

    Args:
        repeat_num (int): replay/refinement steps per batch (PGD and free).
        trainloader: iterable of ``(inputs, targets)`` training batches.
        use_cuda (bool): move batches to the GPU when True.
        attack_iters (int): forwarded to ``Adversarial_methods``.
        net (nn.Module): the network being trained.
        multi_net (nn.Module): auxiliary network; only an SGD optimizer is
            created for it here — the loops below never use it.
        epsilon (float): attack budget, forwarded to ``Adversarial_methods``.
        alpha (float): attack step size, forwarded to ``Adversarial_methods``.
        learning_rate_decay_start (int): epoch after which lr decay begins
            (negative disables decay).
        learning_rate_decay_every (int): epochs between decay steps.
        learning_rate_decay_rate (float): multiplicative decay factor.
        lr (float): initial learning rate for ``net``'s optimizer.
        multi_lr (float): initial learning rate for ``multi_net``'s optimizer.
        save_path (str): stored for callers; unused by the loops below.
        gamma1, gamma2: stored for callers; unused by the loops below.
    """

    def __init__(self, repeat_num, trainloader, use_cuda, attack_iters, net,
                 multi_net, epsilon, alpha, learning_rate_decay_start,
                 learning_rate_decay_every, learning_rate_decay_rate, lr,
                 multi_lr, save_path, gamma1, gamma2):
        self.trainloader = trainloader
        self.repeat_num = repeat_num
        self.use_cuda = use_cuda
        self.attack_iters = attack_iters
        self.net = net
        self.epsilon = epsilon
        self.alpha = alpha
        self.learning_rate_decay_start = learning_rate_decay_start
        self.learning_rate_decay_every = learning_rate_decay_every
        self.learning_rate_decay_rate = learning_rate_decay_rate
        self.lr = lr
        self.save_path = save_path
        self.optimizer = optim.SGD(self.net.parameters(), lr=self.lr,
                                   momentum=0.9, weight_decay=5e-4)
        self.multi_net = multi_net
        self.multi_optimizer = optim.SGD(multi_net.parameters(), lr=multi_lr,
                                         momentum=0.9, weight_decay=5e-4)
        self.gamma1 = gamma1
        self.gamma2 = gamma2

    def _compute_lr(self, epoch):
        """Return ``(learning_rate, decayed)`` for this epoch.

        Decay starts once ``epoch`` exceeds ``learning_rate_decay_start``;
        a negative start value disables decay entirely.  This was previously
        copy-pasted into all four training loops.
        """
        if epoch > self.learning_rate_decay_start and self.learning_rate_decay_start >= 0:
            frac = (epoch - self.learning_rate_decay_start) // self.learning_rate_decay_every
            return self.lr * self.learning_rate_decay_rate ** frac, True
        return self.lr, False

    def Stardard_Training(self, total_epoch):
        """Plain (non-adversarial) SGD training.

        Saves the weights to ``'base_model.pt'`` and returns the network.
        """
        self.net.train()
        for epoch in range(total_epoch):
            print('\nEpoch: %d' % epoch)
            train_loss = 0
            correct = 0
            total = 0
            current_lr, decayed = self._compute_lr(epoch)
            if decayed:
                utils.set_lr(self.optimizer, current_lr)  # set the decayed rate
            print('learning_rate: %s' % str(current_lr))
            for batch_idx, (inputs, targets) in enumerate(self.trainloader):
                if self.use_cuda:
                    inputs, targets = inputs.cuda(), targets.cuda()
                self.optimizer.zero_grad()
                inputs, targets = Variable(inputs), Variable(targets)
                outputs = self.net(inputs)
                loss = nn.CrossEntropyLoss()(outputs, targets)
                loss.backward()
                utils.clip_gradient(self.optimizer, 0.1)
                self.optimizer.step()
                train_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += predicted.eq(targets.data).cpu().sum()
                utils.progress_bar(batch_idx, len(self.trainloader),
                                   'Loss: %.3f | Acc: %.3f%% (%d/%d)'
                                   % (train_loss / (batch_idx + 1),
                                      100. * correct / total, correct, total))
        torch.save(self.net.state_dict(), 'base_model.pt')
        return self.net

    def pgd_advTraining(self, total_epoch):
        """Adversarial training with a PGD-style multi-step perturbation.

        For each batch the perturbation ``delta`` is refined ``repeat_num``
        times by re-running the attack on ``inputs + delta``; the network is
        then updated once on the clamped adversarial batch.  Saves the
        weights to ``'pgd__base_model.pt'`` and returns the network.
        """
        self.net.train()
        for epoch in range(total_epoch):
            print('\nEpoch: %d' % epoch)
            train_loss = 0
            correct = 0
            total = 0
            current_lr, decayed = self._compute_lr(epoch)
            if decayed:
                utils.set_lr(self.optimizer, current_lr)  # set the decayed rate
            print('learning_rate: %s' % str(current_lr))
            for batch_idx, (inputs, targets) in enumerate(self.trainloader):
                if self.use_cuda:
                    inputs, targets = inputs.cuda(), targets.cuda()
                self.optimizer.zero_grad()
                inputs, targets = Variable(inputs), Variable(targets)
                delta = torch.zeros_like(inputs)
                # Iteratively refine the adversarial perturbation.
                for repeat_ in range(self.repeat_num):
                    adversarial_attack = Adversarial_methods(
                        inputs + delta, targets, self.attack_iters,
                        self.net, self.epsilon, self.alpha)
                    delta = adversarial_attack.fgsm()
                # Single parameter update on the adversarial batch.
                outputs = self.net(torch.clamp(inputs + delta, 0, 1))
                loss = nn.CrossEntropyLoss()(outputs, targets)
                loss.backward()
                utils.clip_gradient(self.optimizer, 0.1)
                self.optimizer.step()
                train_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += predicted.eq(targets.data).cpu().sum()
                utils.progress_bar(batch_idx, len(self.trainloader),
                                   'Loss: %.3f | Acc: %.3f%% (%d/%d)'
                                   % (train_loss / (batch_idx + 1),
                                      100. * correct / total, correct, total))
            Train_acc = 100. * correct / total
        # BUGFIX: the message used to say "free_fast" in this PGD loop.
        print("The final pgd accuracy is :", Train_acc)
        torch.save(self.net.state_dict(), 'pgd__base_model.pt')
        return self.net

    def fast_advTraining(self, total_epoch):
        """Fast adversarial training: one R-FGSM perturbation per batch.

        Saves the weights to ``'fast_base_model.pt'`` and returns the network.
        """
        self.net.train()
        for epoch in range(total_epoch):
            print('\nEpoch: %d' % epoch)
            train_loss = 0
            correct = 0
            total = 0
            current_lr, decayed = self._compute_lr(epoch)
            if decayed:
                utils.set_lr(self.optimizer, current_lr)  # set the decayed rate
            print('learning_rate: %s' % str(current_lr))
            for batch_idx, (inputs, targets) in enumerate(self.trainloader):
                if self.use_cuda:
                    inputs, targets = inputs.cuda(), targets.cuda()
                self.optimizer.zero_grad()
                inputs, targets = Variable(inputs), Variable(targets)
                # Generate the adversarial perturbation with R-FGSM.
                adversarial_attack = Adversarial_methods(
                    inputs, targets, self.attack_iters,
                    self.net, self.epsilon, self.alpha)
                delta = adversarial_attack.rfgsm()
                # Update network parameters on the clamped adversarial batch.
                outputs = self.net(torch.clamp(inputs + delta, 0, 1))
                loss = nn.CrossEntropyLoss()(outputs, targets)
                loss.backward()
                utils.clip_gradient(self.optimizer, 0.1)
                self.optimizer.step()
                train_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += predicted.eq(targets.data).cpu().sum()
                utils.progress_bar(batch_idx, len(self.trainloader),
                                   'Loss: %.3f | Acc: %.3f%% (%d/%d)'
                                   % (train_loss / (batch_idx + 1),
                                      100. * correct / total, correct, total))
            Train_acc = 100. * correct / total
        # BUGFIX: the message used to say "free_fast" in this fast loop.
        print("The final fast accuracy is :", Train_acc)
        torch.save(self.net.state_dict(), 'fast_base_model.pt')
        return self.net

    def free_advTraining(self, total_epoch):
        """"Free" adversarial training: replay each batch ``repeat_num`` times.

        Every replay generates a fresh FGSM perturbation and performs a
        parameter update.  Saves the weights to ``'free_base_model.pt'`` and
        returns the network.
        """
        self.net.train()
        for epoch in range(total_epoch):
            print('\nEpoch: %d' % epoch)
            train_loss = 0
            correct = 0
            total = 0
            current_lr, decayed = self._compute_lr(epoch)
            if decayed:
                utils.set_lr(self.optimizer, current_lr)  # set the decayed rate
            print('learning_rate: %s' % str(current_lr))
            for batch_idx, (inputs, targets) in enumerate(self.trainloader):
                if self.use_cuda:
                    inputs, targets = inputs.cuda(), targets.cuda()
                self.optimizer.zero_grad()
                inputs, targets = Variable(inputs), Variable(targets)
                # NOTE(review): zero_grad runs once per batch, so gradients
                # accumulate across the replays below — confirm this is the
                # intended "free" update rule.
                for repeat_ in range(self.repeat_num):
                    adversarial_attack = Adversarial_methods(
                        inputs, targets, self.attack_iters,
                        self.net, self.epsilon, self.alpha)
                    delta = adversarial_attack.fgsm()
                    outputs = self.net(torch.clamp(inputs + delta, 0, 1))
                    loss = nn.CrossEntropyLoss()(outputs, targets)
                    loss.backward()
                    utils.clip_gradient(self.optimizer, 0.1)
                    self.optimizer.step()
                # Metrics use the last replay's outputs/loss for this batch.
                train_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += predicted.eq(targets.data).cpu().sum()
                utils.progress_bar(batch_idx, len(self.trainloader),
                                   'Loss: %.3f | Acc: %.3f%% (%d/%d)'
                                   % (train_loss / (batch_idx + 1),
                                      100. * correct / total, correct, total))
            Train_acc = 100. * correct / total
        print("The final free accuracy is :", Train_acc)
        torch.save(self.net.state_dict(), 'free_base_model.pt')
        return self.net
训练的主程序代码如下所示
from __future__ import print_function
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torchvision
import transforms as transforms
import numpy as np
import os
import argparse
import utils
from CKplus import CKplus
from torch.autograd import Variable
from Networks import *
from adversarial_training import Adversarial_Trainings
from config import get_args
from test_accuracy import*
from time import clock
from FER2013 import FER2013
opt = get_args()  # command-line / config options
best_Test_acc = 0  # best PrivateTest accuracy
best_Test_acc_epoch = 0
start_epoch = 0  # start from epoch 0 or last checkpoint epoch
# Learning-rate decay schedule consumed by Adversarial_Trainings.
learning_rate_decay_start = 70  # 50
learning_rate_decay_every = 5  # 5
learning_rate_decay_rate = 0.9  # 0.9
cut_size = 44  # crop size fed to the network
path = os.path.join(opt.dataset + '_' + opt.model, 'mn')
file_name = 'repeat_num={}'.format(opt.repeat_num)
# Data
print('==> Preparing data..')
transform_train = transforms.Compose([
    transforms.RandomCrop(cut_size),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])
# Test-time augmentation: ten crops per image, stacked into one tensor.
transform_test = transforms.Compose([
    transforms.TenCrop(cut_size),
    transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
])
trainset = FER2013(split = 'Training', transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=opt.bs, shuffle=True, num_workers=0)
testset = FER2013(split = 'PublicTest', transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=opt.bs, shuffle=False, num_workers=0)
# Model
if opt.model == 'VGG19':
    net = VGG('VGG19')
    # multi_net = VGG_multi('VGG19')
else:
    net = ResNet18()
    # multi_net = ResNet18_multi()
# NOTE(review): multi_net is never assigned (both assignments above are
# commented out) yet __main__ passes it to Adversarial_Trainings, which
# will raise NameError at runtime — confirm which model should be restored.
if opt.resume:
    # Load checkpoint.
    print('==> Resuming from checkpoint..')
    path = 'model/VGG19/model.pt'  # NOTE(review): overwrites the path built above — confirm intent
    checkpoint = torch.load(os.path.join(path))
    net.load_state_dict(checkpoint)
else:
    print('==> Building model..')
    path_multi = 'model/VGG19/multi_model.t7'
    # multi_net.load_state_dict(torch.load(path_multi))
use_cuda = torch.cuda.is_available()
if use_cuda:
    net.cuda()
    # multi_net.cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=opt.lr, momentum=0.9, weight_decay=5e-4)
if __name__ == '__main__':
    # time.clock() was removed in Python 3.8; perf_counter() is the
    # documented replacement for wall-clock benchmarking.
    from time import perf_counter

    start_time = perf_counter()
    # NOTE(review): multi_net is never defined above (its assignments are
    # commented out), so this call raises NameError at runtime — restore the
    # multi_net model or drop the parameter. TODO confirm intended behavior.
    adversarial_training = Adversarial_Trainings(
        opt.repeat_num, trainloader, use_cuda, opt.attack_iters, net,
        multi_net, opt.epsilon, opt.alpha, learning_rate_decay_start,
        learning_rate_decay_every, learning_rate_decay_rate, opt.lr,
        opt.multi_lr, opt.save_path, opt.gamma1, opt.gamma2)
    trainned_net = adversarial_training.Stardard_Training(opt.epoch)
    cost_time = perf_counter() - start_time
    attack_dict = {
        'attack_iters': 1, 'net': trainned_net, 'epsilon': 0.008, 'alpha': 0.007}
    # Clean accuracy, then accuracy under attack.
    Test_acc = test(use_cuda, testloader, trainned_net, criterion, None)
    Test_acc_adv = test_adv(use_cuda, testloader, trainned_net, criterion, attack_dict)
    # NOTE(review): identical call to the one above — presumably meant to use
    # a different attack configuration or averaging; confirm.
    Test_acc_avg_adv = test_adv(use_cuda, testloader, trainned_net, criterion, attack_dict)
    if not os.path.exists(opt.save_path):
        os.makedirs(opt.save_path)
    # torch.save(trainned_net.state_dict(), os.path.join(opt.save_path,'model.pt'))
    print('Run time: ', cost_time)
    print("Test_acc: %0.3f" % Test_acc)
    print('Test_acc_adv: %0.3f' % Test_acc_adv)
    print('Test_acc_avg_adv: %0.3f' % Test_acc_avg_adv)