4848from shapely .geometry import box
4949
5050from openpois .conflation .chunking import extract_centroids_lonlat
51+ from openpois .conflation .dedup_overture import mark_no_conflate
5152from openpois .conflation .match import (
5253 compute_match_scores ,
5354 find_and_score_matches_chunked ,
5657)
5758
# Names of working artefacts created next to the conflation output.
# Sub-directory holding the chunked-matching checkpoints.
CHECKPOINT_SUBDIR = "chunk_matches"
# Sub-directory handed to the Overture internal-dedup pass as its
# checkpoint directory.
DEDUP_CHECKPOINT_SUBDIR = "chunk_selfdedup"
# Audit parquet listing the Overture rows dropped by internal dedup.
DEDUP_DROPPED_FILE = "overture_dedup_dropped.parquet"
# Temporary parquet of the post-dedup Overture rows, used to back the
# merge-phase reload.
DEDUP_POST_FILTER_FILE = "overture_post_dedup.parquet"
5963from openpois .conflation .merge import (
6064 build_merge_parts ,
6165 build_merge_parts_chunked ,
@@ -140,6 +144,19 @@ def log_rss(label: str) -> None:
140144)
141145TEST_BBOX = config .get ("conflation" , "test_bbox" )
142146
# Settings for the Overture internal-deduplication pass, read from the
# ``conflation`` / ``overture_internal_dedup`` config entry. Every key
# is optional; the second argument of each ``.get`` is the fallback.
DEDUP_CFG = config.get(
    "conflation", "overture_internal_dedup",
)
# NOTE(review): ``bool()`` of any non-empty string (including "false")
# is True; presumably the config loader yields real booleans — confirm.
DEDUP_ENABLED = bool(DEDUP_CFG.get("enabled", True))
DEDUP_MIN_SCORE = float(DEDUP_CFG.get("min_match_score", 0.75))
DEDUP_MAX_RADIUS_M = float(DEDUP_CFG.get("max_radius_m", 100))
# Falls back to the chunk target used by the main matching phase.
DEDUP_CHUNK_TARGET = int(
    DEDUP_CFG.get("chunk_target_pois", CHUNK_TARGET_POIS)
)
DEDUP_DUCKDB_MEM = str(
    DEDUP_CFG.get("duckdb_memory_limit", "4GB")
)
159+
143160# Columns needed for matching (memory optimization)
144161OSM_MATCH_COLS = [
145162 "osm_id" , "osm_type" , "name" , "brand" , "brand:wikidata" ,
@@ -168,6 +185,48 @@ def log_rss(label: str) -> None:
168185# -----------------------------------------------------------------
169186
170187
def _write_dedup_audit(
    overture_gdf,
    no_conflate: np.ndarray,
    components: np.ndarray,
    audit_path: Path,
) -> None:
    """Write one row per dropped Overture POI with its cluster winner.

    Columns: overture_id, cluster_id, winner_overture_id,
    confidence, geometry. Small file (~1 row per dropped POI); lets
    us spot-check dedup decisions in a GIS or with DuckDB.

    Args:
        overture_gdf: GeoDataFrame of the pre-filter Overture rows.
            Must carry an ``overture_id`` column; ``confidence`` is
            optional and is written as nulls when absent.
        no_conflate: Boolean mask positionally aligned with
            ``overture_gdf``; True marks a dropped row.
        components: Integer cluster id per row, aligned the same way.
        audit_path: Destination parquet file. Missing parent
            directories are created.

    Returns:
        None. No file is written when no row is marked as dropped.
    """
    losers_idx = np.where(no_conflate)[0]
    if losers_idx.size == 0:
        return

    # Map each cluster id to the row position of a surviving
    # (non-dropped) member. Clusters with no survivor keep the -1
    # sentinel.
    n_comps = int(components.max()) + 1
    winner_of = np.full(n_comps, -1, dtype=np.int64)
    not_loser_idx = np.where(~no_conflate)[0]
    winner_of[components[not_loser_idx]] = not_loser_idx
    winner_idx = winner_of[components[losers_idx]]

    ids = overture_gdf["overture_id"].to_numpy()
    winner_ids = ids[winner_idx]
    # A -1 sentinel used as an index wraps around to the LAST row and
    # would silently report a bogus winner. Record "no winner" as a
    # null instead. The common path (every cluster has a survivor) is
    # left untouched.
    orphaned = winner_idx < 0
    if orphaned.any():
        winner_ids = winner_ids.astype(object)
        winner_ids[orphaned] = None

    if "confidence" in overture_gdf.columns:
        confidence = overture_gdf["confidence"].to_numpy()
    else:
        confidence = np.full(len(overture_gdf), None, dtype=object)

    audit = gpd.GeoDataFrame(
        {
            "overture_id": ids[losers_idx],
            "cluster_id": components[losers_idx],
            "winner_overture_id": winner_ids,
            "confidence": confidence[losers_idx],
            "geometry": overture_gdf.geometry.values[losers_idx],
        },
        crs=overture_gdf.crs,
    )
    audit_path.parent.mkdir(parents=True, exist_ok=True)
    audit.to_parquet(audit_path, compression="zstd")
229+
171230def _load_gdf (
172231 path , columns , test_bbox = None , label = "dataset"
173232):
@@ -235,40 +294,132 @@ def _load_gdf(
235294 log_rss ("after Overture load" )
236295
237296 # -- Taxonomy assignment ---------------------------------------
238- print ("\n Assigning shared labels ..." )
239- osm_crosswalk = load_osm_crosswalk ()
297+ # Overture taxonomy is assigned first so the internal-dedup pass
298+ # (below) can reuse shared_label + L0 bits for its type scoring.
299+ # OSM taxonomy follows after dedup has filtered Overture rows.
300+ print ("\n Assigning Overture shared labels ..." )
240301 overture_crosswalk = load_overture_crosswalk ()
241302 match_radii = load_match_radii ()
242303 top_level_matches = load_top_level_matches ()
243304
244- osm_shared_labels , osm_radii = assign_osm_shared_label (
245- osm_gdf , osm_crosswalk , match_radii , FILTER_KEYS ,
246- default_radius_m = DEFAULT_RADIUS_M ,
247- )
248305 overture_shared_labels , overture_radii = (
249306 assign_overture_shared_label (
250307 overture_gdf , overture_crosswalk , match_radii ,
251308 default_radius_m = DEFAULT_RADIUS_M ,
252309 )
253310 )
254-
255- osm_assigned = np .sum (osm_shared_labels != "" )
256311 ov_assigned = np .sum (overture_shared_labels != "" )
257- print (
258- f" OSM: { osm_assigned :,} /{ len (osm_gdf ):,} assigned"
259- )
260312 print (
261313 f" Overture: { ov_assigned :,} /{ len (overture_gdf ):,} "
262314 f" assigned"
263315 )
316+ overture_l0_bits = compute_overture_l0_bits (
317+ overture_gdf ["taxonomy_l0" ].fillna ("" ).to_numpy (),
318+ )
319+ del overture_crosswalk
320+ log_rss ("after Overture taxonomy assignment" )
321+
322+ # -- Overture internal deduplication ---------------------------
323+ conflation_dir = Path (OUTPUT_PATH ).parent
324+ dedup_summary : dict | None = None
325+ # Path used for the Overture reload at merge time. Overridden to
326+ # a post-dedup temp parquet when dedup runs so the reload's row
327+ # indices match the match-phase output.
328+ overture_merge_source_path = OVERTURE_PATH
329+ overture_merge_needs_test_bbox = True
330+ if DEDUP_ENABLED :
331+ dedup_checkpoint_dir = (
332+ conflation_dir / DEDUP_CHECKPOINT_SUBDIR
333+ )
334+ (
335+ no_conflate ,
336+ dedup_components ,
337+ dedup_pairs ,
338+ dedup_summary ,
339+ ) = mark_no_conflate (
340+ overture_gdf ,
341+ overture_shared_labels ,
342+ overture_l0_bits ,
343+ min_match_score = DEDUP_MIN_SCORE ,
344+ max_radius_m = DEDUP_MAX_RADIUS_M ,
345+ chunk_target_pois = DEDUP_CHUNK_TARGET ,
346+ chunk_size = CHUNK_SIZE ,
347+ checkpoint_dir = dedup_checkpoint_dir ,
348+ duckdb_memory_limit = DEDUP_DUCKDB_MEM ,
349+ )
350+ log_rss (
351+ f"after Overture dedup "
352+ f"({ dedup_summary ['n_dropped' ]:,} marked)"
353+ )
354+
355+ if no_conflate .any ():
356+ audit_path = conflation_dir / DEDUP_DROPPED_FILE
357+ _write_dedup_audit (
358+ overture_gdf , no_conflate ,
359+ dedup_components , audit_path ,
360+ )
361+ print (
362+ f" Wrote { dedup_summary ['n_dropped' ]:,} dropped "
363+ f"rows to { audit_path } "
364+ )
365+
366+ # Filter overture rows + parallel arrays. A boolean-mask
367+ # copy allocates a fresh frame; ``del`` + ``gc.collect`` on
368+ # the old one releases the dropped-rows' memory before the
369+ # OSM taxonomy + matching phase begins.
370+ keep_mask = ~ no_conflate
371+ n_before = len (overture_gdf )
372+ overture_gdf = (
373+ overture_gdf .loc [keep_mask ].reset_index (drop = True )
374+ )
375+ overture_shared_labels = overture_shared_labels [keep_mask ]
376+ overture_radii = overture_radii [keep_mask ]
377+ overture_l0_bits = overture_l0_bits [keep_mask ]
378+ del (
379+ no_conflate , dedup_components ,
380+ dedup_pairs , keep_mask ,
381+ )
382+ gc .collect ()
383+ if dedup_checkpoint_dir .exists ():
384+ shutil .rmtree (dedup_checkpoint_dir )
385+ print (
386+ f" Overture rows after dedup: { len (overture_gdf ):,} "
387+ f"(dropped { n_before - len (overture_gdf ):,} )"
388+ )
389+
390+ # Spill post-dedup Overture to disk so the later merge-phase
391+ # reload sees rows whose indices match the match-phase output
392+ # (the pre-dedup snapshot on disk would misalign against
393+ # ``matches.overture_idx``). Minimal column set — merge adds
394+ # no metadata beyond what's already here.
395+ overture_merge_source_path = (
396+ conflation_dir / DEDUP_POST_FILTER_FILE
397+ )
398+ overture_merge_source_path .parent .mkdir (
399+ parents = True , exist_ok = True ,
400+ )
401+ overture_gdf .to_parquet (
402+ overture_merge_source_path , compression = "zstd" ,
403+ )
404+ overture_merge_needs_test_bbox = False
405+ log_rss ("after Overture dedup filter" )
406+ else :
407+ print ("\n Overture internal deduplication disabled." )
264408
265- # Compute L0 bitmasks BEFORE dropping tag columns
409+ # -- OSM taxonomy assignment -----------------------------------
410+ print ("\n Assigning OSM shared labels ..." )
411+ osm_crosswalk = load_osm_crosswalk ()
412+ osm_shared_labels , osm_radii = assign_osm_shared_label (
413+ osm_gdf , osm_crosswalk , match_radii , FILTER_KEYS ,
414+ default_radius_m = DEFAULT_RADIUS_M ,
415+ )
416+ osm_assigned = np .sum (osm_shared_labels != "" )
417+ print (
418+ f" OSM: { osm_assigned :,} /{ len (osm_gdf ):,} assigned"
419+ )
266420 osm_l0_bits = compute_osm_l0_bits (
267421 osm_gdf , top_level_matches ,
268422 )
269- overture_l0_bits = compute_overture_l0_bits (
270- overture_gdf ["taxonomy_l0" ].fillna ("" ).to_numpy (),
271- )
272423
273424 # Drop columns only needed for taxonomy assignment
274425 for col in [
@@ -280,7 +431,7 @@ def _load_gdf(
280431 for col in ["taxonomy_l0" , "taxonomy_l1" , "taxonomy_l2" ]:
281432 if col in overture_gdf .columns :
282433 overture_gdf .drop (columns = col , inplace = True )
283- del osm_crosswalk , overture_crosswalk , top_level_matches
434+ del osm_crosswalk , top_level_matches , match_radii
284435 log_rss ("after taxonomy assignment + tag-col drop" )
285436
286437 # -- Matching --------------------------------------------------
@@ -362,7 +513,6 @@ def _load_gdf(
362513 gc .collect ()
363514 else :
364515 # -- Chunked driver (default) ------------------------------
365- conflation_dir = Path (OUTPUT_PATH ).parent
366516 checkpoint_dir = conflation_dir / CHECKPOINT_SUBDIR
367517 print (
368518 f"\n Running chunked matching "
@@ -451,8 +601,12 @@ def _load_gdf(
451601 test_bbox = test_bbox , label = "OSM (merge cols)" ,
452602 )
453603 overture_gdf = _load_gdf (
454- OVERTURE_PATH , OVERTURE_MERGE_COLS ,
455- test_bbox = test_bbox ,
604+ overture_merge_source_path ,
605+ OVERTURE_MERGE_COLS ,
606+ test_bbox = (
607+ test_bbox if overture_merge_needs_test_bbox
608+ else None
609+ ),
456610 label = "Overture (merge cols)" ,
457611 )
458612 log_rss ("after reload for merge" )
@@ -493,6 +647,13 @@ def _load_gdf(
493647 # Clear chunk checkpoints after a successful save.
494648 if checkpoint_dir is not None and checkpoint_dir .exists ():
495649 shutil .rmtree (checkpoint_dir )
650+ # Clear the post-dedup Overture temp parquet (kept only to back
651+ # the merge-phase reload).
652+ if (
653+ overture_merge_source_path != OVERTURE_PATH
654+ and overture_merge_source_path .exists ()
655+ ):
656+ overture_merge_source_path .unlink ()
496657
497658 # -- Summary ---------------------------------------------------
498659 elapsed = time .time () - t0
@@ -518,4 +679,10 @@ def _load_gdf(
518679 f" Overture dedup drops: "
519680 f"{ chunk_summary ['n_overture_dedup_drops' ]:,} "
520681 )
682+ if dedup_summary is not None :
683+ print (
684+ f" Overture internal dedup: "
685+ f"{ dedup_summary ['n_dropped' ]:,} dropped across "
686+ f"{ dedup_summary ['n_multi_clusters' ]:,} clusters"
687+ )
521688 print (f" Output: { OUTPUT_PATH } " )
0 commit comments