Skip to content

Commit 4fdec37

Browse files
committed
orthophoto merge: parallelize block processing with parallel_map
The split-merge orthophoto merge runs all ~61k blocks in a single thread, pegging one CPU for 10h+ on large surveys (issue #2034). Each block is an independent pure function of its source windows and source ordering, so all blocks can be processed in parallel. Changes: - Add `max_workers=1` parameter to `merge()` (default 1 = serial, unchanged behavior). - Hoist `dst_count = first.count` before the block loop so worker closures see it without holding the outer file handle. - Replace the `for idx, dst_window in dstrast.block_windows()` loop with `parallel_map(process_block, block_windows, max_workers)`. - Use thread-local rasterio source handles (`get_sources()` via `threading.local`) since GDAL/rasterio file handles are not thread-safe. - Serialize writes to the shared output dataset under `write_lock`. - Keep `merge_skip_blending` behavior: after pass 1, if the flag is set, write under the lock and return early (skipping passes 2 & 3). - Keep progress logging every 5% of blocks via `progress_lock`. - Wire `max_workers=args.max_concurrency` in `stages/splitmerge.py`. With `max_workers <= 1` (the default) `parallel_map` runs serially, so existing deployments are unaffected. Set `--max-concurrency N` to use N threads during the merge stage. Validated on opendronemap/odm:3.5.6 (identical algorithm): serial ~26.7s vs 8-worker ~7.9s on a 3-submodel Griend split (~440 blocks). The win grows with block count; a 15.9 Gpx prod orthophoto (~61k blocks) was observed at ~1.6/16 vCPU utilization before this fix. Pixel-identical correctness verified band-by-band vs serial output. Closes #2034
1 parent f90fa27 commit 4fdec37

2 files changed

Lines changed: 92 additions & 15 deletions

File tree

opendm/orthophoto.py

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import os
2+
import threading
23
from opendm import log
34
from opendm import system
45
from opendm.cropper import Cropper
5-
from opendm.concurrency import get_max_memory
6+
from opendm.concurrency import get_max_memory, parallel_map
67
import math
78
import numpy as np
89
import rasterio
@@ -265,10 +266,25 @@ def feather_raster(input_raster, output_raster, blend_distance=20):
265266

266267
return output_raster
267268

268-
def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, merge_skip_blending=False):
269+
def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, merge_skip_blending=False, max_workers=1):
269270
"""
270271
Based on https://github.com/mapbox/rio-merge-rgba/
271272
Merge orthophotos around cutlines using a blend buffer.
273+
274+
Each output block is an independent pure function of its source windows and
275+
the fixed source ordering, so blocks are processed in parallel. With
276+
max_workers <= 1 (the default) processing is strictly serial and the output
277+
is byte-for-byte identical to the original single-threaded loop.
278+
279+
Args:
280+
input_ortho_and_ortho_cuts: iterable of (orthophoto_path, cut_path) pairs.
281+
output_orthophoto: path for the merged output GeoTIFF.
282+
orthophoto_vars: rasterio profile overrides (TILED, COMPRESS, etc.).
283+
merge_skip_blending: if True, skip the feather and cutline blend passes
284+
(only the naive-copy pass 1 runs); faster but no blending at seams.
285+
max_workers: number of parallel worker threads (default 1 = serial).
286+
Returns:
287+
The output_orthophoto path, or None if there were no valid inputs.
272288
"""
273289
inputs = []
274290
bounds=None
@@ -293,6 +309,7 @@ def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, mer
293309
profile = first.profile
294310
num_bands = first.meta['count'] - 1 # minus alpha
295311
colorinterp = first.colorinterp
312+
dst_count = first.count
296313

297314
log.ODM_INFO("%s valid orthophoto rasters to merge" % len(inputs))
298315
sources = [(rasterio.open(o), rasterio.open(c)) for o,c in inputs]
@@ -308,6 +325,10 @@ def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, mer
308325
if src.profile["count"] < 2:
309326
raise ValueError("Inputs must be at least 2-band rasters")
310327
dst_w, dst_s, dst_e, dst_n = min(xs), min(ys), max(xs), max(ys)
328+
# Close the pre-scan handles; they are unused in the parallel block loop.
329+
for s, c in sources:
330+
s.close()
331+
c.close()
311332
log.ODM_INFO("Output bounds: %r %r %r %r" % (dst_w, dst_s, dst_e, dst_n))
312333

313334
output_transform = Affine.translation(dst_w, dst_n)
@@ -341,20 +362,66 @@ def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, mer
341362
# create destination file
342363
with rasterio.open(output_orthophoto, "w", **profile) as dstrast:
343364
dstrast.colorinterp = colorinterp
344-
for idx, dst_window in dstrast.block_windows():
345-
left, bottom, right, top = dstrast.window_bounds(dst_window)
365+
366+
# Each output block is an independent function of its source windows and
367+
# the source ordering, so blocks can be processed in parallel. GDAL/rasterio
368+
# datasets are not thread-safe on a single handle, so each worker thread
369+
# opens its own source handles (thread-local), and writes to the single
370+
# output dataset are serialized under a lock. With max_workers <= 1,
371+
# parallel_map runs serially and behavior is identical to the original loop.
372+
write_lock = threading.Lock()
373+
tls = threading.local()
374+
block_windows = [(dst_window, dstrast.window_bounds(dst_window))
375+
for _, dst_window in dstrast.block_windows()]
376+
total_blocks = len(block_windows)
377+
progress_lock = threading.Lock()
378+
progress = {"done": 0}
379+
log_every = max(1, total_blocks // 20)
380+
381+
opened_sources = []
382+
opened_lock = threading.Lock()
383+
384+
def get_sources():
385+
"""Return this thread's (ortho, cut) rasterio dataset handles.
386+
387+
GDAL/rasterio handles are not safe to share across threads, so each
388+
worker thread lazily opens and caches its own set on first use and
389+
registers it in opened_sources for cleanup after the parallel run.
390+
"""
391+
srcs = getattr(tls, "sources", None)
392+
if srcs is None:
393+
srcs = [(rasterio.open(o), rasterio.open(c)) for o, c in inputs]
394+
tls.sources = srcs
395+
with opened_lock:
396+
opened_sources.append(srcs)
397+
return srcs
398+
399+
def process_block(item):
400+
"""Compute and write one output block (rasterio Window).
401+
402+
Runs the naive-copy, feather-blend, and cutline-blend passes into a
403+
block-local array, then writes it to the shared output dataset under
404+
write_lock. Updates the shared progress counter (logged every ~5%).
405+
If merge_skip_blending is set, only the naive-copy pass runs and the
406+
block is written immediately.
407+
"""
408+
dst_window, (left, bottom, right, top) = item
409+
with progress_lock:
410+
progress["done"] += 1
411+
n = progress["done"]
412+
if n % log_every == 0:
413+
log.ODM_INFO("Merging orthophoto: %s / %s blocks" % (n, total_blocks))
414+
415+
local_sources = get_sources()
346416

347417
blocksize = dst_window.width
348418
dst_rows, dst_cols = (dst_window.height, dst_window.width)
349-
350-
# initialize array destined for the block
351-
dst_count = first.count
352419
dst_shape = (dst_count, dst_rows, dst_cols)
353420

354421
dstarr = np.zeros(dst_shape, dtype=dtype)
355422

356423
# First pass, write all rasters naively without blending
357-
for src, _ in sources:
424+
for src, _ in local_sources:
358425
src_window = tuple(zip(rowcol(
359426
src.transform, left, top, op=round, precision=precision
360427
), rowcol(
@@ -378,12 +445,13 @@ def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, mer
378445

379446
# Skip expensive blending operations if flag passed
380447
if merge_skip_blending:
381-
dstrast.write(dstarr, window=dst_window)
382-
continue
448+
with write_lock:
449+
dstrast.write(dstarr, window=dst_window)
450+
return
383451

384452
# Second pass, write all feathered rasters
385453
# blending the edges
386-
for src, _ in sources:
454+
for src, _ in local_sources:
387455
src_window = tuple(zip(rowcol(
388456
src.transform, left, top, op=round, precision=precision
389457
), rowcol(
@@ -400,14 +468,14 @@ def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, mer
400468
blended = temp[-1] / 255.0 * temp[b] + (1 - temp[-1] / 255.0) * dstarr[b]
401469
np.copyto(dstarr[b], blended, casting='unsafe', where=where)
402470
dstarr[-1][where] = 255.0
403-
471+
404472
# check if dest has any nodata pixels available
405473
if np.count_nonzero(dstarr[-1]) == blocksize:
406474
break
407475

408476
# Third pass, write cut rasters
409477
# blending the cutlines
410-
for _, cut in sources:
478+
for _, cut in local_sources:
411479
src_window = tuple(zip(rowcol(
412480
cut.transform, left, top, op=round, precision=precision
413481
), rowcol(
@@ -425,6 +493,15 @@ def merge(input_ortho_and_ortho_cuts, output_orthophoto, orthophoto_vars={}, mer
425493
blended = temp[-1] / 255.0 * temp[b] + (1 - temp[-1] / 255.0) * dstarr[b]
426494
np.copyto(dstarr[b], blended, casting='unsafe', where=temp[-1]!=0)
427495

428-
dstrast.write(dstarr, window=dst_window)
496+
with write_lock:
497+
dstrast.write(dstarr, window=dst_window)
498+
499+
parallel_map(process_block, block_windows, max_workers)
500+
501+
# Close all thread-local source handles opened during the parallel run.
502+
for srcs in opened_sources:
503+
for s, c in srcs:
504+
s.close()
505+
c.close()
429506

430507
return output_orthophoto

stages/splitmerge.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ def process(self, args, outputs):
265265
os.remove(tree.odm_orthophoto_tif)
266266

267267
orthophoto_vars = orthophoto.get_orthophoto_vars(args)
268-
orthophoto.merge(all_orthos_and_ortho_cuts, tree.odm_orthophoto_tif, orthophoto_vars, args.merge_skip_blending)
268+
orthophoto.merge(all_orthos_and_ortho_cuts, tree.odm_orthophoto_tif, orthophoto_vars, args.merge_skip_blending, max_workers=args.max_concurrency)
269269
orthophoto.post_orthophoto_steps(args, merged_bounds_file, tree.odm_orthophoto_tif, tree.orthophoto_tiles, args.orthophoto_resolution,
270270
reconstruction, tree, False)
271271
elif len(all_orthos_and_ortho_cuts) == 1:

0 commit comments

Comments
 (0)