

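Preface

I am working on attention mechanisms for convolutional neural networks. Over the past couple of days I have been building my own algorithm and integrating it into existing models, starting with ResNet-50. Before that, though, the baseline model has to be trained properly, so I trained a plain ResNet-50 on CIFAR-10. I spent a whole day adjusting training hyperparameters, made one last change before leaving that evening, and the next day found the test accuracy at around 96% (in the code below, this is measured on a 10% hold-out of the CIFAR-10 training set). I'm writing it down here in the hope that it gives fellow practitioners a useful reference.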


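I. A quick summary (I wish I could be more specific, but I didn't keep detailed notes. Damn!)

Let's go!

First, a reminder: get the basic training logic right. Earlier my accuracy was stuck in the 70% range precisely because the way I changed the learning rate dynamically during training was wrong.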
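One way to keep the learning-rate logic easy to check is to separate "which rate applies at this epoch" from "how the rate is applied". The sketch below is only an illustration, not the code I ran: the helper names `LR_STEPS`, `lr_for_epoch` and `set_lr` are made up for this example, and the milestones simply mirror the schedule in the training code later in this post.

```python
# Hypothetical helpers; the milestones mirror the schedule used in the training code below.
LR_STEPS = [  # (first 0-based epoch at which the rate applies, learning rate)
    (0, 0.01),     # one warm-up epoch
    (1, 0.1),
    (61, 0.01),
    (121, 0.001),
    (151, 0.0008),
    (171, 0.0004),
    (191, 0.0002),
]


def lr_for_epoch(epoch):
    """Return the learning rate for a 0-based epoch index."""
    lr = LR_STEPS[0][1]
    for start, value in LR_STEPS:
        if epoch >= start:
            lr = value
    return lr


def set_lr(optimizer, lr):
    """Update the rate in place on an existing optimizer object."""
    for group in optimizer.param_groups:
        group['lr'] = lr
```

With a PyTorch optimizer, which exposes `param_groups` as a list of dicts, the idea would be to build the optimizer once before the epoch loop and call `set_lr(optimizer, lr_for_epoch(epoch))` at the start of each epoch, so the optimizer's internal state (for example SGD momentum) carries over between epochs.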

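1. Data augmentation

Data augmentation gave the most visible accuracy gain. My original pipeline used only random horizontal flips and reached a little over 80%. After I added:
1) padding to 40x40 followed by a random 32x32 crop
2) Cutout
accuracy rose to about 88%. Without enough data augmentation, training accuracy ends up very high while test accuracy stays low, which is severe overfitting. Other currently popular augmentation methods include random erasing, Cutout, mixup and CutMix. Cutout is the only one of these I have tried; try the others as your situation allows.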
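To make the Cutout idea concrete, here is a minimal pure-Python sketch. It is not the `torchtoolbox` implementation used in the code below; the function names, the 16-pixel square and the fill value of 0 are assumptions chosen for illustration. The image is a plain list of rows so the example runs without any dependencies.

```python
import random


def cutout_box(height, width, size, rng=random):
    """Pick a square of side `size` centred on a random pixel, clipped to the image."""
    cy = rng.randrange(height)
    cx = rng.randrange(width)
    top = max(cy - size // 2, 0)
    bottom = min(cy + size // 2, height)
    left = max(cx - size // 2, 0)
    right = min(cx + size // 2, width)
    return top, bottom, left, right


def apply_cutout(image, box, fill=0.0):
    """Return a copy of a 2-D image (list of rows) with the box overwritten by `fill`."""
    top, bottom, left, right = box
    out = [row[:] for row in image]
    for y in range(top, bottom):
        for x in range(left, right):
            out[y][x] = fill
    return out


image = [[1.0] * 32 for _ in range(32)]            # dummy 32x32 single-channel image
box = cutout_box(32, 32, size=16, rng=random.Random(0))
masked = apply_cutout(image, box)
```

Because the square is centred on a random pixel and then clipped, patches near the border are smaller than 16x16, so the amount of erased content varies from sample to sample.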

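2. Model structure

I had trained the plain ResNet-50 structure (the model that ships with PyTorch) before, but compared with a model I wrote myself under the same hyperparameters, my model reached 94% test accuracy while plain ResNet-50 topped out at around 87%. At the time it did not occur to me to change the model structure, yet the structure was exactly the problem. I wonder whether you have run into this too. On top of the data augmentation above, I first changed the initial convolution from a 7x7 kernel to 3x3 and the pooling layer from 3x3 to 2x2, adjusting the strides accordingly, but accuracy was still only about 88%. I then removed that pooling layer entirely and set the first convolution to a 3x3 kernel with stride 1 and padding 1, and by the next day accuracy had reached about 96%.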
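A likely reason the stem matters so much is plain arithmetic on feature-map sizes. The sketch below traces the side length of a 32x32 input through the standard ImageNet-style stem (7x7 convolution with stride 2 and padding 3, then 3x3 max-pooling with stride 2 and padding 1) and through the modified stem (3x3 convolution with stride 1 and padding 1, no pooling), followed by the four residual stages with strides 1, 2, 2, 2. The helper names are made up for this illustration.

```python
def out_size(n, kernel, stride, padding):
    """Output side length of a conv/pool layer: floor((n + 2p - k) / s) + 1."""
    return (n + 2 * padding - kernel) // stride + 1


def trace(stem, n=32):
    """Apply the stem layers, then the four residual stages (strides 1, 2, 2, 2)."""
    sizes = []
    for kernel, stride, padding in stem:
        n = out_size(n, kernel, stride, padding)
    for stride in (1, 2, 2, 2):
        # each stage applies its stride in a 3x3 convolution with padding 1
        n = out_size(n, 3, stride, 1)
        sizes.append(n)
    return sizes


imagenet_stem = [(7, 2, 3), (3, 2, 1)]  # 7x7 conv stride 2, then 3x3 max-pool stride 2
cifar_stem = [(3, 1, 1)]                # 3x3 conv stride 1, no pooling

print(trace(imagenet_stem))  # [8, 4, 2, 1]
print(trace(cifar_stem))     # [32, 16, 8, 4]
```

With the ImageNet-style stem, the image is already down to 8x8 before the first residual stage and the last stage works on a single 1x1 position, whereas the modified stem keeps the full 32x32 resolution for the first stage and still has 4x4 maps at the end.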

II. Code

1. Data processing

import numpy as np
import torch.utils.data as tdata
from torch.utils.data import SubsetRandomSampler
from torchtoolbox.transform import Cutout
from torchvision import transforms, datasets


def get_loader_ciafar_10(data_dir,valid_size,augmentation,batch_size,do_shuffle=False,number_workers=4, pin_memery=True):
    """Return (train_loader, valid_loader); validation is a `valid_size` fraction of the CIFAR-10 training set."""

    normalize = transforms.Normalize(mean=[0.4914, 0.4822, 0.4465],
                                      std=[0.2023, 0.1994, 0.2010])

    if augmentation:
        train_transform=transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            Cutout(),
            transforms.ToTensor(),
            normalize]
            )
    else:
        train_transform=transforms.Compose( [transforms.ToTensor(),
            normalize])
    
    valid_transform=transforms.Compose([transforms.ToTensor(),
            normalize]) 


    train_dataset=datasets.CIFAR10(root=data_dir, train=True, 
                                    download = True, transform=train_transform,)
    valid_dataset = datasets.CIFAR10(root=data_dir, train=True,
                                    download=True,transform=valid_transform)

    num_data=len(train_dataset)
    index_list=list(range(num_data))
    if do_shuffle:
        np.random.shuffle(index_list)
    valid_num=int(np.floor(valid_size * num_data))
    train_index, valid_index=index_list[valid_num:], index_list[:valid_num]
    train_sampler, valid_sampler = SubsetRandomSampler(train_index),SubsetRandomSampler(valid_index)

    train_loader = tdata.DataLoader(dataset=train_dataset, batch_size=batch_size,
                                    sampler=train_sampler,num_workers = number_workers,pin_memory = pin_memery)
    valid_loader = tdata.DataLoader(dataset=valid_dataset, batch_size=batch_size,
                                    sampler=valid_sampler,num_workers = number_workers,pin_memory = pin_memery)
    return train_loader, valid_loader
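
The split logic in the loader above can be checked in isolation. The following is a small stdlib-only sketch of the same index arithmetic; `split_indices` and its `seed` argument are hypothetical additions for reproducibility (the loader itself uses `np.random.shuffle` without a seed).

```python
import math
import random


def split_indices(num_data, valid_size, shuffle=True, seed=None):
    """Split range(num_data) into (train_indices, valid_indices)."""
    indices = list(range(num_data))
    if shuffle:
        random.Random(seed).shuffle(indices)
    valid_num = int(math.floor(valid_size * num_data))
    # the first `valid_num` indices go to validation, the rest to training
    return indices[valid_num:], indices[:valid_num]


train_idx, valid_idx = split_indices(50000, 0.1, seed=0)
print(len(train_idx), len(valid_idx))  # 45000 5000
```

Note that the loader builds two dataset objects over the same training images with different transforms, so the 5,000 validation images are evaluated without augmentation while the 45,000 training images get the crop, flip and Cutout.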

2. Training

"""
Training and validation
"""
from Data_loader import get_loader_ciafar_10 as loader
import torch
from torch import optim
import torch.nn as nn
import torch.backends.cudnn as cudnn
import logging
import configparser
import os, sys
sys.path.append('*****')
os.chdir(sys.path[0])  # Adjust these two lines to your own setup: I need this path handling because I run from VS Code; PyCharm does not need it.
from resnet50 import resnet50  # import the factory function, not just the module, so resnet50() is callable
from torch.utils import tensorboard as tb
import datetime
import time
INITIAL_DIR = 'Configuration.ini'


class Configer():
    @classmethod
    def get_params(cls, ini_dir: str, section: str) -> dict:
        config = configparser.ConfigParser()
        config.read(ini_dir)
        params = {}
        for key, value in config[section].items():
            params[key] = value
        return params


class Log():
    @classmethod
    def Log(cls, name, path):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s %(levelname)-8s:%(message)s')

        file_handler = logging.FileHandler(os.path.join(path,'main.log'))
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)

        scream_handler = logging.StreamHandler()
        scream_handler.setFormatter(formatter)
        scream_handler.setLevel(logging.INFO)

        logger.addHandler(file_handler)
        logger.addHandler(scream_handler)

        return logger


class Train_model():
    def __init__(self, log, train_loader, valid_loader, net, device, criterion, max_epoch,
                 train_interval, valid_interval):
        self.logger = log
        self.train_loader, self.valid_loader = train_loader, valid_loader
        self.device = device
        self.net = net
        self.criterion = criterion
        self.max_epoch = max_epoch
        self.train_interval = train_interval
        self.valid_interval = valid_interval
        # self.device = torch.device('cuda:2' if torch.cuda.is_available() else 'cpu')

    def train(self, epoch):
        self.net.train()
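        # Step schedule: one warm-up epoch at 0.01, then 0.1, decayed at the epochs below.
        if epoch == 0: lr = 0.01
        if epoch > 0: lr = 0.1
        if epoch > 60: lr = 0.01
        if epoch > 120: lr = 0.001
        if epoch > 150: lr = 0.0008
        if epoch > 170: lr = 0.0004
        if epoch > 190: lr = 0.0002
        # Note: the optimizer is rebuilt every epoch, so SGD momentum buffers restart each epoch.
        optimizer = optim.SGD(self.net.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)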
        correct, total, loss_ = 0., 0., 0.
        for ind, data in enumerate(self.train_loader):
            inputs, labels = data
            inputs, labels = inputs.to(self.device), labels.to(self.device)
            outputs = self.net(inputs)
            _, indices = torch.max(outputs, 1)
            correct += (indices == labels).sum()
            total += labels.size()[0]
            loss = self.criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_ += loss.item()
            # Use the loader stored on the instance rather than the module-level global.
            writer.add_scalar('train_loss',loss_, epoch * len(self.train_loader) + ind +1)
            writer.add_scalar('train_acc',correct / total, epoch * len(self.train_loader) + ind +1)
            if (ind + 1) % self.train_interval == 0 or ind == len(self.train_loader) - 1:
                self.logger.info(
                    f'**Train**Epoch--{epoch+1}--Iter--{ind + 1}--Loss_mean={loss_ / (ind + 1):<05.2f},Acc={correct / total * 100:<05.2f}%')

    def valid(self, epoch):
        self.net.eval()
        correct, total, loss_ = 0., 0., 0.
        with torch.no_grad():
            for ind, data in enumerate(self.valid_loader):
                inputs, labels = data
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.net(inputs)
                _, indices = torch.max(outputs, 1)
                correct += (indices == labels).sum()
                total += labels.size()[0]
                loss = self.criterion(outputs, labels)
                loss_ += loss.item()
                # Use the loader stored on the instance rather than the module-level global.
                writer.add_scalar('val_loss', loss_, epoch * len(self.valid_loader) + ind +1)
                writer.add_scalar('val_acc',correct / total, epoch * len(self.valid_loader) + ind +1)

            self.logger.info(
                f'**Validation**Epoch--{epoch + 1}--Loss_mean={loss_ / (ind + 1):<05.2f},Acc={correct / total * 100:<05.2f}%')

    def process(self):
        start = time.time()
        for epoch in range(self.max_epoch):
            self.train(epoch)
            if (epoch + 1) % self.valid_interval == 0 or epoch == self.max_epoch - 1:
                self.valid(epoch)
        end = time.time()
        writer.close()
        self.logger.info(f'**Total time: {(end-start)/60} mins.')


if __name__ == '__main__':
    params_train = Configer.get_params(INITIAL_DIR, 'Training')
    tensorboard_path = os.path.join('./tensorboard', datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S'))
    os.makedirs(tensorboard_path, exist_ok=True)  # also creates ./tensorboard if it does not exist yet
    logger = Log.Log('resnet50',tensorboard_path)
    writer = tb.SummaryWriter(tensorboard_path)
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
   
    net = resnet50()
    # net = nn.DataParallel(net, device_ids=[0, 1, 2])
    net = net.to(device)
    cudnn.benchmark = True
    criterion = nn.CrossEntropyLoss()
   
    train_loader, valid_loader = loader(data_dir= params_train['data_dir'], batch_size= int(params_train['batch_size']), valid_size= 0.1, augmentation=True, do_shuffle= True )
    model = Train_model(logger, train_loader, valid_loader, net, device, criterion,
                        max_epoch= int(params_train['max_epoch']),
                        train_interval= int(params_train['train_interval']), 
                        valid_interval= int(params_train['valid_interval']))
    model.process()

3. Configuration file (Configuration.ini)

[Training]
data_dir = ******
max_epoch = 400
train_interval = 10
valid_interval = 1
batch_size = 256

The epoch count here is on the large side; in practice training had converged by around epoch 200, so adjust it to your own situation.

4. Model
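
One detail worth knowing about this configuration file: `configparser` returns every value as a string, which is why the training script wraps `max_epoch`, `batch_size` and the intervals in `int(...)`. A minimal sketch of that behaviour follows; the `./data` path is only a placeholder for this example, not my actual data directory.

```python
import configparser

# Inline copy of the [Training] section; data_dir is a placeholder value.
INI_TEXT = """
[Training]
data_dir = ./data
max_epoch = 400
train_interval = 10
valid_interval = 1
batch_size = 256
"""

config = configparser.ConfigParser()
config.read_string(INI_TEXT)

params = dict(config['Training'])                      # every value is a str
batch_size = config['Training'].getint('batch_size')   # typed accessor returns an int

print(type(params['max_epoch']).__name__, type(batch_size).__name__)  # str int
```

Using `getint` at the point of reading is an alternative to converting with `int(...)` at each call site; either works as long as the conversion is not forgotten.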

import torch
import torch.nn as nn
from torch import Tensor
from typing import List, Optional, Type, Union, Callable, Any

def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)


def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class Bottleneck(nn.Module):
   
    expansion: int = 4

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(Bottleneck, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        width = int(planes * (base_width / 64.)) * groups
       
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out


class ResNet(nn.Module):

    def __init__(
        self,
        block: Type[Union[Bottleneck]],
        layers: List[int],
        num_classes: int = 10,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        
        self.groups = groups
        self.base_width = width_per_group
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=1, padding=1,
                               bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2,)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2,)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2,)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

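        if zero_init_residual:
            # Zero the last BN scale in each residual branch so every block starts as an identity mapping.
            # self.modules() also yields non-Bottleneck modules, which are simply skipped.
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)  # type: ignore[arg-type]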

    def _make_layer(self, block: Type[Union[Bottleneck]], planes: int, blocks: int,
                    stride: int = 1,) -> nn.Sequential:
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def _forward_impl(self, x: Tensor) -> Tensor:
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        # x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x: Tensor) -> Tensor:
        return self._forward_impl(x)




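def resnet50(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    """Build the CIFAR-style ResNet-50; `pretrained` and `progress` are accepted but unused."""
    return ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)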

Summary

Of course, I went through many other parameter settings as well. Here I'll just share an article that inspired me a great deal.
