Skip to content

Commit d8886ee

Browse files
committed
Updates to chunk out analysis, reducing peak memory use.
1 parent fe26096 commit d8886ee

9 files changed

Lines changed: 2156 additions & 204 deletions

File tree

.claude/TODO.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Short running list of in-progress / upcoming work. Edit freely; trim older compl
66

77
## Upcoming
88

9+
- [ ] Trim peak memory in `scripts/conflation/conflate.py` so the OSM(8.7M) × Overture(13M) run fits inside 16GB+4GB swap. Widened Overture allowlist (6.29M → 13.05M) overflowed WSL2 and rebooted the VM on 2026-04-17. Tactics: drop `osm_gdf`/`overture_gdf` down to the minimal column set `build_merge_parts` needs before the scoring pass, free normalized name arrays right after `compute_name_scores`, and narrow `osm_idx`/`overture_idx` to int32. See [src/openpois/conflation/match.py](../src/openpois/conflation/match.py) and [scripts/conflation/conflate.py](../scripts/conflation/conflate.py). Added 2026-04-17.
10+
- [ ] Instrument setup / matching-reload phase of `scripts/conflation/conflate.py` to pin down the ~17 GB VmHWM spike observed on the 2026-04-17 chunked run (checkpoints reloaded, merge phase bounded — spike is upstream of both). Likely culprits: `pd.concat` of 128 chunk parquets, name/brand array construction holding dual refs, or taxonomy crosswalk transient. Add `psutil` RSS logging at each phase boundary so we can see exactly which step jumps. Added 2026-04-17.
911
- [ ] Watch for a DuckDB release that fixes the WSL2 httpfs "Information loss on integer cast" crash (issue #21669, fix PR #21395). Once a tagged release ships with the fix and a full `scripts/overture/download.py` run on WSL2 completes, we can unpin from `duckdb==1.4.1` and revert the per-part download to a single-query DuckDB scan. Added 2026-04-17.
1012
- [ ] Auto-check taxonomy changes whenever we switch to a new Overture Maps version (detect new/removed L0/L1/L2 categories vs. `taxonomy_crosswalk_overture_maps.csv` and flag gaps). Added 2026-04-16.
1113
- [ ] Watch for Overture L0/L1 → flat `basic_category` migration (~June 2026). Crosswalk CSV + `assign_overture_shared_label` will need updating. See [docs/taxonomy-setup.md](docs/taxonomy-setup.md).

config.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ download:
4040
history_pbf_url: "https://osm-internal.download.geofabrik.de/north-america/us-internal.osh.pbf"
4141
pr_history_pbf_url: "https://osm-internal.download.geofabrik.de/north-america/us/puerto-rico-internal.osh.pbf"
4242
history_cookie_file: "~/data/openpois/.creds/geofabrik_cookies.txt"
43-
overwrite_download: false
44-
overwrite_filter: false
45-
overwrite_parse: false
43+
overwrite_download: true
44+
overwrite_filter: true
45+
overwrite_parse: true
4646
source_label: "osm"
4747
keep_all_keys: false
4848
chunk_size: 100_000
@@ -223,6 +223,7 @@ conflation:
223223
type_weight: 0.30
224224
identifier_weight: 0.20
225225
chunk_size: 500_000
226+
chunk_target_pois: 200_000
226227
test_bbox:
227228
xmin: -122.45
228229
ymin: 47.50

scripts/conflation/conflate.py

Lines changed: 165 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737

3838
import argparse
3939
import gc
40+
import shutil
4041
import time
42+
from pathlib import Path
4143

4244
import geopandas as gpd
4345
import numpy as np
@@ -47,11 +49,15 @@
4749

4850
from openpois.conflation.match import (
4951
compute_match_scores,
52+
find_and_score_matches_chunked,
5053
find_spatial_candidates,
5154
select_best_matches,
5255
)
56+
57+
CHECKPOINT_SUBDIR = "chunk_matches"
5358
from openpois.conflation.merge import (
5459
build_merge_parts,
60+
build_merge_parts_chunked,
5561
save_conflated_from_parts,
5662
)
5763
from openpois.conflation.taxonomy import (
@@ -89,6 +95,9 @@
8995
TYPE_WEIGHT = config.get("conflation", "type_weight")
9096
IDENTIFIER_WEIGHT = config.get("conflation", "identifier_weight")
9197
CHUNK_SIZE = config.get("conflation", "chunk_size")
98+
CHUNK_TARGET_POIS = config.get(
99+
"conflation", "chunk_target_pois",
100+
)
92101
TEST_BBOX = config.get("conflation", "test_bbox")
93102

94103
# Columns needed for matching (memory optimization)
@@ -146,6 +155,15 @@ def _load_gdf(
146155
"(Seattle area) for testing."
147156
),
148157
)
158+
parser.add_argument(
159+
"--no-chunk",
160+
action = "store_true",
161+
help = (
162+
"Disable spatial chunking and run matching over the "
163+
"full dataset in one pass. Uses more memory; kept as "
164+
"a debug/baseline option."
165+
),
166+
)
149167
args = parser.parse_args()
150168
t0 = time.time()
151169

@@ -209,75 +227,135 @@ def _load_gdf(
209227
overture_gdf.drop(columns = col, inplace = True)
210228
gc.collect()
211229

212-
# -- Spatial candidates ----------------------------------------
213-
print(f"\nFinding spatial candidates (max {MAX_RADIUS_M}m) ...")
214-
candidates = find_spatial_candidates(
215-
osm_geom = osm_gdf.geometry.values,
216-
overture_geom = overture_gdf.geometry.values,
217-
osm_radii_m = osm_radii,
218-
max_radius_m = MAX_RADIUS_M,
219-
chunk_size = CHUNK_SIZE,
230+
# -- Matching --------------------------------------------------
231+
# Prepare name/brand arrays once (used by both code paths).
232+
osm_names = osm_gdf["name"].to_numpy()
233+
osm_brands = (
234+
osm_gdf["brand"].to_numpy()
235+
if "brand" in osm_gdf.columns
236+
else np.full(len(osm_gdf), None, dtype = object)
237+
)
238+
overture_names = overture_gdf["overture_name"].to_numpy()
239+
overture_brands = (
240+
overture_gdf["brand_name"].to_numpy()
241+
if "brand_name" in overture_gdf.columns
242+
else np.full(
243+
len(overture_gdf), None, dtype = object
244+
)
220245
)
221-
print(f" Found {len(candidates):,} candidate pairs")
222-
gc.collect()
223246

224-
if candidates.empty:
225-
print("No spatial candidates found. Merging all as unmatched.")
226-
matches = candidates
227-
else:
228-
# -- Scoring -----------------------------------------------
229-
print("\nScoring candidates ...")
230-
osm_names = osm_gdf["name"].to_numpy()
231-
osm_brands = (
232-
osm_gdf["brand"].to_numpy()
233-
if "brand" in osm_gdf.columns
234-
else np.full(len(osm_gdf), None, dtype = object)
247+
chunk_summary: dict | None = None
248+
checkpoint_dir: Path | None = None
249+
250+
if args.no_chunk:
251+
# -- Non-chunked baseline (full-dataset pipeline) ----------
252+
print(
253+
f"\nFinding spatial candidates (max {MAX_RADIUS_M}m) ..."
235254
)
236-
overture_names = overture_gdf["overture_name"].to_numpy()
237-
overture_brands = (
238-
overture_gdf["brand_name"].to_numpy()
239-
if "brand_name" in overture_gdf.columns
240-
else np.full(
241-
len(overture_gdf), None, dtype = object
242-
)
255+
candidates = find_spatial_candidates(
256+
osm_geom = osm_gdf.geometry.values,
257+
overture_geom = overture_gdf.geometry.values,
258+
osm_radii_m = osm_radii,
259+
max_radius_m = MAX_RADIUS_M,
260+
chunk_size = CHUNK_SIZE,
243261
)
262+
print(f" Found {len(candidates):,} candidate pairs")
263+
gc.collect()
244264

245-
scored = compute_match_scores(
246-
candidates = candidates,
247-
osm_names = osm_names,
248-
osm_brands = osm_brands,
249-
overture_names = overture_names,
250-
overture_brands = overture_brands,
265+
if candidates.empty:
266+
print(
267+
"No spatial candidates found. "
268+
"Merging all as unmatched."
269+
)
270+
matches = candidates
271+
else:
272+
print("\nScoring candidates ...")
273+
scored = compute_match_scores(
274+
candidates = candidates,
275+
osm_names = osm_names,
276+
osm_brands = osm_brands,
277+
overture_names = overture_names,
278+
overture_brands = overture_brands,
279+
osm_shared_labels = osm_shared_labels,
280+
overture_shared_labels = overture_shared_labels,
281+
osm_radii_m = osm_radii,
282+
osm_l0_bits = osm_l0_bits,
283+
overture_l0_bits = overture_l0_bits,
284+
distance_weight = DISTANCE_WEIGHT,
285+
name_weight = NAME_WEIGHT,
286+
type_weight = TYPE_WEIGHT,
287+
identifier_weight = IDENTIFIER_WEIGHT,
288+
)
289+
print(
290+
f" Mean composite score: "
291+
f"{scored['composite_score'].mean():.3f}"
292+
)
293+
294+
print(
295+
f"\nSelecting best matches "
296+
f"(min_score={MIN_MATCH_SCORE}) ..."
297+
)
298+
matches = select_best_matches(
299+
scored, min_score = MIN_MATCH_SCORE,
300+
)
301+
print(
302+
f" Selected {len(matches):,} one-to-one matches"
303+
)
304+
305+
del scored, candidates
306+
gc.collect()
307+
else:
308+
# -- Chunked driver (default) ------------------------------
309+
conflation_dir = Path(OUTPUT_PATH).parent
310+
checkpoint_dir = conflation_dir / CHECKPOINT_SUBDIR
311+
print(
312+
f"\nRunning chunked matching "
313+
f"(target ~{CHUNK_TARGET_POIS:,} POIs/chunk, "
314+
f"max {MAX_RADIUS_M}m) ..."
315+
)
316+
matches, chunk_summary = find_and_score_matches_chunked(
317+
osm_geom = osm_gdf.geometry.values,
318+
overture_geom = overture_gdf.geometry.values,
319+
osm_radii_m = osm_radii,
251320
osm_shared_labels = osm_shared_labels,
252321
overture_shared_labels = overture_shared_labels,
253-
osm_radii_m = osm_radii,
254322
osm_l0_bits = osm_l0_bits,
255323
overture_l0_bits = overture_l0_bits,
324+
osm_names = osm_names,
325+
osm_brands = osm_brands,
326+
overture_names = overture_names,
327+
overture_brands = overture_brands,
256328
distance_weight = DISTANCE_WEIGHT,
257329
name_weight = NAME_WEIGHT,
258330
type_weight = TYPE_WEIGHT,
259331
identifier_weight = IDENTIFIER_WEIGHT,
332+
min_match_score = MIN_MATCH_SCORE,
333+
max_radius_m = MAX_RADIUS_M,
334+
chunk_target_pois = CHUNK_TARGET_POIS,
335+
chunk_size = CHUNK_SIZE,
336+
checkpoint_dir = checkpoint_dir,
260337
)
261338
print(
262-
f" Mean composite score: "
263-
f"{scored['composite_score'].mean():.3f}"
339+
f" Selected {len(matches):,} one-to-one matches "
340+
f"across {chunk_summary['n_chunks']} chunks "
341+
f"(Overture dedup drops: "
342+
f"{chunk_summary['n_overture_dedup_drops']:,})"
264343
)
265344

266-
# -- Best matches ------------------------------------------
267-
print(
268-
f"\nSelecting best matches "
269-
f"(min_score={MIN_MATCH_SCORE}) ..."
270-
)
271-
matches = select_best_matches(
272-
scored, min_score = MIN_MATCH_SCORE,
273-
)
274-
print(f" Selected {len(matches):,} one-to-one matches")
345+
del osm_names, osm_brands
346+
del overture_names, overture_brands
347+
gc.collect()
275348

276-
# Free scoring intermediates
277-
del scored, candidates
278-
del osm_names, osm_brands
279-
del overture_names, overture_brands
280-
gc.collect()
349+
# Drop scoring intermediates the merge doesn't need. Keeps only
350+
# the four columns read by ``_build_matched_gdf``.
351+
keep_cols = [
352+
"osm_idx", "overture_idx",
353+
"composite_score", "distance_m",
354+
]
355+
matches = matches[
356+
[c for c in keep_cols if c in matches.columns]
357+
].reset_index(drop = True)
358+
gc.collect()
281359

282360
# -- Merge (disk-backed to limit memory) -----------------------
283361
print("\nMerging into unified dataset ...")
@@ -291,14 +369,27 @@ def _load_gdf(
291369
)
292370
n_matches = len(matches)
293371

294-
part_paths = build_merge_parts(
295-
osm_gdf = osm_gdf,
296-
overture_gdf = overture_gdf,
297-
matches = matches,
298-
osm_shared_labels = osm_shared_labels,
299-
overture_shared_labels = overture_shared_labels,
300-
overture_confidence_weight = OVERTURE_CONF_WEIGHT,
301-
)
372+
if chunk_summary is not None:
373+
part_paths = build_merge_parts_chunked(
374+
osm_gdf = osm_gdf,
375+
overture_gdf = overture_gdf,
376+
matches = matches,
377+
osm_shared_labels = osm_shared_labels,
378+
overture_shared_labels = overture_shared_labels,
379+
osm_primary = chunk_summary["osm_primary"],
380+
overture_primary = chunk_summary["overture_primary"],
381+
n_chunks = chunk_summary["n_chunks"],
382+
overture_confidence_weight = OVERTURE_CONF_WEIGHT,
383+
)
384+
else:
385+
part_paths = build_merge_parts(
386+
osm_gdf = osm_gdf,
387+
overture_gdf = overture_gdf,
388+
matches = matches,
389+
osm_shared_labels = osm_shared_labels,
390+
overture_shared_labels = overture_shared_labels,
391+
overture_confidence_weight = OVERTURE_CONF_WEIGHT,
392+
)
302393

303394
# Free ALL source data before concat+save
304395
del osm_gdf, overture_gdf, matches
@@ -311,6 +402,10 @@ def _load_gdf(
311402
n_total = save_conflated_from_parts(part_paths, OUTPUT_PATH)
312403
config.write_self("conflation")
313404

405+
# Clear chunk checkpoints after a successful save.
406+
if checkpoint_dir is not None and checkpoint_dir.exists():
407+
shutil.rmtree(checkpoint_dir)
408+
314409
# -- Summary ---------------------------------------------------
315410
elapsed = time.time() - t0
316411
print(f"\n{'=' * 60}")
@@ -324,4 +419,15 @@ def _load_gdf(
324419
print(
325420
f" Mean match distance: {match_dist_mean:.1f}m"
326421
)
422+
if chunk_summary is not None:
423+
print(
424+
f" Chunks: {chunk_summary['n_chunks']} "
425+
f"(OSM/chunk: "
426+
f"{chunk_summary['min_chunk_pois']:,}"
427+
f"–{chunk_summary['max_chunk_pois']:,})"
428+
)
429+
print(
430+
f" Overture dedup drops: "
431+
f"{chunk_summary['n_overture_dedup_drops']:,}"
432+
)
327433
print(f" Output: {OUTPUT_PATH}")

0 commit comments

Comments
 (0)