from PIL import Image
import pandas as pd
import numpy as np
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import os

# Paths to the image folder and the csv file
dir_path = './97/train/'
csv_path = './97/train.csv'


class Mydataset(Dataset):
    # Takes the data directory, the csv path and the augmentation transforms
    def __init__(self, dir_path, csv, transform=None, target_transform=None):
        super(Mydataset, self).__init__()
        # Collect the full path of every image
        self.path = []
        # Read the csv
        self.data = pd.read_csv(csv)
        # Encode the string labels as integers, e.g. 0 1 2 3 4
        # (sorting keeps the mapping deterministic across runs)
        colorMap = {elem: index for index, elem in enumerate(sorted(set(self.data["label"])))}
        self.data['label'] = self.data['label'].map(colorMap)
        # Allocate an empty array to hold the labels
        self.num = int(self.data.shape[0])  # total number of images
        self.label = np.zeros(self.num, dtype=np.int64)  # int64 so labels collate to LongTensor
        # Iterate the rows so every image path lines up with its label
        for index, row in self.data.iterrows():
            self.path.append(os.path.join(dir_path, row['filename']))
            self.label[index] = row['label']
        # Augmentation applied to the training images
        self.transform = transform
        # target_transform (for label/validation transforms) is not used here
        self.target_transform = target_transform

    # The key part: the pieces above are used here when a sample is fetched
    def __getitem__(self, index):
        img = Image.open(self.path[index]).convert('RGB')
        label = self.label[index]
        # Data augmentation happens here
        if self.transform is not None:
            img = self.transform(img)  # converts the image to a tensor
        return img, label

    def __len__(self):
        return len(self.data)
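
# A minimal sketch (toy labels invented for illustration) of what the label
# encoding in __init__ produces: sorted unique strings map to 0..N-1.
_toy = pd.DataFrame({'label': ['cat', 'dog', 'cat', 'bird']})
_toy_map = {elem: index for index, elem in enumerate(sorted(set(_toy['label'])))}
print(_toy_map)                              # {'bird': 0, 'cat': 1, 'dog': 2}
print(_toy['label'].map(_toy_map).tolist())  # [1, 2, 1, 0]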
# The preprocessing / augmentation pipeline: resize and crop on the PIL image,
# then convert to a tensor and normalize each channel to roughly [-1, 1]
transform = transforms.Compose(
    [transforms.Resize(150),
     transforms.CenterCrop(150),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
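
# A minimal sketch of what the pipeline above produces, using an in-memory
# dummy image (no file on disk is assumed): a 3x150x150 float tensor
# normalized into [-1, 1].
_dummy = Image.new('RGB', (300, 200), color=(128, 64, 32))
_out = transform(_dummy)
print(_out.shape)                            # torch.Size([3, 150, 150])
print(_out.min().item(), _out.max().item())  # both within [-1, 1]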
# Build the dataset and the loader
train_data = Mydataset(dir_path=dir_path, csv=csv_path, transform=transform)
trainloader = DataLoader(train_data, batch_size=16, shuffle=True, num_workers=0)

# Iterate over the batches (the training step would go here)
for i_batch, batch_data in enumerate(trainloader):
    image, label = batch_data
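
# Quick sanity check of one batch (illustrative sketch; assumes the ./97/
# images and csv referenced above actually exist on disk):
images, labels = next(iter(trainloader))
print(images.shape, labels.shape)  # torch.Size([16, 3, 150, 150]) torch.Size([16])
print(labels.dtype)                # torch.int64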
# ---------------------------------------------------------------------------
# Second, standalone example: build train/val index csv files, a Dataset that
# reads them, a small CNN, and a basic training loop.
# ---------------------------------------------------------------------------
import os
import sys
import numpy as np
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import time
import random
import csv
from PIL import Image


def createImgIndex(dataPath, ratio):
    '''
    Scan the image directory and write train/val csv index files that pair
    each image path with its label index.
    dataPath: image directory
    ratio: fraction of the images used for validation
    return: the list of class names (their order defines the labels)
    '''
    fileList = os.listdir(dataPath)
    random.shuffle(fileList)
    classList = []  # list of class names; the index in this list is the label
    # validation split
    with open('data/val_section1015.csv', 'w') as f:
        writer = csv.writer(f)
        for i in range(int(len(fileList) * ratio)):
            row = []
            if '.jpg' in fileList[i]:
                fileInfo = fileList[i].split('_')
                sectionName = fileInfo[0] + '_' + fileInfo[1]  # view name + standard/non-standard
                row.append(os.path.join(dataPath, fileList[i]))  # image path
                if sectionName not in classList:
                    classList.append(sectionName)
                row.append(classList.index(sectionName))
                writer.writerow(row)
    # training split (starts exactly where the validation split ended)
    with open('data/train_section1015.csv', 'w') as f:
        writer = csv.writer(f)
        for i in range(int(len(fileList) * ratio), len(fileList)):
            row = []
            if '.jpg' in fileList[i]:
                fileInfo = fileList[i].split('_')
                sectionName = fileInfo[0] + '_' + fileInfo[1]  # view name + standard/non-standard
                row.append(os.path.join(dataPath, fileList[i]))  # image path
                if sectionName not in classList:
                    classList.append(sectionName)
                row.append(classList.index(sectionName))
                writer.writerow(row)
    print(classList, len(classList))
    return classList
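
# Illustrative sketch of the naming convention that split('_') above implies;
# the example filename below is an assumption, not taken from the original data.
_example_name = '4CH_standard_001.jpg'
_info = _example_name.split('_')
print(_info[0] + '_' + _info[1])  # '4CH_standard' (view name + standard/non-standard)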
def default_loader(path):
    '''Defines how an image file is read: resize to 128x128 and force RGB.'''
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter
    return Image.open(path).resize((128, 128), Image.LANCZOS).convert('RGB')
class MyDataset(Dataset):
    '''Reads the csv index file and serves (image, label) samples by index.'''
    def __init__(self, txt, transform=None, target_transform=None, loader=default_loader):
        super(MyDataset, self).__init__()  # initialise the parent class
        imgs = []
        # open the index file created by createImgIndex and read it row by row
        with open(txt, 'r') as fh:
            reader = csv.reader(fh)
            for row in reader:
                imgs.append((row[0], int(row[1])))  # (image path, label)
        self.imgs = imgs
        self.transform = transform
        self.target_transform = target_transform
        self.loader = loader

    def __getitem__(self, index):
        '''Fetch one sample by index.'''
        # fn is the image path, label the class index, i.e. row[0] and row[1] above
        fn, label = self.imgs[index]
        img = self.loader(fn)
        if self.transform is not None:
            img = self.transform(img)  # convert the image to a Tensor
        return img, label

    def __len__(self):
        '''Number of samples in the dataset.'''
        return len(self.imgs)
class Model(nn.Module):
    def __init__(self, classNum=31):
        super(Model, self).__init__()
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        # input shape [3, 128, 128]
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),     # [64, 128, 128]
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # [64, 64, 64]

            nn.Conv2d(64, 128, 3, 1, 1),   # [128, 64, 64]
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # [128, 32, 32]

            nn.Conv2d(128, 256, 3, 1, 1),  # [256, 32, 32]
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # [256, 16, 16]

            nn.Conv2d(256, 512, 3, 1, 1),  # [512, 16, 16]
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # [512, 8, 8]

            nn.Conv2d(512, 512, 3, 1, 1),  # [512, 8, 8]
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),         # [512, 4, 4]
        )
        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, classNum)
        )

    def forward(self, x):
        out = self.cnn(x)
        out = out.view(out.size()[0], -1)  # flatten to [batch, 512*4*4]
        return self.fc(out)
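
# A minimal sketch (illustrative only) verifying the shape annotations above:
# a dummy batch of 128x128 RGB images flattens to 512*4*4 features before fc.
_m = Model(classNum=31)
_x = torch.randn(2, 3, 128, 128)
print(_m.cnn(_x).shape)  # torch.Size([2, 512, 4, 4])
print(_m(_x).shape)      # torch.Size([2, 31])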
def train(train_set, train_loader, val_set, val_loader):
    model = Model()
    loss = nn.CrossEntropyLoss()  # classification task, so use CrossEntropyLoss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer
    num_epoch = 10
    # start training
    for epoch in range(num_epoch):
        epoch_start_time = time.time()
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()  # train mode enables Dropout and lets BatchNorm update its statistics
        for i, data in enumerate(train_loader):
            optimizer.zero_grad()                   # zero the gradients held by the optimizer
            train_pred = model(data[0])             # forward pass
            batch_loss = loss(train_pred, data[1])  # compute the loss
            batch_loss.backward()                   # back-propagate
            optimizer.step()                        # update the parameters with the gradients
            train_acc += np.sum(np.argmax(train_pred.data.numpy(), axis=1) == data[1].numpy())
            train_loss += batch_loss.item()

        model.eval()
        with torch.no_grad():  # no gradient tracking during validation
            for i, data in enumerate(val_loader):
                # data = [imgData, labelList]
                val_pred = model(data[0])
                batch_loss = loss(val_pred, data[1])
                val_acc += np.sum(np.argmax(val_pred.data.numpy(), axis=1) == data[1].numpy())
                val_loss += batch_loss.item()

        # report the epoch result
        print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' %
              (epoch + 1, num_epoch, time.time() - epoch_start_time,
               train_acc / train_set.__len__(), train_loss / train_set.__len__(),
               val_acc / val_set.__len__(), val_loss / val_set.__len__()))
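
# A minimal sketch of the accuracy bookkeeping used above, on toy tensors
# (values invented for illustration): argmax over the class dimension is
# compared with the integer labels and the number of matches is summed.
_logits = torch.tensor([[0.1, 2.0, -1.0],
                        [1.5, 0.2,  0.3]])
_labels = torch.tensor([1, 0])
print(np.sum(np.argmax(_logits.numpy(), axis=1) == _labels.numpy()))  # 2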
if __name__ == '__main__':
    dirPath = '/data/Matt/QC_images/test0916'  # image directory
    createImgIndex(dirPath, 0.2)  # create the train/val csv index files
    root = os.getcwd() + '/data/'
    train_data = MyDataset(txt=root + 'train_section1015.csv', transform=transforms.ToTensor())
    val_data = MyDataset(txt=root + 'val_section1015.csv', transform=transforms.ToTensor())
    train_loader = DataLoader(dataset=train_data, batch_size=6, shuffle=True, num_workers=4)
    val_loader = DataLoader(dataset=val_data, batch_size=6, shuffle=False, num_workers=4)
    # start training
    train(train_data, train_loader, val_data, val_loader)