import pickle import torch import numpy as np import cv2 import os import h5py import rawpy from matplotlib import pyplot as plt from torch.utils.data import Dataset, DataLoader from tqdm import tqdm from .unprocess import mosaic, unprocess, random_gains from .process import * from utils import * def bayer_aug(rggb, k=0): bayer = rggb2bayer(rggb) bayer = np.rot90(bayer, k=k, axes=(-2,-1)) rggb = bayer2rggb(bayer) return rggb # def get_threshold(data, step=1, mode='score2', print=False): # if mode == 'naive': # quants = list(range(step, 100, step)) # ths = np.percentile(data.reshape(-1), quants, method='linear') # ths = np.array(ths.tolist() + [data.max()]) # th = ths[0] # diffs = ths[1:] - ths[:-1] # gap = data.max() / (100/step)**2 # for i in range(len(diffs)): # if diffs[i] > gap: # if print: log(f'Adaptive percent: {quants[i]}% - th: {ths[i]*959:.2f}') # th = ths[i] # break # elif mode == 'score': # quants = np.linspace(step, 100, 100//step) # ths = np.percentile(data.reshape(-1), quants, method='linear') # diffs = ths[1:] - ths[:-1] # quants = quants[:-1] # score = diffs/quants # i = np.argmin(score) # th = ths[i] # if print: log(f'Adaptive percent: {quants[i]}% - th: {ths[i]*959:.2f}') # return th # Raw AWGN Dataset(Raw->Raw) class SID_Raw_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'SID' self.args['crop_per_image'] = 8 self.args['crop_size'] = 512 self.args['ori'] = False self.args['iso'] = None self.args['dgain'] = None self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'SID' self.args['mode'] = 'train' self.args['wp'] = 16383 self.args['bl'] = 512 self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] self.dataset_file = f'SID_{self.args["mode"]}.info' with open(f"infos/{self.dataset_file}", 'rb') as info_file: self.infos = pkl.load(info_file) print(f'>> Successfully load "{self.dataset_file}" (Length: {len(self.infos)})') self.datapath = [info['long'] for info in self.infos] self.names = [info['name'] for info in self.infos] self.buffer = [None] * len(self.infos) if 'cache' in self.args['command']: log(f'Loading {len(self.infos)} crops!!!') self.buffer = [dataload(path) for path in tqdm(self.datapath)] self.length = len(self.infos) self.sigma = -1 self.get_shape() def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def init_random_crop_point(self, mode='non-overlapped', raw_crop=False): self.h_start = [] self.w_start = [] self.h_end = [] self.w_end = [] self.aug = np.random.randint(8, size=self.args['crop_per_image']) h, w = self.h, self.w if raw_crop: h, w = self.H, self.W if mode == 'non-overlapped': nh = h // self.args["patch_size"] nw = w // self.args["patch_size"] h_start = np.random.randint(0, h - nh*self.args["patch_size"] + 1) w_start = np.random.randint(0, w - nw*self.args["patch_size"] + 1) for i in range(nh): for j in range(nw): self.h_start.append(h_start + i * self.args["patch_size"]) self.w_start.append(w_start + j * self.args["patch_size"]) self.h_end.append(h_start + (i+1) * self.args["patch_size"]) self.w_end.append(w_start + (j+1) * self.args["patch_size"]) else: # random_crop for i in range(self.args['crop_per_image']): h_start = np.random.randint(0, h - self.args["patch_size"] + 1) w_start = np.random.randint(0, w - self.args["patch_size"] + 1) self.h_start.append(h_start) self.w_start.append(w_start) self.h_end.append(h_start + self.args["patch_size"]) self.w_end.append(w_start + self.args["patch_size"]) def random_crop(self, img): # 本函数用于将numpy随机裁剪成以crop_size为边长的方形crop_per_image等份 c, h, w = img.shape # 创建空numpy做画布, [crops, h, w] crops = np.empty((self.args["crop_per_image"], c, self.args["patch_size"], self.args["patch_size"]), dtype=np.float32) # 往空tensor的通道上贴patchs for i in range(self.args["crop_per_image"]): crop = img[:, self.h_start[i]:self.h_end[i], self.w_start[i]:self.w_end[i]] # crop = self.data_aug(crop, mode=self.aug[i]) # 会导致格子纹 crops[i] = crop return crops def __getitem__(self, idx): data = {} # 读取数据 data['wb'] = self.infos[idx]['wb'] data['ccm'] = self.infos[idx]['ccm'] data['name'] = self.infos[idx]['name'] if self.buffer[idx] is None: self.buffer[idx] = dataload(self.datapath[idx]) hr_raw = (self.buffer[idx].astype(np.float32) - self.args['bl']) / (self.args['wp'] - self.args['bl']) # BayerAug, 这个旋转是用来改变bayer模式的 data['pattern'] = np.random.randint(4) if self.args["mode"] == 'train' else idx%4 hr_raw = np.rot90(hr_raw, k=data['pattern'], axes=(-2,-1)) hr_raw = bayer2rggb(hr_raw).clip(0, 1).transpose(2,0,1) # 模拟VST后的gt数值范围 y=sqrt(x+3/8)≈sqrt(x) data['vst_aug'] = True if np.random.randint(2) else False hr_raw = hr_raw ** 0.5 if data['vst_aug'] else hr_raw if self.args["mode"] == 'train': # 随机裁剪成crop_per_image份 self.init_random_crop_point(mode=self.args['croptype'], raw_crop=False) if data['pattern'] % 2: self.h_start, self.h_end, self.w_start, self.w_end = self.w_start, self.w_end, self.h_start, self.h_end hr_crops = self.random_crop(hr_raw) else: setup_seed(idx) hr_crops = hr_raw[None,:] lr_shape = hr_crops.shape if self.args["lock_wb"] is False and np.random.randint(2): rgb_gain, red_gain, blue_gain = random_gains() red_gain = data['wb'][0] / red_gain.numpy() blue_gain = data['wb'][2] / blue_gain.numpy() hr_crops *= rgb_gain.numpy() hr_crops[:,0] = hr_crops[:,0] * red_gain hr_crops[:,2] = hr_crops[:,2] * blue_gain data['wb'][0] = red_gain data['wb'][2] = blue_gain lr_crops = hr_crops.copy() # 人工加噪声 if self.args['gpu_preprocess'] is False: if self.args['mode'] == 'train': lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max']) data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255. else: data['sigma'] = self.sigma setup_seed(idx) noise = np.random.randn(*lr_crops.shape) * data['sigma'] lr_crops += noise data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) if self.args['clip']: data["lr"] = lr_crops.clip(0, 1) data["hr"] = hr_crops.clip(0, 1) return data # Unprocess Synthetic Dataset(sRGB->Raw) class RGB_Img2Raw_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'YOND' self.args['crop_size'] = 256 self.args['ori'] = False self.args['iso'] = None self.args['dgain'] = None self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'YOND' self.args['mode'] = 'train' self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] self.data_dir = f"{self.root_dir}/{self.mode}" if self.mode == 'train': self.data_dir += f'_{self.args["subname"]}' self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}')) self.names = [os.path.basename(path)[:-4] for path in self.datapath] self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)] self.buffer = [None] * len(self.infos) if 'cache' in self.args['command']: log(f'Loading {len(self.infos)} crops!!!') self.buffer = [dataload(path) for path in tqdm(self.datapath)] self.length = len(self.infos) self.sigma = -1 self.get_shape() log(f'Successfully cache {self.length} npy data!!!') def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] if self.buffer[idx] is None: max_val = 255. hr_imgs = dataload(self.datapath[idx]).astype(np.float32) / max_val else: max_val = 65535. if self.buffer[idx].dtype == np.uint16 else 255. hr_imgs = self.buffer[idx].astype(np.float32) / max_val if self.args["mode"] == 'train': data['aug_id1'] = np.random.randint(8) self.data_aug(hr_imgs, data['aug_id1']) else: setup_seed(idx) hr_crops = hr_imgs # RAW需要复杂的unproces lr_shape = hr_crops.shape hr_crops = torch.from_numpy(hr_crops) hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess']) data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()]) data['ccm'] = metadata['cam2rgb'].numpy() if 'blur' in self.args['command']: lr_crops = hr_crops.clone().numpy() lr_crops = cv2.GaussianBlur(lr_crops, (3,3), -1) red = lr_crops[0::2, 0::2, 0] green_red = lr_crops[0::2, 1::2, 1] green_blue = lr_crops[1::2, 0::2, 1] blue = lr_crops[1::2, 1::2, 2] lr_crops = np.stack((red, green_red, green_blue, blue), axis=0) hr_crops = mosaic(hr_crops).numpy() # rggb # 这个旋转是用来改变bayer模式的 if 'no_bayeraug' in self.args["command"]: data['pattern'] = 0 else: if self.args["mode"] == 'train': data['pattern'] = np.random.randint(4) else: data['pattern'] = idx%4 hr_crops = bayer_aug(hr_crops, k=data['pattern']) data['vst_aug'] = False hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops # [crops,h,w,c] -> [crops,c,h,w] hr_crops = hr_crops.transpose(2,0,1) if 'blur' not in self.args['command']: lr_crops = hr_crops.copy() # 人工加噪声 if self.args['gpu_preprocess'] is False: if self.args['mode'] == 'train': lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max']) data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255. else: data['sigma'] = self.sigma setup_seed(idx) noise = np.random.randn(*lr_crops.shape) * data['sigma'] lr_crops += noise data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) if self.args['clip']: data["lr"] = lr_crops.clip(0, 1) data["hr"] = hr_crops.clip(0, 1) return data # Unprocess Synthetic Dataset(sRGB->Raw) class RGB_Img2Raw_Syn_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'YOND' self.args['crop_size'] = 256 self.args['ori'] = False self.args['iso'] = None self.args['dgain'] = None self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'YOND' self.args['mode'] = 'train' self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] self.data_dir = f"{self.root_dir}/{self.mode}" if self.mode == 'train': self.data_dir += f'_{self.args["subname"]}' self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}')) self.names = [os.path.basename(path)[:-4] for path in self.datapath] self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)] self.buffer = [None] * len(self.infos) if 'cache' in self.args['command']: log(f'Loading {len(self.infos)} crops!!!') self.buffer = [dataload(path) for path in tqdm(self.datapath)] self.length = len(self.infos) self.sigma = -1 self.get_shape() log(f'Successfully cache {self.length} npy data!!!') def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] if self.buffer[idx] is None: self.buffer[idx] = dataload(self.datapath[idx]) max_val = 65535. if self.buffer[idx].dtype == np.uint16 else 255. hr_imgs = self.buffer[idx].astype(np.float32) / max_val if self.args["mode"] == 'train': data['aug_id1'] = np.random.randint(8) self.data_aug(hr_imgs, data['aug_id1']) else: setup_seed(idx) hr_crops = hr_imgs # RAW需要复杂的unproces lr_shape = hr_crops.shape hr_crops = torch.from_numpy(hr_crops) hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess']) data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()]) data['ccm'] = metadata['cam2rgb'].numpy() hr_crops = mosaic(hr_crops).numpy() # rgbg # 这个旋转是用来改变bayer模式的 if 'no_bayeraug' in self.args["command"]: data['pattern'] = 0 else: if self.args["mode"] == 'train': data['pattern'] = np.random.randint(4) else: data['pattern'] = idx%4 hr_crops = bayer_aug(hr_crops, k=data['pattern']) data['vst_aug'] = False hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops # [crops,h,w,c] -> [crops,c,h,w] hr_crops = hr_crops.transpose(2,0,1) lr_crops = hr_crops.copy() # 人工加噪声 if self.args['gpu_preprocess'] is False: if self.args['mode'] == 'train': p = sample_params(self.args['camera_type']) p['ratio'] = np.random.rand(1) * 19 + 1 lr_crops = generate_noisy_obs(lr_crops, param=p, noise_code=self.args['noise_code'], ori=self.args['ori']) if self.args['ori']: hr_crops = hr_crops / p['ratio'] else: p['K'] = p['K'] * p['ratio'] p['sigGs'] = p['sigGs'] * p['ratio'] hr_crops = hr_crops * (p['wp'] - p['bl']) lr_crops = lr_crops * (p['wp'] - p['bl']) bias = close_form_bias(hr_crops, p['sigGs'], p['K']) hr_crops = VST(hr_crops, p['sigGs'], gain=p['K']) lr_crops = VST(lr_crops, p['sigGs'], gain=p['K']) lr_crops = lr_crops - bias lower = VST(0, p['sigGs'], gain=p['K']) upper = VST((p['wp'] - p['bl']), p['sigGs'], gain=p['K']) nsr = 1 / (upper - lower) hr_crops = (hr_crops - lower) / (upper - lower) lr_crops = (lr_crops - lower) / (upper - lower) data['sigma'] = nsr else: data['sigma'] = self.sigma setup_seed(idx) noise = np.random.randn(*lr_crops.shape) * data['sigma'] lr_crops += noise data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) if self.args['clip']: data["lr"] = lr_crops.clip(0, 1) data["hr"] = hr_crops.clip(0, 1) return data # Unprocess Synthetic Dataset(sRGB->Raw) class RGB_Img2Raw_SFRN_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'YOND' self.args['crop_size'] = 256 self.args['ori'] = False self.args['iso'] = None self.args['dgain'] = None self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'YOND' self.args['mode'] = 'train' self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] self.data_dir = f"{self.root_dir}/{self.mode}" if self.mode == 'train': self.data_dir += f'_{self.args["subname"]}' self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}')) self.names = [os.path.basename(path)[:-4] for path in self.datapath] self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)] self.buffer = [None] * len(self.infos) if 'cache' in self.args['command']: log(f'Loading {len(self.infos)} crops!!!') self.buffer = [dataload(path) for path in tqdm(self.datapath)] self.legal_iso = sorted([int(iso) for iso in os.listdir(f"{self.args['bias_dir']}")]) self.bias_frames = {iso: sorted(glob.glob(f"{self.args['bias_dir']}/{iso}/*.npy")) for iso in self.legal_iso} # self.buffer_lr = [bayer2rggb(dataload(path)).transpose(2,0,1) for path in tqdm(self.bias_frames)] self.buffer_lr = {} for iso in self.legal_iso: log(f'Loading {len(self.bias_frames[iso])} bias frames (ISO-{iso})!!!') self.buffer_lr[iso] = [bayer2rggb(dataload(path)).transpose(2,0,1) for path in tqdm(self.bias_frames[iso])] self.bias_lut = BiasLUT() self.length = len(self.infos) self.sigma = -1 self.get_shape() log(f'Successfully cache {self.length} npy data!!!') def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] hr_imgs = dataload(self.datapath[idx]) max_val = 65535. if hr_imgs.dtype == np.uint16 else 255. hr_imgs = hr_imgs.astype(np.float32) / max_val if self.args["mode"] == 'train': data['aug_id1'] = np.random.randint(8) self.data_aug(hr_imgs, data['aug_id1']) else: setup_seed(idx) hr_crops = hr_imgs # RAW需要复杂的unproces lr_shape = hr_crops.shape hr_crops = torch.from_numpy(hr_crops) hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess']) data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()]) data['ccm'] = metadata['cam2rgb'].numpy() if 'blur' in self.args['command']: lr_crops = hr_crops.clone().numpy() lr_crops = cv2.GaussianBlur(lr_crops, (3,3), 0.4) red = lr_crops[0::2, 0::2, 0] green_red = lr_crops[0::2, 1::2, 1] green_blue = lr_crops[1::2, 0::2, 1] blue = lr_crops[1::2, 1::2, 2] lr_crops = np.stack((red, green_red, green_blue, blue), axis=0) hr_crops = mosaic(hr_crops).numpy() # rggb # 这个旋转是用来改变bayer模式的 if 'no_bayeraug' in self.args["command"]: data['pattern'] = 0 else: if self.args["mode"] == 'train': data['pattern'] = np.random.randint(4) else: data['pattern'] = idx%4 hr_crops = bayer_aug(hr_crops, k=data['pattern']) data['vst_aug'] = False hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops # [crops,h,w,c] -> [crops,c,h,w] hr_crops = hr_crops.transpose(2,0,1) if 'blur' not in self.args['command']: lr_crops = hr_crops.copy() data['ISO'] = iso = self.legal_iso[np.random.randint(len(self.legal_iso))] idr = np.random.randint(len(self.buffer_lr[iso])) dh, dw = self.args['patch_size']//2, self.args['patch_size']//2 xx = np.random.randint(self.h - dh + 1) yy = np.random.randint(self.w - dw + 1) black_crops = self.buffer_lr[iso][idr][:, xx:xx+dh, yy:yy+dw].copy() # 人工加噪声 data['ratio'] = np.ones(1) if self.args['gpu_preprocess'] is False: if self.args['mode'] == 'train': p = sample_params_max(self.args['camera_type'], iso=iso) p['ratio'] = np.exp(np.random.rand() * 4) data['ratio'] = p['ratio'] y = lr_crops * (p['wp'] - p['bl']) / p['ratio'] y = np.random.poisson(y/p['K']).astype(np.float32) * p['K'] n_read = black_crops n_read += np.random.randn(y.shape[-3], y.shape[-2], 1).astype(np.float32) * p['sigR'] n_read += np.random.uniform(low=-0.5, high=0.5, size=y.shape) n_read += np.random.randn() * 0.03 # BLE y += n_read lr_crops = y / (p['wp'] - p['bl']) p['sigGs'] = n_read.std() if self.args['ori']: hr_crops = hr_crops / p['ratio'] else: lr_crops = lr_crops * p['ratio'] p['K'] = p['K'] * p['ratio'] p['sigGs'] = p['sigGs'] * p['ratio'] hr_crops = hr_crops * (p['wp'] - p['bl']) lr_crops = lr_crops * (p['wp'] - p['bl']) bias = self.bias_lut.get_lut(lr_crops.clip(0,None), p['sigGs'], p['K']) # bias = close_form_bias(lr_crops.clip(0,None), p['sigGs'], p['K']) hr_crops = VST(hr_crops, p['sigGs'], gain=p['K']) lr_crops = VST(lr_crops, p['sigGs'], gain=p['K']) lr_crops = lr_crops - bias lower = VST(0, p['sigGs'], gain=p['K']) upper = VST((p['wp'] - p['bl']), p['sigGs'], gain=p['K']) nsr = 1 / (upper - lower) hr_crops = (hr_crops - lower) / (upper - lower) lr_crops = (lr_crops - lower) / (upper - lower) data['sigma'] = nsr else: data['sigma'] = self.sigma setup_seed(idx) noise = np.random.randn(*lr_crops.shape) * data['sigma'] lr_crops += noise data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) if self.args['clip']: data["lr"] = lr_crops.clip(0, 1) data["hr"] = hr_crops.clip(0, 1) return data # Synthetic Dataset(sRGB) class RGB_Img_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'YOND' self.args['crop_size'] = 256 self.args['ori'] = False self.args['iso'] = None self.args['dgain'] = None self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'YOND' self.args['mode'] = 'train' self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] self.data_dir = f"{self.root_dir}/{self.mode}" if self.mode == 'train': self.data_dir += f'_{self.args["subname"]}' self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}')) self.names = [os.path.basename(path)[:-4] for path in self.datapath] self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)] self.buffer = [None] * len(self.infos) if 'cache' in self.args['command']: log(f'Loading {len(self.infos)} crops!!!') self.buffer = [dataload(path) for path in tqdm(self.datapath)] self.length = len(self.infos) self.sigma = -1 self.get_shape() log(f'Successfully cache {self.length} npy data!!!') def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] if self.buffer[idx] is None: self.buffer[idx] = dataload(self.datapath[idx]) max_val = 255. if self.buffer[idx].dtype == np.uint8 else 65535. hr_imgs = self.buffer[idx].astype(np.float32) / max_val if self.args["mode"] == 'train': data['aug_id1'] = np.random.randint(8) self.data_aug(hr_imgs, data['aug_id1']) else: setup_seed(idx) hr_crops = hr_imgs.transpose(2,0,1) lr_crops = hr_crops.copy() # 人工加噪声 if self.args['gpu_preprocess'] is False: if self.args['mode'] == 'train': # lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max']) # data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255. lower, upper = self.args['sigma_min'], self.args['sigma_max'] data['sigma'] = (np.random.rand()*(upper-lower)+lower) / 255. else: data['sigma'] = self.sigma setup_seed(idx) noise = np.random.randn(*lr_crops.shape) * data['sigma'] lr_crops += noise data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) if self.args['clip']: data["lr"] = lr_crops.clip(0, 1) data["hr"] = hr_crops.clip(0, 1) return data # Unprocess Synthetic Dataset(sRGB->Raw) class DIV2K_Img2Raw_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'DIV2K' self.args['crop_size'] = 256 self.args['ori'] = False self.args['iso'] = None self.args['dgain'] = None self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'DIV2K' self.args['mode'] = 'train' self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] self.data_dir = f"{self.root_dir}/npy/{self.mode}" self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}')) self.names = [os.path.basename(path)[:-4] for path in self.datapath] self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)] self.buffer = [None] * len(self.infos) if 'cache' in self.args['command']: log(f'Loading {len(self.infos)} crops!!!') self.buffer = [dataload(path) for path in tqdm(self.datapath)] self.length = len(self.infos) self.sigma = -1 self.get_shape() log(f'Successfully cache {self.length} npy data!!!') def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] if self.buffer[idx] is None: self.buffer[idx] = dataload(self.datapath[idx]) hr_imgs = self.buffer[idx].astype(np.float32) / 255. if self.args["mode"] == 'train': data['aug_id1'] = np.random.randint(8) self.data_aug(hr_imgs, data['aug_id1']) else: setup_seed(idx) hr_crops = hr_imgs # RAW需要复杂的unproces lr_shape = hr_crops.shape hr_crops = torch.from_numpy(hr_crops) hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess']) data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()]) data['ccm'] = metadata['cam2rgb'].numpy() hr_crops = mosaic(hr_crops).numpy() # rgbg # 这个旋转是用来改变bayer模式的 # if self.args["mode"] == 'train': data['pattern'] = np.random.randint(4) if self.args["mode"] == 'train' else idx%4 hr_crops = bayer_aug(hr_crops, k=data['pattern']) data['vst_aug'] = False hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops # [crops,h,w,c] -> [crops,c,h,w] hr_crops = hr_crops.transpose(2,0,1) lr_crops = hr_crops.copy() # 人工加噪声 if self.args['gpu_preprocess'] is False: if self.args['mode'] == 'train': lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max']) data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255. else: data['sigma'] = self.sigma setup_seed(idx) noise = np.random.randn(*lr_crops.shape) * data['sigma'] lr_crops += noise data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) if self.args['clip']: data["lr"] = lr_crops.clip(0, 1) data["hr"] = hr_crops.clip(0, 1) return data # Unprocess Synthetic Dataset(sRGB->Raw3c1n) class RGB_Img2Raw3c1n_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'YOND' self.args['crop_size'] = 256 self.args['ori'] = False self.args['iso'] = None self.args['dgain'] = None self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'YOND' self.args['mode'] = 'train' self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] self.data_dir = f"{self.root_dir}/{self.mode}" if self.mode == 'train': self.data_dir += f'_{self.args["subname"]}' self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}')) self.names = [os.path.basename(path)[:-4] for path in self.datapath] self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)] self.buffer = [None] * len(self.infos) if 'cache' in self.args['command']: log(f'Loading {len(self.infos)} crops!!!') self.buffer = [dataload(path) for path in tqdm(self.datapath)] self.length = len(self.infos) self.sigma = -1 self.get_shape() log(f'Successfully cache {self.length} npy data!!!') def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] if self.buffer[idx] is None: self.buffer[idx] = dataload(self.datapath[idx]) max_val = 255. if self.buffer[idx].dtype == np.uint8 else 65535. hr_imgs = self.buffer[idx].astype(np.float32) / max_val if self.args["mode"] == 'train': data['aug_id1'] = np.random.randint(8) self.data_aug(hr_imgs, data['aug_id1']) else: setup_seed(idx) hr_crops = hr_imgs # RAW需要复杂的unproces lr_shape = hr_crops.shape hr_crops = torch.from_numpy(hr_crops) hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess']) data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()]) data['ccm'] = metadata['cam2rgb'].numpy() hr_crops = mosaic(hr_crops).numpy() # rggb # [crops,h,w,c] -> [crops,c,h,w] hr_crops = hr_crops.transpose(2,0,1) lr_crops = hr_crops.copy() # 人工加噪声 if self.args['gpu_preprocess'] is False: if self.args['mode'] == 'train': lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max']) data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255. else: data['sigma'] = self.sigma setup_seed(idx) noise = np.random.randn(*lr_crops.shape[-2:]) * data['sigma'] lr_crops[2] += noise data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) if self.args['clip']: data["lr"] = lr_crops.clip(0, 1) data["hr"] = hr_crops.clip(0, 1) return data # Unprocess Synthetic Dataset(sRGB->Raw) class DIV2K_PG_Dataset(DIV2K_Img2Raw_Dataset): def __init__(self, args=None): super().__init__(args) self.noise_params= { 'Kmin': -2.5, 'Kmax': 3.5, 'lam': 0.102, 'q': 1/(2**10), 'wp': 1023, 'bl': 64, 'sigTLk': 0.85187, 'sigTLb': 0.07991, 'sigTLsig': 0.02921, 'sigRk': 0.87611, 'sigRb': -2.11455, 'sigRsig': 0.03274, 'sigGsk': 0.85187, 'sigGsb': 0.67991, 'sigGssig': 0.02921, } self.p = self.get_noise_params() def get_noise_params(self): p = self.noise_params log_K = np.random.uniform(low=p['Kmin'], high=p['Kmax']) mu_Gs = (p['sigGsk'] + np.random.uniform(-0.2, 0.2))*log_K + (p['sigGsb'] + np.random.uniform(-1, 1)) log_sigGs = np.random.normal(loc=mu_Gs, scale=p['sigGssig']) K = np.exp(log_K) sigma = np.exp(log_sigGs) scale = p['wp'] - p['bl'] self.p = {'K':K, 'sigma':sigma, 'beta1':K/scale, 'beta2':(sigma/scale)**2, 'wp':p['wp'], 'bl':p['bl'], 'scale':p['wp']-p['bl']} return self.p def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] if self.buffer[idx] is None: self.buffer[idx] = dataload(self.datapath[idx]) hr_imgs = self.buffer[idx].astype(np.float32) / 255. if self.args["mode"] == 'train': data['aug_id1'] = np.random.randint(8) # self.data_aug(hr_imgs, data['aug_id1']) else: setup_seed(idx) hr_crops = hr_imgs # RAW需要复杂的unproces lr_shape = hr_crops.shape hr_crops = torch.from_numpy(hr_crops) hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=False) data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()]) data['ccm'] = metadata['cam2rgb'].numpy() hr_crops = mosaic(hr_crops).numpy() # rgbg # 这个旋转是用来改变bayer模式的 # if self.args["mode"] == 'train': data['pattern'] = np.random.randint(4) if self.args["mode"] == 'train' else idx%4 hr_crops = bayer_aug(hr_crops, k=data['pattern']) lr_crops = hr_crops.copy() # 人工加噪声 if self.args['mode'] == 'train': p = self.get_noise_params() data.update(p) else: data.update(self.p) if self.args['gpu_preprocess'] is False: lr_crops = np.random.poisson(lr_crops/p['beta1'])*p['beta1'] + np.random.randn(*lr_crops.shape) * data['beta2']**0.5 if 'est' in self.args['command']: # 对噪图求blur k = 19 lr_crops = lr_crops hr_crops = hr_crops lr_blur = cv2.blur(lr_crops, (k, k)) hr_blur = cv2.blur(hr_crops, (k, k)) lr_std = stdfilt(lr_crops, k) hr_std = stdfilt(hr_crops, k) hr_target = (p['beta1'] * hr_blur + p['beta2']) ** 0.5 var = lr_std**2 mean = lr_blur th, percent = get_threshold(hr_std) mask = (hr_std <= th) # 按阈值分割图像平滑区域与非平滑区域 if var[mask].size > 0: var, mean = var[mask], mean[mask] else: mask = (hr_std <= hr_std.max()) data['th'] = th data['hr_mask'] = mask data['lr_rggb'] = lr_crops data['hr_rggb'] = hr_crops data['lr_std'] = lr_std data['hr_std'] = hr_std data['lr_blur'] = lr_blur data['hr_blur'] = hr_blur data['lr'] = np.concatenate([lr_std, lr_blur, lr_crops], axis=-1) data['hr'] = hr_target else: data["lr"] = np.ascontiguousarray(lr_crops) data["hr"] = np.ascontiguousarray(hr_crops) for key in data: if 'lr' in key or 'hr' in key: data[key] = data[key].transpose(2,0,1) if self.args['clip']: data['lr'] = lr_crops.clip(0, 1) data['hr'] = hr_crops.clip(0, 1) return data # SIDD Paired Real Data class SIDD_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = '/data/fenghansen/datasets/SIDD' self.args['params'] = None self.args['lock_wb'] = False self.args['gpu_preprocess'] = False self.args['dstname'] = 'SIDD' self.args['mode'] = 'eval' self.args['clip'] = 'True' self.args['wp'] = 1023 self.args['bl'] = 64 self.args['patch_size'] = 256 self.args['H'] = 256 self.args['W'] = 256 self.args['command'] = '' def initialization(self): # 获取数据地址 self.suffix = 'npy' self.root_dir = self.args['root_dir'] self.mode = self.args['mode'] if self.mode == 'train': self.data_dir = f'{self.root_dir}/SIDD_Medium_Raw/data' else: self.data_dir = f'{self.root_dir}/SIDD_Benchmark_Data' if self.mode == 'eval': self.lr_data = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/ValidationNoisyBlocksRaw.mat')['ValidationNoisyBlocksRaw'] self.hr_data = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/ValidationGtBlocksRaw.mat')['ValidationGtBlocksRaw'] else: self.lr_data = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/BenchmarkNoisyBlocksRaw.mat')['BenchmarkNoisyBlocksRaw'] self.hr_data = None self.pos = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/BenchmarkBlocks32.mat')['BenchmarkBlocks32'] self.names = sorted(os.listdir(self.data_dir)) self.datapaths = sorted(glob.glob(f'{self.data_dir}//*/*_010.MAT')) self.metapaths = sorted([path for path in self.datapaths if 'META' in path]) self.lr_paths = sorted([path for path in self.datapaths if 'NOISY' in path]) self.hr_paths = sorted([path for path in self.datapaths if 'GT' in path]) self.length = len(self.names) self.infos = [None] * self.length for i in range(self.length): metadata = read_metadata(dataload(self.metapaths[i])) self.infos[i] = { 'name': self.names[i], 'lr_path': self.lr_paths[i], 'hr_path': self.hr_paths[i] if len(self.hr_paths)>0 else None, 'metadata': metadata, } self.sigma = -1 self.get_shape() log(f'Successfully load {self.length} data!!! ({self.mode})') def __len__(self): return self.length def get_shape(self): self.H, self.W = self.args['H'], self.args['W'] self.C = 3 self.h = self.H // 2 self.w = self.W // 2 self.c = 4 def data_aug(self, data, mode=0): if mode == 0: return data rot = mode % 4 flip = mode // 4 data = np.rot90(data, k=rot, axes=(-2, -1)) if flip: data = data[..., ::-1] return data def __getitem__(self, idx): data = {} # 读取数据 data['name'] = self.infos[idx]['name'] data['meta'] = self.infos[idx]['metadata'] data['lr_path_full'] = self.infos[idx]['lr_path'] data['hr_path_full'] = self.infos[idx]['hr_path'] data['wb'] = data['meta']['wb'] data['cfa'] = data['meta']['bayer_2by2'] data['ccm'] = data['meta']['cst2'] data['iso'] = data['meta']['iso'] data['reg'] = (data['meta']['beta1'], data['meta']['beta2']) if self.args["mode"] == 'train': raise NotImplementedError else: data['lr'] = self.lr_data[idx] if self.mode == 'eval': data['hr'] = self.hr_data[idx] return data class LRID_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'LRID/' self.args['suffix'] = 'dng' self.args['dgain'] = 1 self.args['dstname'] = 'indoor_x5' self.args['camera_type'] = 'IMX686' self.args['params'] = None self.args['mode'] = 'eval' self.args['GT_type'] = 'GT_align_ours' self.args['command'] = '' self.args['H'] = 3472 self.args['W'] = 4624 self.args['wp'] = 1023 self.args['bl'] = 64 self.args['clip'] = False def initialization(self): # 获取数据地址 self.suffix = 'dng' self.change_eval_ratio(ratio=1) self.iso = 6400 def __len__(self): return self.length def get_shape(self): self.shape = self.templet_raw.raw_image_visible.shape self.H, self.W = self.shape if 'bl' not in self.args: self.bl_all = np.array(self.templet_raw.black_level_per_channel) if np.mean(self.bl_all - self.bl_all[0]) != 0: warnings.warn(f'4 channel have different black level!!! ({self.bl_all})') self.bl = self.bl_all[0] else: self.bl = self.args['bl'] self.wp = self.templet_raw.white_level def change_eval_ratio(self, ratio): self.ratio = ratio self.infos_gt = [] self.infos_short = [] for dstname in self.args['dstname']: with open(f"infos/{dstname}_{self.args['GT_type']}.info", 'rb') as info_file: info = pkl.load(info_file) eval_id = self.get_eval_id(dstname) for idx in eval_id: self.infos_gt.append(info[idx]) with open(f'infos/{dstname}_short.info', 'rb') as info_file: info = pkl.load(info_file)[ratio] eval_id = self.get_eval_id(dstname) for idx in eval_id: self.infos_short.append(info[idx]) self.infos = self.infos_gt for i in range(len(self.infos)): self.infos[i]['hr'] = self.infos[i]['data'] self.infos[i]['lr'] = self.infos_short[i]['data'][0] if 'syn_noise' in self.args['command'].lower(): nt = self.args['noise_type'].upper() self.infos[i]['lr'] = self.infos[i]['hr'].replace('GT_align_ours', f'Noisy_{nt}/{ratio}') self.infos[i]['ExposureTime'] = self.infos_short[i]['metadata'][0]['ExposureTime'] del self.infos[i]['data'] print(f'>> Successfully load infos.pkl (Length: {len(self.infos)})') self.iso = 6400 self.length = len(self.infos) self.templet_raw_path = self.infos_short[0]['data'][0] self.templet_raw = rawpy.imread(self.templet_raw_path) self.get_shape() def get_eval_id(self, dstname='indoor_x5'): if dstname == 'indoor_x5': eval_ids = [4,14,25,41,44,51,52,53,58] elif dstname == 'indoor_x3': eval_ids = []#[0,6,15] elif dstname == 'outdoor_x5': eval_ids = [1,2,5] elif dstname == 'outdoor_x3': eval_ids = [9,21,22,32,44,51] else: eval_ids = [] return eval_ids def __getitem__(self, idx): data = {} # dataload hr_raw = np.array(dataload(self.infos[idx]['hr'])).reshape(self.H,self.W) lr_raw = np.array(dataload(self.infos[idx]['lr'])).reshape(self.H,self.W) data["hr"] = (hr_raw.astype(np.float32) - self.bl) / (self.wp - self.bl) # lr_raw = bayer2rggb(lr_raw.astype(np.float32)) - self.bl_all.reshape(1,1,-1) data["lr"] = (lr_raw.astype(np.float32) - self.bl) * self.ratio / (self.wp - self.bl) data['name'] = f"{self.infos[idx]['name']}_x{self.ratio:02d}" data['ratio'] = self.ratio data['ccm'] = self.infos[idx]['ccm'] data['wb'] = self.infos[idx]['wb'] data['cfa'] = 'rggb' data['ISO'] = self.iso data['ExposureTime'] = self.infos[idx]['ExposureTime'] * 1000 if self.args['clip']: data["hr"] = data["hr"].clip(0,1) data["lr"] = data["lr"].clip(0,1) return data class ELD_Full_Dataset(Dataset): def __init__(self, args=None): super().__init__() self.default_args() if args is not None: for key in args: self.args[key] = args[key] self.initialization() def default_args(self): self.args = {} self.args['root_dir'] = 'ELD/' self.args['ratio'] = 1 self.args['dstname'] = 'ELD' self.args['params'] = None self.args['mode'] = 'eval' self.args['command'] = '' self.args['wp'] = 16383 self.args['bl'] = 512 self.args['clip'] = False def initialization(self): # 获取数据地址 self.suffix = {'CanonEOS70D':'CR2', 'CanonEOS700D':'CR2', 'NikonD850':'nef', 'SonyA7S2':'ARW'} self.infos_all = {'CanonEOS70D':[], 'CanonEOS700D':[], 'NikonD850':[], 'SonyA7S2':[]} iso_list = [800, 1600, 3200] ratio_list = [1,10,100,200] hr_ids = np.array([1, 6, 11, 16]) for camera_type in self.infos_all: sub_dir = f'{self.args["root_dir"]}/{camera_type}' for scene in range(1,11): for iso_id, iso in enumerate(iso_list): for ratio_id, ratio in enumerate(ratio_list): lr_id = iso_id*5 + ratio_id + 2 ind = np.argmin(np.abs(lr_id - hr_ids)) hr_id = hr_ids[ind] name = f'IMG_{lr_id:04d}.{self.suffix[camera_type]}' hr_name = f'IMG_{hr_id:04d}.{self.suffix[camera_type]}' self.infos_all[camera_type].append({ 'cam': camera_type, 'name': f'{camera_type}_{scene:02d}_{name[:-4]}', 'hr': f'{sub_dir}/scene-{scene}/{hr_name}', 'lr': f'{sub_dir}/scene-{scene}/{name}', 'iso': iso, 'ratio': ratio, }) self.change_eval_ratio('SonyA7S2', ratio=1) def __len__(self): return self.length def change_eval_ratio(self, cam='SonyA7S2', ratio=1, iso_list=None): if iso_list is None: iso_list = [800, 1600, 3200] self.infos = [] for i in range(len(self.infos_all[cam])): if self.infos_all[cam][i]['iso'] in iso_list and self.infos_all[cam][i]['ratio'] == ratio: self.infos.append(self.infos_all[cam][i]) self.length = len(self.infos) self.ratio = ratio self.templet_raw_path = self.infos[0]['lr'] self.templet_raw = rawpy.imread(self.templet_raw_path) self.get_shape() log(f'Eval change to {cam} (length:{self.length}): ratio={ratio}, iso_list={iso_list}') def get_shape(self): self.shape = self.templet_raw.raw_image_visible.shape self.H, self.W = self.shape self.bl = np.array(self.templet_raw.black_level_per_channel) if np.mean(self.bl - self.bl[0]) != 0: warnings.warn(f'4 channel have different black level!!! ({self.bl})') self.bl = self.bl[0] self.wp = self.templet_raw.white_level def __getitem__(self, idx): data = {} # dataload hr_raw = np.array(dataload(self.infos[idx]['hr'])).reshape(self.H,self.W) lr_raw = np.array(dataload(self.infos[idx]['lr'])).reshape(self.H,self.W) data["hr"] = (hr_raw.astype(np.float32) - self.bl) / (self.wp - self.bl) data["lr"] = (lr_raw.astype(np.float32) - self.bl) * self.infos[idx]['ratio'] / (self.wp - self.bl) data['name'] = self.infos[idx]['name'] data['wb'], data['ccm'] = read_wb_ccm(rawpy.imread(self.infos[idx]['hr'])) data['ratio'] = self.infos[idx]['ratio'] data['ISO'] = self.infos[idx]['iso'] if self.args['clip']: data["hr"] = data["hr"].clip(0,1) data["lr"] = data["lr"].clip(0,1) return data ''' ds = np.load('E:/datasets/LRID/resources/darkshading-iso-6400.npy') lr_paths = glob.glob() raw_hr = rawpy.imread('F:/datasets/SELD/indoor_x5/100/004/000071_exp-512000000_iso-100_2022_08_17_06_21_43_034_12562490542059_orientation_0_camera-0.dng') raw_lr = rawpy.imread('E:/datasets/LRID/indoor_x5/6400/1/004/000115_exp-8000000_iso-6400_2022_08_17_06_21_58_288_12578251173465_orientation_0_camera-0.dng')#.raw_image_visible p = get_ISO_ExposureTime('F:/datasets/SELD/indoor_x5/100/004/000071_exp-512000000_iso-100_2022_08_17_06_21_43_034_12562490542059_orientation_0_camera-0.dng') p['name'] = 'IMX686' bl = raw_lr.black_level_per_channel[0] print(raw_lr.black_level_per_channel) p['bl'] = raw_lr.black_level_per_channel[0] p['wp'] = 1023 p['ratio'] = 1 p['scale'] = (p['wp']-p['bl']) / p['ratio'] print(p) lr_raw = (raw_lr.raw_image_visible.astype(np.float32) - p['bl']) / (p['wp'] - p['bl']) hr_raw = (raw_hr.raw_image_visible.astype(np.float32) - p['bl']) / (p['wp'] - p['bl']) hr_raw = np.load('E:/datasets/LRID/indoor_x5/npy/GT_align_ours/004.npy') hr_raw = (hr_raw.astype(np.float32) - p['bl']) / (p['wp'] - p['bl']) print(hr_raw.shape, hr_raw.min(), hr_raw.max()) print(lr_raw.shape, lr_raw.min(), lr_raw.max()) raw_hr.raw_image_visible[:] = hr_raw * (p['wp'] - p['bl']) + p['bl'] img_hr = raw_hr.postprocess(use_camera_wb=True, half_size=False, no_auto_bright=True, output_bps=8) plt.imsave(f"{p['name']}.png", img_hr) data = {'lr': lr_raw, 'hr':hr_raw.clip(0,1), 'name':p['name']} '''