# depth_frames_helper.py
import numpy as np
import cv2
import os


def encode_depth_as_uint32(depth, max_depth):
    """Linearly map depth in [0, max_depth] metres to uint32 values in [0, 255**4]."""
    depth = np.clip(depth, a_max=max_depth, a_min=0.0)
    multi = (255**4) / float(max_depth)
    encoded_value = (multi * depth.astype(np.float64)).astype(np.uint32)
    return encoded_value


def decode_uint32_as_depth(encoded_value, max_depth):
    """
    encoded_value: numpy array of dtype uint32 (or scalar)
    max_depth: the same max depth used in the encoder
    returns: numpy array of dtype float32 giving depth in metres
    """
    # cast up to float for the division
    e = encoded_value.astype(np.float32)
    multi = float(max_depth) / (255**4)
    depth = e * multi
    return depth
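

# Example (illustrative, not part of the original helpers): a quick round trip
# showing that decode_uint32_as_depth() inverts encode_depth_as_uint32() up to
# the quantization step of max_depth / 255**4 metres plus float32 round-off.
def _example_linear_roundtrip():
    depth = np.random.uniform(0.0, 100.0, size=(4, 4)).astype(np.float32)
    encoded = encode_depth_as_uint32(depth, max_depth=100.0)
    decoded = decode_uint32_as_depth(encoded, max_depth=100.0)
    # one quantization step at max_depth = 100 is roughly 2.4e-8 m
    assert np.allclose(decoded, depth, atol=1e-4)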


# These values were picked because they give reasonable resolution at max_depth = 100.
C = 2.0
A = 16538.0


def encode_depth_as_uint32_log(depth, max_depth):
    depth = np.clip(depth, a_max=max_depth, a_min=0.0)
    encoded_value = np.round(A * np.log1p(depth / C)).astype(np.uint32)
    return encoded_value


def decode_uint32_log_as_depth(encoded_value, max_depth):
    """
    encoded_value: numpy array of dtype uint32 (or scalar)
    returns: numpy array of dtype float32 giving depth in metres
    """
    # promote to float for the expm1/division
    e = encoded_value.astype(np.float32)
    # invert the log1p mapping
    depth = C * np.expm1(e / A)
    return depth.astype(np.float32)
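

# Illustrative sketch (not from the original file): the log1p mapping with C = 2.0
# and A = 16538.0 spends more codes on nearby depths.  One encoder count covers
# roughly (C + depth) / A metres, i.e. about 0.12 mm at depth 0 and about 6 mm at
# depth 100, and the largest code at max_depth = 100 is about 65025, just under 2**16.
def _example_log_resolution():
    for d in (0.0, 1.0, 10.0, 100.0):
        step = (C + d) / A  # approximate depth change per encoder count
        print(f"depth {d:6.1f} m -> ~{step * 1000:.3f} mm per count")
    print("code at 100 m:", encode_depth_as_uint32_log(np.float64(100.0), 100.0))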


def encode_data_as_BGR(data, frame_width, frame_height, bit16=False):
    # View the uint32 values as raw bytes: shape (H, W, 4), little-endian byte order assumed
    save_bytes = data.view(np.uint8).reshape(frame_height, frame_width, 4)
    if bit16:
        # 16-bit layout: most significant byte in R and G (duplicated for visualization)
        R = save_bytes[:, :, 3]
        G = save_bytes[:, :, 3]
        # second most significant byte in the blue channel
        B = save_bytes[:, :, 2]
    else:
        # 24-bit layout: the three least significant bytes in R, G and B
        R = save_bytes[:, :, 2]
        G = save_bytes[:, :, 1]
        B = save_bytes[:, :, 0]
    return np.dstack((B, G, R))


def decode_rgb_as_data(rgb, frame_width, frame_height, bit16=False):
    # Rebuild the uint32 values from the byte planes stored in the RGB channels
    # (the inverse of encode_data_as_BGR after a BGR -> RGB conversion)
    data = np.zeros((frame_height, frame_width), dtype=np.uint32)
    depth_unit = data.view(np.uint8).reshape((frame_height, frame_width, 4))
    if bit16:
        depth_unit[..., 3] = rgb[..., 0]  # R: most significant byte
        depth_unit[..., 2] = rgb[..., 2]  # B: second most significant byte
    else:
        depth_unit[..., 2] = rgb[..., 0]  # R
        depth_unit[..., 1] = rgb[..., 1]  # G
        depth_unit[..., 0] = rgb[..., 2]  # B
    return data
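

# Minimal sketch (not from the original file; assumes the frame is written and read
# back losslessly, e.g. with FFV1, and converted to RGB as load_video_frames_from_path
# below does): packing uint32 data into colour planes and unpacking it again should be
# exact for the bytes the chosen layout keeps.
def _example_bgr_roundtrip():
    h, w = 2, 3
    values = np.random.randint(0, 255**4, size=(h, w), dtype=np.uint32)
    bgr = encode_data_as_BGR(values, w, h, bit16=True)
    rgb = bgr[..., ::-1]  # what cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) would give
    recovered = decode_rgb_as_data(rgb, w, h, bit16=True)
    # the bit16 layout stores only the two most significant bytes
    assert np.array_equal(recovered >> 16, values >> 16)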


def rescale_image(img, side_length, mode="max"):
    """
    Rescale image so either the longest or shortest side becomes `side_length`.
    mode: "max" → longest side becomes `side_length`
          "min" → shortest side becomes `side_length`
    """
    h, w = img.shape[:2]
    if mode == "max":
        scale = side_length / max(h, w)
    elif mode == "min":
        scale = side_length / min(h, w)
    else:
        raise ValueError("mode must be 'max' or 'min'")
    new_w = int(w * scale)
    new_h = int(h * scale)
    return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)


def decode_rgb_depth_frame(rgb, max_depth, bit16):
    # Decode an RGB frame holding depth packed by encode_data_as_BGR back into
    # metric depth, using the linear uint32 encoding.
    frame_height = rgb.shape[0]
    frame_width = rgb.shape[1]
    encoded_value = decode_rgb_as_data(rgb, frame_width, frame_height, bit16)
    return decode_uint32_as_depth(encoded_value, max_depth)


def normalize_depth(d):
    d = d.astype(np.float32)
    # keep only valid numbers
    d_valid = d[np.isfinite(d)]
    if d_valid.size == 0:
        return None
    # compute percentiles
    d_min = np.percentile(d_valid, 1)
    d_max = np.percentile(d_valid, 99)
    # degenerate case: no variation
    if d_max <= d_min + 1e-6:
        # return a flat image (e.g. zeros)
        return np.zeros_like(d, dtype=np.float32)
    # normal case
    return np.clip((d - d_min) / (d_max - d_min), 0, 1).reshape(d.shape)
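

# Illustrative usage (not part of the original helpers): normalize_depth() returns
# values in [0, 1], which can be written out as an 8-bit preview image for quick
# inspection; the colormap here is just a choice made for this example.
def _example_depth_preview(depth, out_path="depth_preview.png"):
    norm = normalize_depth(depth)
    if norm is None:  # no finite values in the input
        return False
    preview = cv2.applyColorMap((norm * 255).astype(np.uint8), cv2.COLORMAP_INFERNO)
    return cv2.imwrite(out_path, preview)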


def save_depth_video(frames, output_video_path, fps, max_depth_arg, rescale_width, rescale_height):
    """
    Saves depth maps encoded in the R, G and B channels of a video
    (this gives more accuracy than a plain grayscale encoding).
    """
    # Pick a value slightly above the maximum metric depth so the depth fits
    # nicely in the video file; a much higher value wastes resolution.
    MODEL_maxOUTPUT_depth = max_depth_arg
    if isinstance(frames, np.ndarray):
        height = frames.shape[1]
        width = frames.shape[2]
        max_depth = frames.max()
        print("max metric depth: ", max_depth)
        # If you want to use the video as a depth source, a fixed absolute value
        # is preferable to one derived from each clip's own maximum.
        if MODEL_maxOUTPUT_depth < max_depth:
            print("warning: the depth data exceeds max_depth_arg and will be clipped")
        nr_frames = frames.shape[0]
    else:
        nr_frames = len(frames)
        height = frames[0].shape[0]
        width = frames[0].shape[1]
    out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*"FFV1"), fps, (rescale_width, rescale_height))
    for i in range(nr_frames):
        if rescale_width != width or rescale_height != height:
            depth = cv2.resize(frames[i], (rescale_width, rescale_height), interpolation=cv2.INTER_LINEAR)
        else:
            depth = frames[i]
        encoded_depth = encode_depth_as_uint32(depth, MODEL_maxOUTPUT_depth)
        bgr24bit = encode_data_as_BGR(encoded_depth, rescale_width, rescale_height, bit16=True)
        out.write(bgr24bit)
    out.release()


def verify_and_move(tmp_file, expected_frames, output_file):
    if not os.path.isfile(tmp_file):
        return False
    cap = cv2.VideoCapture(tmp_file)
    if not cap.isOpened():
        return False
    actual_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    if actual_frames != expected_frames:
        print(tmp_file, "does not have the expected number of frames:", expected_frames, "!=", actual_frames)
        return False
    if os.path.exists(output_file):
        os.remove(output_file)
    os.rename(tmp_file, output_file)
    return True


def save_grayscale_video(frames, output_video_path, fps, max_depth_arg, rescale_width, rescale_height):
    """
    Saves depth maps as grayscale (R=G=B), using FFV1 (lossless).
    Depth values are linearly mapped to [0, 255] using `max_depth_arg`.
    If depth > max_depth_arg, it is clipped.
    """
    MODEL_maxOUTPUT_depth = float(max_depth_arg)
    # determine input shape
    if isinstance(frames, np.ndarray):
        # Expecting [N, H, W] or [N, H, W, 1]
        nr_frames = frames.shape[0]
        height = frames.shape[1]
        width = frames.shape[2]
        max_depth = np.max(frames)
        if MODEL_maxOUTPUT_depth < max_depth:
            print("warning: output depth exceeds max_depth_arg; values will be clipped.")
    else:
        # list/sequence of HxW arrays
        nr_frames = len(frames)
        height, width = frames[0].shape[:2]
    out = cv2.VideoWriter(
        output_video_path,
        cv2.VideoWriter_fourcc(*"FFV1"),
        fps,
        (int(rescale_width), int(rescale_height))
    )
    for i in range(nr_frames):
        depth = frames[i]
        # squeeze last channel if present
        if depth.ndim == 3 and depth.shape[-1] == 1:
            depth = depth[..., 0]
        # resize if needed (linear is fine for metric depth visualization)
        if (rescale_width != width) or (rescale_height != height):
            depth = cv2.resize(depth, (int(rescale_width), int(rescale_height)), interpolation=cv2.INTER_LINEAR)
        # map depth to 0..255 (uint8)
        # avoid division by zero
        denom = MODEL_maxOUTPUT_depth if MODEL_maxOUTPUT_depth > 0 else (depth.max() if np.max(depth) > 0 else 1.0)
        gray = (np.clip(depth, 0, MODEL_maxOUTPUT_depth) / denom) * 255.0
        gray_u8 = gray.astype(np.uint8)
        # OpenCV wants BGR; replicate gray to 3 channels
        bgr = cv2.merge([gray_u8, gray_u8, gray_u8])
        out.write(bgr)
    out.release()
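

# Illustrative note (not from the original file): the grayscale encoding has only
# 256 levels, so with max_depth_arg = 100 one gray level spans about 0.39 m of depth,
# whereas the RGB packing used by save_depth_video keeps roughly mm-level precision.
def _example_grayscale_step(max_depth_arg=100.0):
    # depth range covered by a single gray level, in metres
    return max_depth_arg / 255.0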


def write_video_frames_to_path(out_video, mask_frames, fps, H0, W0):
    # ---- write output video ----
    writer = cv2.VideoWriter(
        out_video,
        cv2.VideoWriter_fourcc(*"FFV1"),  # lossless; switch to MJPG/mp4v if needed
        fps,
        (W0, H0)
    )
    assert writer.isOpened(), "Failed to open VideoWriter (FFV1/MKV). Try MJPG or mp4v if needed."
    for f in mask_frames:
        f = cv2.cvtColor(f, cv2.COLOR_RGB2BGR)
        if f.shape[0] != H0 or f.shape[1] != W0:
            f = cv2.resize(f, (W0, H0), interpolation=cv2.INTER_NEAREST)
        writer.write(f)
    writer.release()
    print(f"[ok] wrote {len(mask_frames)} frames to {out_video}")


def load_video_frames_from_path(video_path, start_frame=0, max_frames=-1):
    """
    Load frames from a video file with OpenCV and convert them to RGB uint8.
    Returns (frames, fps).
    """
    if not os.path.exists(video_path):
        raise Exception("video file: " + video_path + " does not exist")
    cap = cv2.VideoCapture(video_path)
    assert cap.isOpened(), f"Failed to open video: {video_path}"
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames = []
    idx = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if idx >= start_frame:
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if max_frames > 0 and len(frames) >= max_frames:
                break
        idx += 1
    cap.release()
    assert len(frames) > 0, "No frames read"
    return frames, fps
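

# Minimal end-to-end sketch (assumptions: an ffmpeg-enabled OpenCV build with FFV1
# support, and the hypothetical file names below): encode random depth frames into
# a lossless video, then read them back and decode them to metric depth.
if __name__ == "__main__":
    max_depth = 100.0
    depth_frames = np.random.uniform(0.0, max_depth, size=(8, 64, 96)).astype(np.float32)
    tmp_path, out_path = "depth_tmp.mkv", "depth.mkv"
    save_depth_video(depth_frames, tmp_path, 24, max_depth, 96, 64)
    if verify_and_move(tmp_path, expected_frames=len(depth_frames), output_file=out_path):
        rgb_frames, _ = load_video_frames_from_path(out_path)
        recovered = decode_rgb_depth_frame(rgb_frames[0], max_depth, bit16=True)
        # the bit16 packing keeps ~16 bits of precision, so expect roughly mm-level error
        print("max abs error (m):", np.abs(recovered - depth_frames[0]).max())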