-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathreplace_area_in_pointcloud.py
More file actions
280 lines (232 loc) · 10.3 KB
/
Copy pathreplace_area_in_pointcloud.py
File metadata and controls
280 lines (232 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import argparse
import os
import shutil
import tempfile
import time
import warnings
import geopandas as gpd
import numpy as np
import pdal
from numpy.lib import recfunctions as rfn
from osgeo import gdal
from shapely.geometry import box
from pdaltools.las_info import (
get_bounds_from_header_info,
get_writer_parameters_from_reader_metadata,
las_info_metadata,
)
def argument_parser():
parser = argparse.ArgumentParser(
"Replace points in a pointcloud, based on an area. "
"Source may come from from another pointcloud (command from_cloud), "
"or may be derivated from a digital surface model (command from_DSM).\n"
)
subparsers = parser.add_subparsers(required=True)
# first command is 'from_cloud'
from_cloud = subparsers.add_parser("from_cloud", help="Source is a point cloud")
from_cloud.add_argument("--source_cloud", "-s", required=True, type=str, help="path of source point cloud")
add_common_options(from_cloud)
from_cloud.set_defaults(func=from_cloud_func)
# second command is 'from_DSM'
from_DSM = subparsers.add_parser("from_DSM", help="Source is a digital surface model (DSM)")
from_DSM.add_argument(
"--source_dsm",
"-d",
required=True,
type=str,
help="path of the source digital surface model (DSM), used to generate source points",
)
from_DSM.add_argument(
"--source_ground_mask",
"-g",
required=True,
type=str,
help=(
"ground mask, a raster file used to filter source cloud. "
"Pixel with value > 0 is considered as ground, and define the source cloud we keep. "
"(tif or other raster format readable by GDAL)"
),
)
from_DSM.add_argument(
"--source_classification",
"-c",
required=True,
type=int,
help="classification to apply to the points extracted from the DSM",
)
add_common_options(from_DSM)
from_DSM.set_defaults(func=from_DSM_func)
return parser
def add_common_options(parser):
parser.add_argument(
"--source_pdal_filter", "-f", type=str, help="pdal filter expression to apply to source point cloud"
)
parser.add_argument("--target_cloud", "-t", type=str, required=True, help="path of target cloud to be modified")
parser.add_argument(
"--replacement_area",
"-r",
required=True,
type=str,
help="area to replace (shapefile, geojson or other vector format readable by GDAL)",
)
parser.add_argument("--output_cloud", "-o", required=True, type=str, help="output cloud file")
def from_cloud_func(args):
replace_area(
target_cloud=args.target_cloud,
pipeline_source=pipeline_read_from_cloud(args.source_cloud),
replacement_area=args.replacement_area,
output_cloud=args.output_cloud,
source_pdal_filter=args.source_pdal_filter,
)
def from_DSM_func(args):
replace_area(
target_cloud=args.target_cloud,
pipeline_source=pipeline_read_from_DSM(
dsm=args.source_dsm, ground_mask=args.source_ground_mask, classification=args.source_classification
),
replacement_area=args.replacement_area,
output_cloud=args.output_cloud,
source_pdal_filter=args.source_pdal_filter,
)
def get_writer_params(input_file):
pipeline = pdal.Pipeline()
pipeline |= pdal.Reader.las(filename=input_file)
pipeline.execute()
params = get_writer_parameters_from_reader_metadata(pipeline.metadata)
return params
def pipeline_read_from_cloud(filename):
print("source cloud: ", filename)
pipeline_source = pdal.Pipeline()
pipeline_source |= pdal.Reader.las(filename=filename)
return pipeline_source
def pipeline_read_from_DSM(dsm, ground_mask, classification):
print("DSM: ", dsm)
print("ground_mask: ", ground_mask)
print("classification: ", classification)
# get nodata value
ds = gdal.Open(dsm)
band = ds.GetRasterBand(1)
nodata_value = band.GetNoDataValue()
print("DSM: nodata:", nodata_value)
ds.Close()
pipeline = pdal.Pipeline()
pipeline |= pdal.Reader.gdal(filename=dsm, header="Z")
if nodata_value is not None: # nodata_value may be None and cause bugs
pipeline |= pdal.Filter.expression(expression=f"Z != {nodata_value}")
pipeline |= pdal.Filter.ferry(dimensions="=> ground")
pipeline |= pdal.Filter.assign(assignment="ground[:]=-1")
pipeline |= pdal.Filter.colorization(dimensions="ground:1:1.0", raster=ground_mask)
# Keep only points in the area
pipeline |= pdal.Filter.expression(expression="ground>0")
# assign class
pipeline |= pdal.Filter.ferry(dimensions="=>Classification")
pipeline |= pdal.Filter.assign(assignment=f"Classification[:]={classification}")
return pipeline
def clip_area(area_input, area_output, las_file):
start = time.time()
gdf = gpd.read_file(area_input)
minx, maxx, miny, maxy = get_bounds_from_header_info(las_info_metadata(las_file))
selection = gdf[gdf.intersects(box(minx, miny, maxx, maxy))]
num_polygon = len(selection)
selection.to_file(area_output)
end = time.time()
print(f"Create replacement area cropped: {num_polygon} polygons in {end-start:.2f} seconds")
return num_polygon
def replace_area(
target_cloud, pipeline_source, replacement_area, output_cloud, source_pdal_filter="", target_pdal_filter=""
):
print("target cloud: ", target_cloud)
print("replacement area: ", replacement_area)
print("output cloud: ", output_cloud)
print("source pdal filter: ", source_pdal_filter)
print("target pdal filter: ", target_pdal_filter)
start = time.time()
tmpdir = tempfile.mkdtemp()
replacement_name, ext = os.path.splitext(os.path.basename(replacement_area))
las_name, _ = os.path.splitext(os.path.basename(target_cloud))
replacement_area_crop = os.path.join(tmpdir, f"{replacement_name}_{las_name}{ext}")
print("replacement area crop: ", replacement_area_crop)
num_polygon = clip_area(replacement_area, replacement_area_crop, target_cloud)
if num_polygon == 0:
print("No polygon inside target cloud, output file is target cloud. We copy target in output.")
shutil.copy2(src=target_cloud, dst=output_cloud)
end = time.time()
print("all steps done in ", f"{end-start:.2f}", " seconds")
return
replacement_area = replacement_area_crop
crops = []
# pipeline to read target_cloud and remove points inside the polygon
pipeline_target = pdal.Pipeline()
pipeline_target |= pdal.Reader.las(filename=target_cloud)
pipeline_target |= pdal.Filter.ferry(dimensions="=> geometryFid")
# Assign -1 to all points because overlay replaces values from 0 and more
pipeline_target |= pdal.Filter.assign(assignment="geometryFid[:]=-1")
if target_pdal_filter:
pipeline_target |= pdal.Filter.expression(expression=target_pdal_filter)
pipeline_target |= pdal.Filter.overlay(column="fid", dimension="geometryFid", datasource=replacement_area)
# Keep only points out of the area
pipeline_target |= pdal.Filter.expression(expression="geometryFid==-1", tag="A")
target_count = pipeline_target.execute()
t1 = time.time()
print(f"Step 1: target count: {target_count} points in {t1-start:.2f} seconds")
# get input dimensions dtype from target
if pipeline_target.arrays:
input_dim_dtype = pipeline_target.arrays[0].dtype
else:
# re-read the LAS only if we cant have dimensions with previous pipeline (empty output)
pipeline_target2 = pdal.Pipeline()
pipeline_target2 |= pdal.Reader.las(filename=target_cloud)
pipeline_target2.execute()
input_dim_dtype = pipeline_target2.arrays[0].dtype
t1_bis = time.time()
print(f"Step 1-bis: re-read to have dimensions: {t1_bis-t1:.2f} seconds")
t2 = time.time()
# get input dimensions names
input_dimensions = list(input_dim_dtype.fields.keys())
# do not keep geometryFid
output_dimensions = [dim for dim in input_dimensions if dim not in "geometryFid"]
# add target to the result after keeping only the expected dimensions
if target_count:
target_cloud_pruned = pipeline_target.arrays[0][output_dimensions]
crops.append(target_cloud_pruned)
# pipeline to read source_cloud and remove points outside the polygon
pipeline_source |= pdal.Filter.ferry(dimensions="=> geometryFid")
pipeline_source |= pdal.Filter.assign(assignment="geometryFid[:]=-1")
if source_pdal_filter:
pipeline_source |= pdal.Filter.expression(expression=source_pdal_filter)
pipeline_source |= pdal.Filter.overlay(column="fid", dimension="geometryFid", datasource=replacement_area)
# Keep only points in the area
pipeline_source |= pdal.Filter.expression(expression="geometryFid>=0", tag="B")
source_count = pipeline_source.execute()
t3 = time.time()
print(f"Step 2: source count: {source_count} points in {t3-t2:.2f} seconds")
# add source to the result
if source_count:
# eventually add dimensions in source to have same dimensions as target cloud
# we do that in numpy (instead of PDAL filter) to keep dimension types
source_cloud_crop = pipeline_source.arrays[0]
nb_points = source_cloud_crop.shape[0]
source_dims = source_cloud_crop.dtype.fields.keys()
for dim_name, dim_type in input_dim_dtype.fields.items():
if dim_name not in source_dims:
source_cloud_crop = rfn.append_fields(
base=source_cloud_crop,
names=dim_name,
data=np.zeros(nb_points, dtype=dim_type[0]),
dtypes=dim_type[0],
)
source_cloud_pruned = source_cloud_crop[output_dimensions]
crops.append(source_cloud_pruned)
# Merge
if not crops:
warnings.warn("WARNING: Empty LAS, extra dims are lost")
pipeline = pdal.Filter.merge().pipeline(*crops)
writer_params = get_writer_params(target_cloud)
pipeline |= pdal.Writer.las(filename=output_cloud, **writer_params)
points = pipeline.execute()
end = time.time()
print(f"Step 3: merge: {points}, points in {end-t3:.2f} seconds")
print("all steps done in ", f"{end-start:.2f}", " seconds")
if __name__ == "__main__":
args = argument_parser().parse_args()
args.func(args)