from .isp_ops import *

import numpy as np
import cv2
import torch


def read_metadata(metadata):
    meta = metadata['metadata'][0, 0]
    beta1, beta2 = meta['UnknownTags'][7, 0][2][0][0:2]
    cam = get_cam(meta)
    bayer_pattern = get_bayer_pattern(meta)
    # We found that the correct Bayer pattern is GBRG for the S6
    if cam == 'S6':
        bayer_pattern = [1, 2, 0, 1]
    bayer_2by2 = (np.asarray(bayer_pattern) + 1).reshape((2, 2)).tolist()
    wb = get_wb(meta)
    cst1, cst2 = get_csts(meta)  # use cst2 for rendering
    iso = get_iso(meta)
    metadata = {
        'meta': meta,
        'beta1': beta1,
        'beta2': beta2,
        'bayer_2by2': bayer_2by2,
        'wb': wb,
        'cst2': cst2,
        'iso': iso,
        'cam': cam
    }
    return metadata


def get_iso(metadata):
    try:
        iso = metadata['ISOSpeedRatings'][0][0]
    except Exception:
        try:
            iso = metadata['DigitalCamera'][0, 0]['ISOSpeedRatings'][0][0]
        except Exception:
            raise Exception('ISO not found.')
    return iso


def get_cam(metadata):
    model = metadata['Make'][0]
    cam_dict = {'Apple': 'IP', 'Google': 'GP', 'samsung': 'S6',
                'motorola': 'N6', 'LGE': 'G4'}
    return cam_dict[model]


def get_bayer_pattern(metadata):
    bayer_id = 33422
    bayer_tag_idx = 1
    try:
        unknown_tags = metadata['UnknownTags']
        if unknown_tags[bayer_tag_idx]['ID'][0][0][0] == bayer_id:
            bayer_pattern = unknown_tags[bayer_tag_idx]['Value'][0][0]
        else:
            raise Exception
    except Exception:
        try:
            unknown_tags = metadata['SubIFDs'][0, 0]['UnknownTags'][0, 0]
            if unknown_tags[bayer_tag_idx]['ID'][0][0][0] == bayer_id:
                bayer_pattern = unknown_tags[bayer_tag_idx]['Value'][0][0]
            else:
                raise Exception
        except Exception:
            try:
                unknown_tags = metadata['SubIFDs'][0, 1]['UnknownTags']
                if unknown_tags[bayer_tag_idx]['ID'][0][0][0] == bayer_id:
                    bayer_pattern = unknown_tags[bayer_tag_idx]['Value'][0][0]
                else:
                    raise Exception
            except Exception:
                print('Bayer pattern not found. Assuming RGGB.')
                # 0-based CFA codes (0=R, 1=G, 2=B), consistent with the S6 override above
                bayer_pattern = [0, 1, 1, 2]
    return bayer_pattern


def get_wb(metadata):
    return metadata['AsShotNeutral']


def get_csts(metadata):
    return (metadata['ColorMatrix1'].reshape((3, 3)),
            metadata['ColorMatrix2'].reshape((3, 3)))


def toTensor(patch, cam):
    # Convert Bayer into BGGR
    if cam == 'IP':  # RGGB
        patch = np.rot90(patch, 2)
    elif cam == 'S6':  # GBRG
        patch = np.flip(patch, axis=1)
    else:  # GP, N6, G4: BGGR
        pass
    # Space to depth
    patch = space_to_depth(np.expand_dims(patch, axis=-1))
    # Add the batch dimension
    patch = np.expand_dims(patch, 0)
    # To tensor (NHWC -> NCHW)
    tensor = torch.from_numpy(patch.transpose((0, 3, 1, 2))).float()
    return tensor


def toPatch(tensor, cam):
    # To numpy
    patch = tensor.cpu().detach().numpy()
    # Take the first (batch) element, depth to space, and squeeze the channel axis
    patch = np.squeeze(depth_to_space(np.transpose(patch[0], (1, 2, 0))), axis=2)
    # Convert back to the original Bayer pattern
    if cam == 'IP':  # BGGR
        patch = np.rot90(patch, 2)
    elif cam == 'S6':  # GBRG
        patch = np.flip(patch, axis=1)
    else:  # GP, N6, G4: BGGR
        pass
    return patch


def toTensor_nf(patch, cam):
    # Convert Bayer into RGGB
    if cam == 'IP':  # RGGB
        pass
    elif cam == 'S6':  # GBRG
        patch = np.rot90(patch, 3)
    else:  # GP, N6, G4: BGGR
        patch = np.rot90(patch, 2)
    # Space to depth
    patch = space_to_depth(np.expand_dims(patch, axis=-1))
    # Add the batch dimension
    patch = np.expand_dims(patch, 0)
    return patch


def toPatch_nf(patch, cam):
    # Take the first (batch) element, depth to space, and squeeze the channel axis
    patch = np.squeeze(depth_to_space(patch[0]), axis=2)
    # Convert back to the original Bayer pattern
    if cam == 'IP':  # RGGB
        pass
    elif cam == 'S6':  # GBRG
        patch = np.rot90(patch, 1)
    else:  # GP, N6, G4: BGGR
        patch = np.rot90(patch, 2)
    return patch


def space_to_depth(x, block_size=2):
    # Pack each block_size x block_size spatial block into the channel dimension
    x = np.asarray(x)
    height, width, depth = x.shape
    reduced_height = height // block_size
    reduced_width = width // block_size
    y = x.reshape(reduced_height, block_size, reduced_width, block_size, depth)
    z = np.swapaxes(y, 1, 2).reshape(reduced_height, reduced_width, -1)
    return z


def depth_to_space(x, block_size=2):
    # Inverse of space_to_depth: unpack channels back into spatial blocks
    x = np.asarray(x)
    height, width, _ = x.shape
    increased_height = height * block_size
    increased_width = width * block_size
    y = x.reshape(height, width, block_size, block_size, -1)
    z = np.swapaxes(y, 1, 2).reshape(increased_height, increased_width, -1)
    return z
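

# Illustrative sketch (added; not part of the original module): a quick sanity
# check that space_to_depth and depth_to_space are exact inverses for inputs
# with even spatial dimensions. The 8x8 patch below is hypothetical.
def _check_space_depth_roundtrip():
    raw = np.random.rand(8, 8, 1)       # fake single-channel Bayer patch
    packed = space_to_depth(raw)        # (4, 4, 4): each 2x2 block moved to channels
    restored = depth_to_space(packed)   # back to (8, 8, 1)
    assert restored.shape == raw.shape
    assert np.allclose(restored, raw)
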
def process_sidd_image(image, bayer_pattern, wb, cst, *, save_file_rgb=None):
    """Simple processing pipeline"""
    image = image.clip(0, 1)
    image = flip_bayer(image, bayer_pattern)
    image = stack_rggb_channels(image)
    rgb2xyz = np.array(
        [
            [0.4124564, 0.3575761, 0.1804375],
            [0.2126729, 0.7151522, 0.0721750],
            [0.0193339, 0.1191920, 0.9503041],
        ]
    )
    rgb2cam = np.matmul(cst, rgb2xyz)
    cam2rgb = np.linalg.inv(rgb2cam)
    cam2rgb = cam2rgb / np.sum(cam2rgb, axis=-1, keepdims=True)
    image_srgb = process(image, 1 / wb[0][0], 1 / wb[0][1], 1 / wb[0][2], cam2rgb)
    image_srgb = swap_channels(image_srgb)
    image_srgb = image_srgb * 255.0
    image_srgb = image_srgb.astype(np.uint8)
    if save_file_rgb:
        # Save (image is already BGR for cv2.imwrite)
        cv2.imwrite(save_file_rgb, image_srgb)
    return image_srgb


def flip_bayer(image, bayer_pattern):
    # Flip the raw image so that its Bayer pattern becomes RGGB
    if bayer_pattern == [[1, 2], [2, 3]]:    # RGGB
        pass
    elif bayer_pattern == [[2, 1], [3, 2]]:  # GRBG
        image = np.fliplr(image)
    elif bayer_pattern == [[2, 3], [1, 2]]:  # GBRG
        image = np.flipud(image)
    elif bayer_pattern == [[3, 2], [2, 1]]:  # BGGR
        image = np.fliplr(image)
        image = np.flipud(image)
    else:
        import pdb
        pdb.set_trace()
        print('Unknown Bayer pattern.')
    return image


def rot_bayer(image, bayer_pattern, rev=False, axis=(-2, -1)):
    # Rotate the raw image by a pattern-dependent multiple of 90 degrees
    # (rev applies the inverse rotation)
    if bayer_pattern == [[1, 2], [2, 3]]:    # RGGB
        k = 0
    elif bayer_pattern == [[2, 1], [3, 2]]:  # GRBG
        k = 3
    elif bayer_pattern == [[2, 3], [1, 2]]:  # GBRG
        k = 1
    elif bayer_pattern == [[3, 2], [2, 1]]:  # BGGR
        k = 2
    else:
        import pdb
        pdb.set_trace()
        print('Unknown Bayer pattern.')
    if rev:
        k = (4 - k) % 4
    image = np.rot90(image, k=k, axes=axis)
    return image


def stack_rggb_channels(raw_image):
    """Stack the four RGGB channels of a Bayer raw image along a third dimension"""
    height, width = raw_image.shape
    channels = []
    for yy in range(2):
        for xx in range(2):
            raw_image_c = raw_image[yy:height:2, xx:width:2].copy()
            channels.append(raw_image_c)
    channels = np.stack(channels, axis=-1)
    return channels


def swap_channels(image):
    """Swap the order of channels: RGB --> BGR"""
    h, w, c = image.shape
    image1 = np.zeros(image.shape)
    for i in range(c):
        image1[:, :, i] = image[:, :, c - i - 1]
    return image1


def RGGB2Bayer(im):
    # Convert an RGGB-stacked image back to a single-channel Bayer image
    bayer = np.zeros((im.shape[0] * 2, im.shape[1] * 2))
    bayer[0::2, 0::2] = im[:, :, 0]
    bayer[0::2, 1::2] = im[:, :, 1]
    bayer[1::2, 0::2] = im[:, :, 2]
    bayer[1::2, 1::2] = im[:, :, 3]
    return bayer


def demosaic_CV2(rggb_channels_stack):
    # Demosaic with OpenCV's edge-aware Bayer-to-RGB conversion (14-bit range)
    bayer = RGGB2Bayer(rggb_channels_stack)
    dem = cv2.cvtColor(np.clip(bayer * 16383, 0, 16383).astype(dtype=np.uint16),
                       cv2.COLOR_BayerBG2RGB_EA)
    dem = dem.astype(dtype=np.float32) / 16383
    return dem


def apply_gains(bayer_image, red_gains, green_gains, blue_gains):
    # Apply white-balance gains to the stacked RGGB channels
    gains = np.stack([red_gains, green_gains, green_gains, blue_gains], axis=-1)
    gains = gains[np.newaxis, np.newaxis, :]
    return bayer_image * gains


def demosaic_simple(rggb_channels_stack):
    # Half-resolution demosaic: keep R, average the two greens, keep B
    channels_rgb = rggb_channels_stack[:, :, :3].copy()  # copy to avoid mutating the input
    channels_rgb[:, :, 1] = np.mean(rggb_channels_stack[:, :, 1:3], axis=2)
    channels_rgb[:, :, 2] = rggb_channels_stack[:, :, 3]
    return channels_rgb
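

# Illustrative sketch (added; not in the original file): stack_rggb_channels and
# RGGB2Bayer are inverses of each other for even-sized frames. The 16x16 frame
# below is hypothetical.
def _check_rggb_packing_roundtrip():
    bayer = np.random.rand(16, 16)        # fake single-channel Bayer frame
    stacked = stack_rggb_channels(bayer)  # (8, 8, 4) RGGB planes
    rebuilt = RGGB2Bayer(stacked)         # back to (16, 16)
    assert np.allclose(rebuilt, bayer)
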
def apply_ccm(image, ccm):
    # Apply the color correction matrix to every pixel
    images = image[:, :, np.newaxis, :]
    ccms = ccm[np.newaxis, np.newaxis, :, :]
    return np.sum(images * ccms, axis=-1)


def gamma_compression(images, gamma=2.2):
    return np.maximum(images, 1e-8) ** (1.0 / gamma)


def process(bayer_images, red_gains, green_gains, blue_gains, cam2rgbs):
    # White balance -> demosaic -> color correction -> gamma compression
    bayer_images = apply_gains(bayer_images, red_gains, green_gains, blue_gains)
    bayer_images = np.clip(bayer_images, 0.0, 1.0)
    images = demosaic_CV2(bayer_images)
    images = apply_ccm(images, cam2rgbs)
    images = np.clip(images, 0.0, 1.0)
    images = gamma_compression(images)
    return images


def get_histogram(data, bin_edges=None, left_edge=0.0, right_edge=1.0, n_bins=1000):
    data_range = right_edge - left_edge
    bin_width = data_range / n_bins
    if bin_edges is None:
        bin_edges = np.arange(left_edge, right_edge + bin_width, bin_width)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2.0
    n = np.prod(data.shape)
    hist, _ = np.histogram(data, bin_edges)
    return hist / n, bin_centers


def cal_kld(p_data, q_data, left_edge=0.0, right_edge=1.0, n_bins=1000):
    """Return the forward KL divergence between two sets of data points p and q.

    The inverse and symmetric divergences are computed as well but not returned.
    """
    bw = 0.2 / 64
    bin_edges = np.concatenate(([-1000.0], np.arange(-0.1, 0.1 + 1e-9, bw), [1000.0]), axis=0)
    p, _ = get_histogram(p_data, bin_edges, left_edge, right_edge, n_bins)
    q, _ = get_histogram(q_data, bin_edges, left_edge, right_edge, n_bins)
    idx = (p > 0) & (q > 0)
    p = p[idx]
    q = q[idx]
    logp = np.log(p)
    logq = np.log(q)
    kl_fwd = np.sum(p * (logp - logq))
    kl_inv = np.sum(q * (logq - logp))
    kl_sym = (kl_fwd + kl_inv) / 2.0
    return kl_fwd  # kl_inv, kl_sym
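

# Illustrative end-to-end usage sketch (added; not part of the original module).
# It assumes SIDD-style inputs: a raw Bayer patch normalized to [0, 1] stored in
# 'raw_patch.npy' and a MATLAB metadata file '0001_METADATA_RAW_010.MAT' loaded
# with scipy.io.loadmat; both paths and the output file name are hypothetical.
def _example_render(raw_path='raw_patch.npy', meta_path='0001_METADATA_RAW_010.MAT'):
    import scipy.io
    meta_mat = scipy.io.loadmat(meta_path)   # dict with a 'metadata' entry, as read_metadata expects
    md = read_metadata(meta_mat)
    raw = np.load(raw_path)                  # single-channel Bayer patch in [0, 1]
    srgb = process_sidd_image(raw, md['bayer_2by2'], md['wb'], md['cst2'],
                              save_file_rgb='rendered.png')  # hypothetical output path
    return srgb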