-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathbasic_nomal_infill.py
More file actions
277 lines (204 loc) · 10.2 KB
/
basic_nomal_infill.py
File metadata and controls
277 lines (204 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import argparse
import numpy as np
import os
import torch
import cv2
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
from scipy.ndimage import binary_dilation
from stereo_rerender import infill_using_normals, masked_blur
import depth_frames_helper
from infill_common import mark_lower_side
# -----------------------
# Config / Globals
# -----------------------
# Number of inference steps (more steps look better but are slower).
# Left unset here; nothing visible in this file assigns or reads it.
num_inference_steps = None

# Reference colours used when comparing / painting pixels.
black = np.zeros(3, dtype=np.uint8)
blue = np.array((0, 0, 255), dtype=np.uint8)
# Same value as `black` but with NumPy's default integer dtype; used for mask comparisons.
black_color = np.zeros(3, dtype=int)

# Placeholder for a model pipeline; not used by the code visible in this file.
pipeline = None

# Gate meant to allow only ONE generate_infilled_frames call on the GPU at any time.
# NOTE(review): nothing visible in this file acquires it.
_GPU_GATE = Semaphore(value=1)
# -----------------------
# Helpers for batch mode
# -----------------------
def _is_txt(path: str) -> bool:
    """Return True when *path* is a string ending in ".txt" (case-insensitive)."""
    # Anything that is not a string can never name a list file.
    if not isinstance(path, str):
        return False
    lowered = path.lower()
    return lowered.endswith(".txt")
def _read_list_file(path: str):
    """
    Read a UTF-8 text file and return its entries, one per line.

    Each line is stripped of surrounding whitespace. Lines that are empty after
    stripping, or whose stripped text starts with '#', are skipped.
    """
    with open(path, "r", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [
            entry
            for entry in stripped_lines
            if entry and not entry.startswith("#")
        ]
def blur_under_mask(img, bool_mask, ksize=(6, 6), sigma=0):
    """
    Gaussian-blur only the area where bool_mask == True.

    The blur is a normalized convolution: only masked pixels contribute to the
    weighted average, so colours from outside the mask do not bleed in.
    Areas outside the mask are returned unchanged.

    img:        H×W×C image (the caller passes uint8); channel order is irrelevant.
    bool_mask:  H×W boolean array (True = blur this pixel).
    ksize:      (width, height) of the Gaussian kernel. Both entries are honoured;
                previously only ksize[0] was used for both axes.
    sigma:      Gaussian sigma, passed through to cv2.getGaussianKernel.

    Returns a new uint8 array with the same shape as img; img is not modified.
    """
    # 1) Build the 2-D Gaussian kernel as the outer product of one 1-D kernel per axis.
    kernel_w, kernel_h = ksize
    g_x = cv2.getGaussianKernel(kernel_w, sigma)
    g_y = g_x if kernel_h == kernel_w else cv2.getGaussianKernel(kernel_h, sigma)
    kernel = g_y @ g_x.T  # shape (kernel_h, kernel_w)

    # 2) Work in float32 so the weighted sums do not overflow / truncate.
    img_f = img.astype(np.float32)

    # Mask as float (1 = included in blur, 0 = excluded)
    m = bool_mask.astype(np.float32)

    # 3) Convolve the masked image and the mask separately.
    # NOTE(review): BORDER_ISOLATED on its own is presumed to act like zero padding
    # here — confirm against the OpenCV border-type documentation.
    # weighted sum of pixels
    blurred_sum = cv2.filter2D(img_f * m[..., None], -1, kernel, borderType=cv2.BORDER_ISOLATED)
    # weighted sum of mask values
    weight_sum = cv2.filter2D(m, -1, kernel, borderType=cv2.BORDER_ISOLATED)

    # 4) Normalize; where no masked pixel fell under the kernel the weight is 0,
    #    so divide by 1 instead to avoid a division by zero.
    w = weight_sum[..., None]
    w_safe = np.where(w == 0, 1.0, w)
    blurred = blurred_sum / w_safe

    # 5) Combine:
    #    - where mask is True  -> blurred value
    #    - where mask is False -> original
    out = img_f.copy()
    out[bool_mask] = blurred[bool_mask]

    return np.clip(out, 0, 255).astype(np.uint8)
def normal_infill(img, infill_mask):
    """
    Fill the masked regions of one image using the normals encoded in infill_mask.

    img:         image array for one eye. NOTE: it is modified in place (masked
                 pixels are blanked and then overwritten with the infill).
    infill_mask: array of the same height/width; pixels where every channel differs
                 from black_color are treated as areas to fill, and their values
                 are decoded as normals.

    Returns the result of blur_under_mask applied to the filled image.
    """
    # Pixels where every channel is non-black mark the areas to fill.
    hole_mask = np.all(infill_mask != black_color, axis=-1)

    # Blank everything under the normals first; we don't want any stray dots.
    img[hole_mask] = black

    # Decode the mask values from 0..255 into float normals in -1..1.
    normals = infill_mask.astype('float32') / 255.0 * 2 - 1

    # Pre-blur to remove some remnants of halo objects (pixels that were
    # projected to the wrong side of an edge).
    pre_blurred = masked_blur(img)

    # Fill in using the normals.
    filled = infill_using_normals(pre_blurred, hole_mask, normals)

    # Box-blur the filled image; only the filled-in parts are copied back below,
    # as a blurred infill looks better.
    smoothed = cv2.blur(filled, (4, 4))
    img[hole_mask] = smoothed[hole_mask]

    # Find the lower side of the infill areas (marked in blue by the helper).
    lower_side = mark_lower_side(infill_mask)
    lower_edge = np.all(lower_side == blue, axis=-1)

    # Grow a band around that lower edge and blur under it to hide halos.
    halo_zone = binary_dilation(lower_edge, iterations=6)
    return blur_under_mask(img, halo_zone)
def process_pair(sbs_color_video_path: str, sbs_mask_video_path: str, args):
    """
    Infill one side-by-side colour video using its side-by-side mask video.

    Every frame is split into a left and a right half, each half is passed through
    normal_infill with the matching half of the mask frame, and the two results are
    written side by side to "<sbs_color_video_path>_tmp_infilled.mkv" (FFV1 fourcc).
    When the loop ends the temp file is handed to depth_frames_helper.verify_and_move
    together with the number of frames read and the final name
    "<sbs_color_video_path>_infilled.mkv".

    sbs_color_video_path: path to the colour video.
    sbs_mask_video_path:  path to the mask video; must have the same resolution.
    args:                 object with a `max_frames` attribute (-1 = no limit).

    Raises:
        FileNotFoundError: if either input path is not an existing file.
        ValueError: if the two videos differ in resolution.
    """
    if not os.path.isfile(sbs_color_video_path):
        raise FileNotFoundError(f"input sbs_color_video does not exist: {sbs_color_video_path}")
    if not os.path.isfile(sbs_mask_video_path):
        raise FileNotFoundError(f"input sbs_mask_video does not exist: {sbs_mask_video_path}")

    print(f"Processing: {sbs_color_video_path}")

    output_tmp_video_file = sbs_color_video_path + "_tmp_infilled.mkv"
    output_video_file = sbs_color_video_path + "_infilled.mkv"

    raw_video = cv2.VideoCapture(sbs_color_video_path)
    mask_video = None
    out = None
    frame_n = 0

    # Everything that can fail after the first capture is opened lives inside the
    # try, so the captures and the writer are always released.
    try:
        frame_width = int(raw_video.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(raw_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = raw_video.get(cv2.CAP_PROP_FPS)
        out_size = (frame_width, frame_height)

        mask_video = cv2.VideoCapture(sbs_mask_video_path)
        m_frame_width = int(mask_video.get(cv2.CAP_PROP_FRAME_WIDTH))
        m_frame_height = int(mask_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Explicit raise instead of assert: asserts are stripped under `python -O`.
        if frame_width != m_frame_width or frame_height != m_frame_height:
            raise ValueError("mask and color video not same resolution")

        codec = cv2.VideoWriter_fourcc(*"FFV1")
        out = cv2.VideoWriter(output_tmp_video_file, codec, fps, out_size)

        # Column where the left half ends and the right half begins.
        pic_width = frame_width // 2

        while raw_video.isOpened():
            print(f"Frame: {frame_n} {frame_n / max(fps, 1e-6)}s")
            ret, raw_frame = raw_video.read()
            if not ret:
                break
            frame_n += 1

            rgb = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2RGB)

            ret_mask, mask_frame = mask_video.read()
            if not ret_mask:
                # If mask video ended early, assume blank mask remainder
                mask_frame = np.zeros_like(raw_frame)
            mask_frame = cv2.cvtColor(mask_frame, cv2.COLOR_BGR2RGB)

            # Split mask and image into their left / right halves.
            left_org_img_mask = mask_frame[:frame_height, :pic_width]
            left_org_img = rgb[:frame_height, :pic_width]
            right_img_mask = mask_frame[:frame_height, pic_width:]
            right_org_img = rgb[:frame_height, pic_width:]

            # normal_infill writes into the slices it is given; that is fine here
            # because `rgb` is a per-frame temporary.
            left_img = normal_infill(left_org_img, left_org_img_mask)
            right_img = normal_infill(right_org_img, right_img_mask)

            out_image = cv2.hconcat([left_img, right_img])
            out_image_uint8 = np.clip(out_image, 0, 255).astype(np.uint8)
            out.write(cv2.cvtColor(out_image_uint8, cv2.COLOR_RGB2BGR))

            if args.max_frames != -1 and frame_n >= args.max_frames:
                break
    finally:
        raw_video.release()
        if mask_video is not None:
            mask_video.release()
        if out is not None:
            out.release()

    # NOTE(review): presumably checks the temp file against the frame count and then
    # renames it to the final name — confirm in depth_frames_helper.
    depth_frames_helper.verify_and_move(output_tmp_video_file, frame_n, output_video_file)
    print(f"Done. Wrote: {output_video_file}")
if __name__ == '__main__':
    # Command-line entry point: process one colour/mask pair, or a batch of pairs
    # when both arguments are .txt list files (one path per line).
    parser = argparse.ArgumentParser(description='Normal infill script')
    parser.add_argument('--sbs_color_video', type=str, required=True, help='side by side stereo video renderd with point clouds in the masked area')
    parser.add_argument('--sbs_mask_video', type=str, required=True, help='side by side stereo video mask')
    parser.add_argument('--max_frames', default=-1, type=int, help='quit after max_frames nr of frames', required=False)
    args = parser.parse_args()

    # -----------------------
    # Single vs Batch logic
    # -----------------------
    if _is_txt(args.sbs_color_video):
        if not _is_txt(args.sbs_mask_video):
            raise ValueError("If --sbs_color_video is a .txt file, then --sbs_mask_video must also be a .txt file.")

        color_list = _read_list_file(args.sbs_color_video)
        mask_list = _read_list_file(args.sbs_mask_video)

        # The two lists are paired by position, so they must be the same length.
        if len(color_list) != len(mask_list):
            raise ValueError(
                f"List length mismatch: {args.sbs_color_video} has {len(color_list)} entries, "
                f"{args.sbs_mask_video} has {len(mask_list)} entries."
            )

        print(f"Batch mode: {len(color_list)} pairs")

        # Run up to 2 clips in parallel.
        # NOTE(review): process_pair does not acquire _GPU_GATE, so nothing is
        # serialized between the two workers.
        with ThreadPoolExecutor(max_workers=2) as ex:
            # Remember which colour video each future belongs to so a failure
            # can be reported against a specific clip.
            future_to_clip = {
                ex.submit(process_pair, c_path, m_path, args): c_path
                for c_path, m_path in zip(color_list, mask_list)
            }

            # Consume as they finish to keep the pool busy
            for fut in as_completed(future_to_clip):
                try:
                    fut.result()
                except Exception as e:
                    # Surface errors but keep other jobs running
                    print(f"[ERROR] A clip failed ({future_to_clip[fut]}): {e}")
    else:
        # Single-file mode (original behavior)
        process_pair(args.sbs_color_video, args.sbs_mask_video, args)