from .utils import * def read_wb_ccm(raw): wb = np.array(raw.camera_whitebalance) wb /= wb[1] wb = wb.astype(np.float32) ccm = raw.color_matrix[:3, :3].astype(np.float32) if ccm[0,0] == 0: ccm = np.eye(3, dtype=np.float32) return wb, ccm def get_ISO_ExposureTime(filepath): # 不限于RAW,RGB图片也适用 raw_file = open(filepath, 'rb') exif_file = exifread.process_file(raw_file, details=False, strict=True) # 获取曝光时间 if 'EXIF ExposureTime' in exif_file: exposure_str = exif_file['EXIF ExposureTime'].printable else: exposure_str = exif_file['Image ExposureTime'].printable if '/' in exposure_str: fenmu = float(exposure_str.split('/')[0]) fenzi = float(exposure_str.split('/')[-1]) exposure = fenmu / fenzi else: exposure = float(exposure_str) # 获取ISO if 'EXIF ISOSpeedRatings' in exif_file: ISO_str = exif_file['EXIF ISOSpeedRatings'].printable else: ISO_str = exif_file['Image ISOSpeedRatings'].printable if '/' in ISO_str: fenmu = float(ISO_str.split('/')[0]) fenzi = float(ISO_str.split('/')[-1]) ISO = fenmu / fenzi else: ISO = float(ISO_str) info = {'ISO':int(ISO), 'ExposureTime':exposure, 'name':filepath.split('/')[-1]} return info def metainfo(rawpath): with open(rawpath, 'rb') as f: tags = exifread.process_file(f) _, suffix = os.path.splitext(os.path.basename(rawpath)) if suffix == '.dng': expo = eval(str(tags['Image ExposureTime'])) iso = eval(str(tags['Image ISOSpeedRatings'])) else: expo = eval(str(tags['EXIF ExposureTime'])) iso = eval(str(tags['EXIF ISOSpeedRatings'])) # print('ISO: {}, ExposureTime: {}'.format(iso, expo)) return iso, expo # Yuzhi Wang's ISP def bayer2rggb(bayer): H, W = bayer.shape return bayer.reshape(H//2, 2, W//2, 2).transpose(0, 2, 1, 3).reshape(H//2, W//2, 4) def rggb2bayer(rggb): H, W, _ = rggb.shape return rggb.reshape(H, W, 2, 2).transpose(0, 2, 1, 3).reshape(H*2, W*2) def bayer2rggbs(bayers): H, W = bayers.shape[-2:] return bayers.reshape(-1, H//2, 2, W//2, 2).permute(0, 1, 3, 2, 4).reshape(-1, H//2, W//2, 4) def rggb2bayers(rggbs): H, W, _ = rggbs.shape[-3:] return rggbs.reshape(-1, H, W, 2, 2).permute(0, 1, 3, 2, 4).reshape(-1, H*2, W*2) def bayer2rows(bayer): # 分行 H, W = bayer.shape return np.stack((bayer[0:H:2], bayer[1:H:2])) def bayer2gray(raw): # 相当于双线性插值的bayer2gray kernel = np.array([[1,2,1],[2,4,2],[1,2,1]], np.float32) / 16. gray = cv2.filter2D(raw, -1, kernel, borderType=cv2.BORDER_REFLECT) return gray def rows2bayer(rows): c, H, W = rows.shape bayer = np.empty((H*2, W)) bayer[0:H*2:2] = rows[0] bayer[1:H*2:2] = rows[1] return bayer # Kaixuan Wei's ISP def raw2bayer(raw, wp=1023, bl=64, norm=True, clip=False, bias=np.array([0,0,0,0])): raw = raw.astype(np.float32) H, W = raw.shape out = np.stack((raw[0:H:2, 0:W:2], #RGBG raw[0:H:2, 1:W:2], raw[1:H:2, 1:W:2], raw[1:H:2, 0:W:2]), axis=0).astype(np.float32) if norm: bl = bias + bl bl = bl.reshape(4, 1, 1) out = (out - bl) / (wp - bl) if clip: out = np.clip(out, 0, 1) return out.astype(np.float32) def bayer2raw(packed_raw, wp=16383, bl=512): if torch.is_tensor(packed_raw): packed_raw = packed_raw.detach() packed_raw = packed_raw[0].cpu().float().numpy() packed_raw = np.clip(packed_raw, 0, 1) packed_raw = packed_raw * (wp - bl) + bl C, H, W = packed_raw.shape H *= 2 W *= 2 raw = np.empty((H, W), dtype=np.uint16) raw[0:H:2, 0:W:2] = packed_raw[0, :,:] raw[0:H:2, 1:W:2] = packed_raw[1, :,:] raw[1:H:2, 1:W:2] = packed_raw[2, :,:] raw[1:H:2, 0:W:2] = packed_raw[3, :,:] return raw # Hansen Feng's ISP def repair_bad_pixels(raw, bad_points, method='median'): fixed_raw = bayer2rggb(raw) for i in range(4): fixed_raw[:,:,i] = cv2.medianBlur(fixed_raw[:,:,i],3) fixed_raw = rggb2bayer(fixed_raw) # raw = (1-bpc_map) * raw + bpc_map * fixed_raw for p in bad_points: raw[p[0],p[1]] = fixed_raw[p[0],p[1]] return raw def SimpleISP(raw, bl=0, wp=1, wb=[2,1,1,2], gamma=2.2): # rggb2RGB (SimpleISP) raw = (raw.astype(np.float32) - bl) / (wp-bl) wb = np.array(wb) raw = raw * wb.reshape(1,1,-1) raw = raw.clip(0, 1)[:,:,(0,1,3)] raw = raw ** (1/gamma) return raw def FastISP(img4c, wb=None, ccm=None, gamma=2.2, low_mem=True): # rgbg2RGB (FastISP) if torch.is_tensor(img4c): img4c = img4c[0].detach().cpu().numpy() h,w = img4c.shape[:2] H = h * 2 W = w * 2 raw = np.zeros((H,W), np.float32) red_gain = wb[0] if wb is not None else 2 blue_gain = wb[2] if wb is not None else 2 raw[0:H:2,0:W:2] = img4c[:,:,0] * red_gain # R raw[0:H:2,1:W:2] = img4c[:,:,1] # G1 raw[1:H:2,0:W:2] = img4c[:,:,2] # G2 raw[1:H:2,1:W:2] = img4c[:,:,3] * blue_gain # B raw = np.clip(raw, 0, 1) white_point = 16383 raw = raw * white_point img = cv2.cvtColor(raw.astype(np.uint16), cv2.COLOR_BAYER_BG2RGB_EA) / white_point if ccm is None: ccm = np.eye(3, dtype=np.float32) img = img[:, :, None, :].astype(np.float32) ccm = ccm[None, None, :, :].astype(np.float32) if low_mem: n = 8 h = img.shape[0] // n img_ccm = img.copy() img = img[:,:,0] for i in range(n): img[h*i:h*(i+1)] = np.sum(img_ccm[h*i:h*(i+1)] * ccm, axis=-1) img[h*n-h:] = np.sum(img_ccm[h*n-h:] * ccm, axis=-1) else: img = np.sum(img * ccm, axis=-1) img = np.clip(img, 0, 1) ** (1/gamma) return img def raw2rgb_rawpy(packed_raw, wb=None, ccm=None, raw=None): """Raw2RGB pipeline (rawpy postprocess version)""" if raw is None: if packed_raw.shape[-2] > 1500: raw = rawpy.imread('templet.dng') wp = 1023 bl = 64 else: raw = rawpy.imread('templet.ARW') wp = 16383 bl = 512 if wb is None: wb = np.array(raw.camera_whitebalance) wb /= wb[1] wb = list(wb) if ccm is None: try: ccm = raw.rgb_camera_matrix[:3, :3] except: warnings.warn("You have no Wei Kaixuan's customized rawpy, you can't get right ccm of SonyA7S2...") ccm = raw.color_matrix[:3, :3] elif np.max(np.abs(ccm - np.identity(3))) == 0: ccm = np.array([[ 1.9712269,-0.6789218,-0.29230508], [-0.29104823,1.748401,-0.45735288], [ 0.02051281,-0.5380369,1.5175241 ]], dtype=np.float32) if len(packed_raw.shape) >= 3: raw.raw_image_visible[:] = bayer2raw(packed_raw, wp, bl) else: # 传进来的就是raw图 raw.raw_image_visible[:] = packed_raw out = raw.postprocess(use_camera_wb=False, user_wb=wb, half_size=False, no_auto_bright=True, output_bps=8, bright=1, user_black=None, user_sat=None) return out def mask_pos_init(seq=7, max_speed=20, patch_size=512, crop_size=256): ps, cs = patch_size, crop_size pos = np.zeros((seq, 2), np.int16) v = np.random.randint(2*max_speed+1, size=2) - max_speed max_a = (max_speed // 4) * 2 // 2 + 1 # 模拟运动 for i in range(1, seq): pxmin, pymin = pos.min(axis=0) pxmax, pymax = pos.max(axis=0) # 保证不越界 if pxmax - pxmin >= ps - cs: v[0] = -v[0] if pymax - pymin >= ps - cs: v[1] = -v[1] pos[i] = pos[i-1] + v a = np.random.randint(2*max_a+1, size=2) - max_a v = (v + a).clip(-max_speed, max_speed) pxmin, pymin = pos.min(axis=0) pxmax, pymax = pos.max(axis=0) # 保证不越界 if pxmax - pxmin >= ps - cs: pos[:,0] = np.int16(pos[:,0] / (pxmax-pxmin+1)) if pymax - pymin >= ps - cs: pos[:,1] = np.int16(pos[:,1] / (pymax-pymin+1)) pxmin, pymin = pos.min(axis=0) pxmax, pymax = pos.max(axis=0) pxstart = np.random.randint(-pxmin, ps - cs - pxmax + 1) pystart = np.random.randint(-pymin, ps - cs - pymax + 1) pos[:,0] += pxstart pos[:,1] += pystart return pos def generate_gradient_square(sx, sy, min_val_range=(0.1, 0.9), max_val_range=(0.1, 0.9), color_aug=0.5, device='cuda'): """ 生成一个三通道的渐变正方形图像,min_val和max_val在指定范围内随机选取, 横轴和纵轴的渐变会根据随机选择的函数进行调整,每个通道随机缩放。 参数: sx (int): 正方形的宽度。 sy (int): 正方形的高度。 min_val_range (tuple): 最小值的范围,默认为(0.1, 0.9)。 max_val_range (tuple): 最大值的范围,默认为(0.1, 0.9)。 color_aug: 随机色彩aug的概率,需要在(0, 1)之间 device (str): 设备类型,可以是 'cuda' 或 'cpu'。 返回: torch.Tensor: 生成的三通道渐变正方形图像。(sx, sy, 3) """ # 随机选择最小值和最大值 min_val = random.uniform(*min_val_range) max_val = random.uniform(max(min_val, max_val_range[0]), max_val_range[1]) # 创建横轴和纵轴的线性渐变(从0到1) x = torch.linspace(0, 1, steps=sx, device=device, dtype=torch.float32) y = torch.linspace(0, 1, steps=sy, device=device, dtype=torch.float32) # 随机选择函数和参数 functions = [ lambda t: t.flip(0) if random.choice([True, False]) else t, lambda t: torch.sin(2 * torch.pi * random.uniform(0.25, 4) * t) / 2 + 0.5, lambda t: t ** random.uniform(0.25, 4) ] x_func = random.choice(functions) y_func = random.choice(functions) # 应用随机函数 x_grad = x_func(x) y_grad = y_func(y) # 将渐变扩展成二维矩阵 x_matrix = x_grad.unsqueeze(0).expand(sy, sx) y_matrix = y_grad.unsqueeze(1).expand(sy, sx) # 将横轴和纵轴渐变相乘 combined_grad = x_matrix * y_matrix # 扩展为三通道 grad_3c = combined_grad.unsqueeze(0).expand(3, sy, sx) # 随机对每个通道进行缩放(0.5) color_scales = torch.tensor([random.uniform(0, 1) for _ in range(3)], device=device, dtype=torch.float32).view(3, 1, 1) grad_3c = grad_3c * color_scales / color_scales.max() if random.uniform(0, 1) < color_aug else grad_3c # 缩放到指定的最小值和最大值(默认原范围为0~1) scaled_grad = min_val + (max_val - min_val) * grad_3c return scaled_grad