YOND / utils /sidd_utils.py
hansen97's picture
Initial clean commit
0e07d71
from .isp_ops import *
def read_metadata(metadata):
meta = metadata['metadata'][0, 0]
beta1, beta2 = meta['UnknownTags'][7, 0][2][0][0:2]
cam = get_cam(meta)
bayer_pattern = get_bayer_pattern(meta)
# We found that the correct Bayer pattern is GBRG in S6
if cam == 'S6':
bayer_pattern = [1, 2, 0, 1]
bayer_2by2 = (np.asarray(bayer_pattern) + 1).reshape((2, 2)).tolist()
wb = get_wb(meta)
cst1, cst2 = get_csts(meta) # use cst2 for rendering
iso = get_iso(meta)
metadata = {
'meta': meta, 'beta1': beta1, 'beta2': beta2,
'bayer_2by2': bayer_2by2, 'wb':wb, 'cst2':cst2,
'iso':iso, 'cam':cam
}
return metadata
def get_iso(metadata):
try:
iso = metadata['ISOSpeedRatings'][0][0]
except:
try:
iso = metadata['DigitalCamera'][0, 0]['ISOSpeedRatings'][0][0]
except:
raise Exception('ISO not found.')
return iso
def get_cam(metadata):
model = metadata['Make'][0]
cam_dict = {'Apple': 'IP', 'Google': 'GP', 'samsung': 'S6', 'motorola': 'N6', 'LGE': 'G4'}
return cam_dict[model]
def get_bayer_pattern(metadata):
bayer_id = 33422
bayer_tag_idx = 1
try:
unknown_tags = metadata['UnknownTags']
if unknown_tags[bayer_tag_idx]['ID'][0][0][0] == bayer_id:
bayer_pattern = unknown_tags[bayer_tag_idx]['Value'][0][0]
else:
raise Exception
except:
try:
unknown_tags = metadata['SubIFDs'][0, 0]['UnknownTags'][0, 0]
if unknown_tags[bayer_tag_idx]['ID'][0][0][0] == bayer_id:
bayer_pattern = unknown_tags[bayer_tag_idx]['Value'][0][0]
else:
raise Exception
except:
try:
unknown_tags = metadata['SubIFDs'][0, 1]['UnknownTags']
if unknown_tags[1]['ID'][0][0][0] == bayer_id:
bayer_pattern = unknown_tags[bayer_tag_idx]['Value'][0][0]
else:
raise Exception
except:
print('Bayer pattern not found. Assuming RGGB.')
bayer_pattern = [1, 2, 2, 3]
return bayer_pattern
def get_wb(metadata):
return metadata['AsShotNeutral']
def get_csts(metadata):
return metadata['ColorMatrix1'].reshape((3, 3)), metadata['ColorMatrix2'].reshape((3, 3))
def toTensor(patch, cam):
# Convert Bayer into BGGR
if cam == 'IP': # RGGB
patch = np.rot90(patch, 2)
elif cam == 'S6': # GBRG
patch = np.flip(patch, axis=1)
else: # GP, N6, G4: BGGR
patch = patch
# Space to depth
patch = space_to_depth(np.expand_dims(patch, axis=-1))
# Add the batch size channel
patch = np.expand_dims(patch, 0)
# To tensor
tensor = torch.from_numpy(patch.transpose((0, 3, 1, 2))).float()
return tensor
def toPatch(tensor, cam):
# To numpy
patch = tensor.cpu().detach().numpy()
# Depth to space and squeeze the batch size channel
patch = np.squeeze(depth_to_space(np.transpose(patch[0],(1,2,0))), axis=2)
# Conver back to original Bayer
if cam == 'IP': # BGGR
patch = np.rot90(patch, 2)
elif cam == 'S6': # GBRG
patch = np.flip(patch, axis=1)
else: # GP, N6, G4: BGGR
patch = patch
return patch
def toTensor_nf(patch, cam):
# Convert Bayer into RGGB
if cam == 'IP': # RGGB
patch = patch
elif cam == 'S6': # GBRG
patch = np.rot90(patch, 3)
else: # GP, N6, G4: BGGR
patch = np.rot90(patch, 2)
# Space to depth
patch = space_to_depth(np.expand_dims(patch, axis=-1))
# Add the batch size channel
patch = np.expand_dims(patch, 0)
return patch
def toPatch_nf(patch, cam):
# Depth to space and squeeze the batch size channel
patch = np.squeeze(depth_to_space(patch[0]), axis=2)
# Conver back to original Bayer
if cam == 'IP': # RGGB
patch = patch
elif cam == 'S6': # GBRG
patch = np.rot90(patch, 1)
else: # GP, N6, G4: BGGR
patch = np.rot90(patch, 2)
return patch
def space_to_depth(x, block_size=2):
x = np.asarray(x)
height, width, depth = x.shape
reduced_height = height // block_size
reduced_width = width // block_size
y = x.reshape(reduced_height, block_size, reduced_width, block_size, depth)
z = np.swapaxes(y, 1, 2).reshape(reduced_height, reduced_width, -1)
return z
def depth_to_space(x, block_size=2):
x = np.asarray(x)
height, width, _ = x.shape
increased_height = height * block_size
increased_width = width * block_size
y = x.reshape(height, width, block_size, block_size, -1)
z = np.swapaxes(y, 1, 2).reshape(increased_height, increased_width, -1)
return z
def process_sidd_image(image, bayer_pattern, wb, cst, *, save_file_rgb=None):
"""Simple processing pipeline"""
image = image.clip(0, 1)
image = flip_bayer(image, bayer_pattern)
image = stack_rggb_channels(image)
rgb2xyz = np.array(
[
[0.4124564, 0.3575761, 0.1804375],
[0.2126729, 0.7151522, 0.0721750],
[0.0193339, 0.1191920, 0.9503041],
]
)
rgb2cam = np.matmul(cst, rgb2xyz)
cam2rgb = np.linalg.inv(rgb2cam)
cam2rgb = cam2rgb / np.sum(cam2rgb, axis=-1, keepdims=True)
image_srgb = process(image, 1 / wb[0][0], 1 / wb[0][1], 1 / wb[0][2], cam2rgb)
image_srgb = swap_channels(image_srgb)
image_srgb = image_srgb * 255.0
image_srgb = image_srgb.astype(np.uint8)
if save_file_rgb:
# Save
cv2.imwrite(save_file_rgb, image_srgb)
return image_srgb
def flip_bayer(image, bayer_pattern):
if (bayer_pattern == [[1, 2], [2, 3]]):
pass
elif (bayer_pattern == [[2, 1], [3, 2]]):
image = np.fliplr(image)
elif (bayer_pattern == [[2, 3], [1, 2]]):
image = np.flipud(image)
elif (bayer_pattern == [[3, 2], [2, 1]]):
image = np.fliplr(image)
image = np.flipud(image)
else:
import pdb
pdb.set_trace()
print('Unknown Bayer pattern.')
return image
def rot_bayer(image, bayer_pattern, rev=False, axis=(-2, -1)):
if (bayer_pattern == [[1, 2], [2, 3]]):
k = 0
elif (bayer_pattern == [[2, 1], [3, 2]]):
k = 3
elif (bayer_pattern == [[2, 3], [1, 2]]):
k = 1
elif (bayer_pattern == [[3, 2], [2, 1]]):
k = 2
else:
import pdb
pdb.set_trace()
print('Unknown Bayer pattern.')
if rev: k = (4-k) % 4
image = np.rot90(image, k=k, axes=axis)
return image
def stack_rggb_channels(raw_image):
"""Stack the four RGGB channels of a Bayer raw image along a third dimension"""
height, width = raw_image.shape
channels = []
for yy in range(2):
for xx in range(2):
raw_image_c = raw_image[yy:height:2, xx:width:2].copy()
channels.append(raw_image_c)
channels = np.stack(channels, axis=-1)
return channels
def swap_channels(image):
"""Swap the order of channels: RGB --> BGR"""
h, w, c = image.shape
image1 = np.zeros(image.shape)
for i in range(c):
image1[:, :, i] = image[:, :, c - i - 1]
return image1
def RGGB2Bayer(im):
# convert RGGB stacked image to one channel Bayer
bayer = np.zeros((im.shape[0] * 2, im.shape[1] * 2))
bayer[0::2, 0::2] = im[:, :, 0]
bayer[0::2, 1::2] = im[:, :, 1]
bayer[1::2, 0::2] = im[:, :, 2]
bayer[1::2, 1::2] = im[:, :, 3]
return bayer
def demosaic_CV2(rggb_channels_stack):
# using opencv demosaic
bayer = RGGB2Bayer(rggb_channels_stack)
dem = cv2.cvtColor(np.clip(bayer * 16383, 0, 16383).astype(dtype=np.uint16), cv2.COLOR_BayerBG2RGB_EA)
dem = dem.astype(dtype=np.float32) / 16383
return dem
def apply_gains(bayer_image, red_gains, green_gains, blue_gains):
gains = np.stack([red_gains, green_gains, green_gains, blue_gains], axis=-1)
gains = gains[np.newaxis, np.newaxis, :]
return bayer_image * gains
def demosaic_simple(rggb_channels_stack):
channels_rgb = rggb_channels_stack[:, :, :3]
channels_rgb[:, :, 0] = channels_rgb[:, :, 0]
channels_rgb[:, :, 1] = np.mean(rggb_channels_stack[:, :, 1:3], axis=2)
channels_rgb[:, :, 2] = rggb_channels_stack[:, :, 3]
return channels_rgb
def apply_ccm(image, ccm):
images = image[:, :, np.newaxis, :]
ccms = ccm[np.newaxis, np.newaxis, :, :]
return np.sum(images * ccms, axis=-1)
def gamma_compression(images, gamma=2.2):
return np.maximum(images, 1e-8) ** (1.0 / gamma)
def process(bayer_images, red_gains, green_gains, blue_gains, cam2rgbs):
bayer_images = apply_gains(bayer_images, red_gains, green_gains, blue_gains)
bayer_images = np.clip(bayer_images, 0.0, 1.0)
images = demosaic_CV2(bayer_images)
images = apply_ccm(images, cam2rgbs)
images = np.clip(images, 0.0, 1.0)
images = gamma_compression(images)
return images
def get_histogram(data, bin_edges=None, left_edge=0.0, right_edge=1.0, n_bins=1000):
data_range = right_edge - left_edge
bin_width = data_range / n_bins
if bin_edges is None:
bin_edges = np.arange(left_edge, right_edge + bin_width, bin_width)
bin_centers = bin_edges[:-1] + (bin_width / 2.0)
n = np.prod(data.shape)
hist, _ = np.histogram(data, bin_edges)
return hist / n, bin_centers
def cal_kld(p_data, q_data, left_edge=0.0, right_edge=1.0, n_bins=1000):
"""Returns forward, inverse, and symmetric KL divergence between two sets of data points p and q"""
bw = 0.2 / 64
bin_edges = np.concatenate(([-1000.0], np.arange(-0.1, 0.1 + 1e-9, bw), [1000.0]), axis=0)
p, _ = get_histogram(p_data, bin_edges, left_edge, right_edge, n_bins)
q, _ = get_histogram(q_data, bin_edges, left_edge, right_edge, n_bins)
idx = (p > 0) & (q > 0)
p = p[idx]
q = q[idx]
logp = np.log(p)
logq = np.log(q)
kl_fwd = np.sum(p * (logp - logq))
kl_inv = np.sum(q * (logq - logp))
kl_sym = (kl_fwd + kl_inv) / 2.0
return kl_fwd #, kl_inv, kl_sym