YOND / utils /isp_ops.py
hansen97's picture
Initial clean commit
0e07d71
from .utils import *
def read_wb_ccm(raw):
wb = np.array(raw.camera_whitebalance)
wb /= wb[1]
wb = wb.astype(np.float32)
ccm = raw.color_matrix[:3, :3].astype(np.float32)
if ccm[0,0] == 0:
ccm = np.eye(3, dtype=np.float32)
return wb, ccm
def get_ISO_ExposureTime(filepath):
# 不限于RAW,RGB图片也适用
raw_file = open(filepath, 'rb')
exif_file = exifread.process_file(raw_file, details=False, strict=True)
# 获取曝光时间
if 'EXIF ExposureTime' in exif_file:
exposure_str = exif_file['EXIF ExposureTime'].printable
else:
exposure_str = exif_file['Image ExposureTime'].printable
if '/' in exposure_str:
fenmu = float(exposure_str.split('/')[0])
fenzi = float(exposure_str.split('/')[-1])
exposure = fenmu / fenzi
else:
exposure = float(exposure_str)
# 获取ISO
if 'EXIF ISOSpeedRatings' in exif_file:
ISO_str = exif_file['EXIF ISOSpeedRatings'].printable
else:
ISO_str = exif_file['Image ISOSpeedRatings'].printable
if '/' in ISO_str:
fenmu = float(ISO_str.split('/')[0])
fenzi = float(ISO_str.split('/')[-1])
ISO = fenmu / fenzi
else:
ISO = float(ISO_str)
info = {'ISO':int(ISO), 'ExposureTime':exposure, 'name':filepath.split('/')[-1]}
return info
def metainfo(rawpath):
with open(rawpath, 'rb') as f:
tags = exifread.process_file(f)
_, suffix = os.path.splitext(os.path.basename(rawpath))
if suffix == '.dng':
expo = eval(str(tags['Image ExposureTime']))
iso = eval(str(tags['Image ISOSpeedRatings']))
else:
expo = eval(str(tags['EXIF ExposureTime']))
iso = eval(str(tags['EXIF ISOSpeedRatings']))
# print('ISO: {}, ExposureTime: {}'.format(iso, expo))
return iso, expo
# Yuzhi Wang's ISP
def bayer2rggb(bayer):
H, W = bayer.shape
return bayer.reshape(H//2, 2, W//2, 2).transpose(0, 2, 1, 3).reshape(H//2, W//2, 4)
def rggb2bayer(rggb):
H, W, _ = rggb.shape
return rggb.reshape(H, W, 2, 2).transpose(0, 2, 1, 3).reshape(H*2, W*2)
def bayer2rggbs(bayers):
H, W = bayers.shape[-2:]
return bayers.reshape(-1, H//2, 2, W//2, 2).permute(0, 1, 3, 2, 4).reshape(-1, H//2, W//2, 4)
def rggb2bayers(rggbs):
H, W, _ = rggbs.shape[-3:]
return rggbs.reshape(-1, H, W, 2, 2).permute(0, 1, 3, 2, 4).reshape(-1, H*2, W*2)
def bayer2rows(bayer):
# 分行
H, W = bayer.shape
return np.stack((bayer[0:H:2], bayer[1:H:2]))
def bayer2gray(raw):
# 相当于双线性插值的bayer2gray
kernel = np.array([[1,2,1],[2,4,2],[1,2,1]], np.float32) / 16.
gray = cv2.filter2D(raw, -1, kernel, borderType=cv2.BORDER_REFLECT)
return gray
def rows2bayer(rows):
c, H, W = rows.shape
bayer = np.empty((H*2, W))
bayer[0:H*2:2] = rows[0]
bayer[1:H*2:2] = rows[1]
return bayer
# Kaixuan Wei's ISP
def raw2bayer(raw, wp=1023, bl=64, norm=True, clip=False, bias=np.array([0,0,0,0])):
raw = raw.astype(np.float32)
H, W = raw.shape
out = np.stack((raw[0:H:2, 0:W:2], #RGBG
raw[0:H:2, 1:W:2],
raw[1:H:2, 1:W:2],
raw[1:H:2, 0:W:2]), axis=0).astype(np.float32)
if norm:
bl = bias + bl
bl = bl.reshape(4, 1, 1)
out = (out - bl) / (wp - bl)
if clip: out = np.clip(out, 0, 1)
return out.astype(np.float32)
def bayer2raw(packed_raw, wp=16383, bl=512):
if torch.is_tensor(packed_raw):
packed_raw = packed_raw.detach()
packed_raw = packed_raw[0].cpu().float().numpy()
packed_raw = np.clip(packed_raw, 0, 1)
packed_raw = packed_raw * (wp - bl) + bl
C, H, W = packed_raw.shape
H *= 2
W *= 2
raw = np.empty((H, W), dtype=np.uint16)
raw[0:H:2, 0:W:2] = packed_raw[0, :,:]
raw[0:H:2, 1:W:2] = packed_raw[1, :,:]
raw[1:H:2, 1:W:2] = packed_raw[2, :,:]
raw[1:H:2, 0:W:2] = packed_raw[3, :,:]
return raw
# Hansen Feng's ISP
def repair_bad_pixels(raw, bad_points, method='median'):
fixed_raw = bayer2rggb(raw)
for i in range(4):
fixed_raw[:,:,i] = cv2.medianBlur(fixed_raw[:,:,i],3)
fixed_raw = rggb2bayer(fixed_raw)
# raw = (1-bpc_map) * raw + bpc_map * fixed_raw
for p in bad_points:
raw[p[0],p[1]] = fixed_raw[p[0],p[1]]
return raw
def SimpleISP(raw, bl=0, wp=1, wb=[2,1,1,2], gamma=2.2):
# rggb2RGB (SimpleISP)
raw = (raw.astype(np.float32) - bl) / (wp-bl)
wb = np.array(wb)
raw = raw * wb.reshape(1,1,-1)
raw = raw.clip(0, 1)[:,:,(0,1,3)]
raw = raw ** (1/gamma)
return raw
def FastISP(img4c, wb=None, ccm=None, gamma=2.2, low_mem=True):
# rgbg2RGB (FastISP)
if torch.is_tensor(img4c):
img4c = img4c[0].detach().cpu().numpy()
h,w = img4c.shape[:2]
H = h * 2
W = w * 2
raw = np.zeros((H,W), np.float32)
red_gain = wb[0] if wb is not None else 2
blue_gain = wb[2] if wb is not None else 2
raw[0:H:2,0:W:2] = img4c[:,:,0] * red_gain # R
raw[0:H:2,1:W:2] = img4c[:,:,1] # G1
raw[1:H:2,0:W:2] = img4c[:,:,2] # G2
raw[1:H:2,1:W:2] = img4c[:,:,3] * blue_gain # B
raw = np.clip(raw, 0, 1)
white_point = 16383
raw = raw * white_point
img = cv2.cvtColor(raw.astype(np.uint16), cv2.COLOR_BAYER_BG2RGB_EA) / white_point
if ccm is None: ccm = np.eye(3, dtype=np.float32)
img = img[:, :, None, :].astype(np.float32)
ccm = ccm[None, None, :, :].astype(np.float32)
if low_mem:
n = 8
h = img.shape[0] // n
img_ccm = img.copy()
img = img[:,:,0]
for i in range(n):
img[h*i:h*(i+1)] = np.sum(img_ccm[h*i:h*(i+1)] * ccm, axis=-1)
img[h*n-h:] = np.sum(img_ccm[h*n-h:] * ccm, axis=-1)
else:
img = np.sum(img * ccm, axis=-1)
img = np.clip(img, 0, 1) ** (1/gamma)
return img
def raw2rgb_rawpy(packed_raw, wb=None, ccm=None, raw=None):
"""Raw2RGB pipeline (rawpy postprocess version)"""
if raw is None:
if packed_raw.shape[-2] > 1500:
raw = rawpy.imread('templet.dng')
wp = 1023
bl = 64
else:
raw = rawpy.imread('templet.ARW')
wp = 16383
bl = 512
if wb is None:
wb = np.array(raw.camera_whitebalance)
wb /= wb[1]
wb = list(wb)
if ccm is None:
try:
ccm = raw.rgb_camera_matrix[:3, :3]
except:
warnings.warn("You have no Wei Kaixuan's customized rawpy, you can't get right ccm of SonyA7S2...")
ccm = raw.color_matrix[:3, :3]
elif np.max(np.abs(ccm - np.identity(3))) == 0:
ccm = np.array([[ 1.9712269,-0.6789218,-0.29230508],
[-0.29104823,1.748401,-0.45735288],
[ 0.02051281,-0.5380369,1.5175241 ]], dtype=np.float32)
if len(packed_raw.shape) >= 3:
raw.raw_image_visible[:] = bayer2raw(packed_raw, wp, bl)
else: # 传进来的就是raw图
raw.raw_image_visible[:] = packed_raw
out = raw.postprocess(use_camera_wb=False, user_wb=wb, half_size=False, no_auto_bright=True,
output_bps=8, bright=1, user_black=None, user_sat=None)
return out
def mask_pos_init(seq=7, max_speed=20, patch_size=512, crop_size=256):
ps, cs = patch_size, crop_size
pos = np.zeros((seq, 2), np.int16)
v = np.random.randint(2*max_speed+1, size=2) - max_speed
max_a = (max_speed // 4) * 2 // 2 + 1
# 模拟运动
for i in range(1, seq):
pxmin, pymin = pos.min(axis=0)
pxmax, pymax = pos.max(axis=0)
# 保证不越界
if pxmax - pxmin >= ps - cs: v[0] = -v[0]
if pymax - pymin >= ps - cs: v[1] = -v[1]
pos[i] = pos[i-1] + v
a = np.random.randint(2*max_a+1, size=2) - max_a
v = (v + a).clip(-max_speed, max_speed)
pxmin, pymin = pos.min(axis=0)
pxmax, pymax = pos.max(axis=0)
# 保证不越界
if pxmax - pxmin >= ps - cs:
pos[:,0] = np.int16(pos[:,0] / (pxmax-pxmin+1))
if pymax - pymin >= ps - cs:
pos[:,1] = np.int16(pos[:,1] / (pymax-pymin+1))
pxmin, pymin = pos.min(axis=0)
pxmax, pymax = pos.max(axis=0)
pxstart = np.random.randint(-pxmin, ps - cs - pxmax + 1)
pystart = np.random.randint(-pymin, ps - cs - pymax + 1)
pos[:,0] += pxstart
pos[:,1] += pystart
return pos
def generate_gradient_square(sx, sy, min_val_range=(0.1, 0.9), max_val_range=(0.1, 0.9), color_aug=0.5, device='cuda'):
"""
生成一个三通道的渐变正方形图像,min_val和max_val在指定范围内随机选取,
横轴和纵轴的渐变会根据随机选择的函数进行调整,每个通道随机缩放。
参数:
sx (int): 正方形的宽度。
sy (int): 正方形的高度。
min_val_range (tuple): 最小值的范围,默认为(0.1, 0.9)。
max_val_range (tuple): 最大值的范围,默认为(0.1, 0.9)。
color_aug: 随机色彩aug的概率,需要在(0, 1)之间
device (str): 设备类型,可以是 'cuda' 或 'cpu'。
返回:
torch.Tensor: 生成的三通道渐变正方形图像。(sx, sy, 3)
"""
# 随机选择最小值和最大值
min_val = random.uniform(*min_val_range)
max_val = random.uniform(max(min_val, max_val_range[0]), max_val_range[1])
# 创建横轴和纵轴的线性渐变(从0到1)
x = torch.linspace(0, 1, steps=sx, device=device, dtype=torch.float32)
y = torch.linspace(0, 1, steps=sy, device=device, dtype=torch.float32)
# 随机选择函数和参数
functions = [
lambda t: t.flip(0) if random.choice([True, False]) else t,
lambda t: torch.sin(2 * torch.pi * random.uniform(0.25, 4) * t) / 2 + 0.5,
lambda t: t ** random.uniform(0.25, 4)
]
x_func = random.choice(functions)
y_func = random.choice(functions)
# 应用随机函数
x_grad = x_func(x)
y_grad = y_func(y)
# 将渐变扩展成二维矩阵
x_matrix = x_grad.unsqueeze(0).expand(sy, sx)
y_matrix = y_grad.unsqueeze(1).expand(sy, sx)
# 将横轴和纵轴渐变相乘
combined_grad = x_matrix * y_matrix
# 扩展为三通道
grad_3c = combined_grad.unsqueeze(0).expand(3, sy, sx)
# 随机对每个通道进行缩放(0.5)
color_scales = torch.tensor([random.uniform(0, 1) for _ in range(3)], device=device, dtype=torch.float32).view(3, 1, 1)
grad_3c = grad_3c * color_scales / color_scales.max() if random.uniform(0, 1) < color_aug else grad_3c
# 缩放到指定的最小值和最大值(默认原范围为0~1)
scaled_grad = min_val + (max_val - min_val) * grad_3c
return scaled_grad