diff --git a/backend/apis/retrieve_frames.py b/backend/apis/retrieve_frames.py index 3981ca5..d8c8893 100644 --- a/backend/apis/retrieve_frames.py +++ b/backend/apis/retrieve_frames.py @@ -1,6 +1,3 @@ -""" -Frame Data API: returns a single rendered GeoTIFF frame for a completed job. -""" import os import json @@ -9,7 +6,8 @@ def get_frame(job_id: str, index: int): """Return a single GeoTIFF frame file for the given job and frame index. - Includes an X-Frame-Timestamp header of the radar observation time. + Includes an X-Frame-Timestamp header of the radar observation time + and a boolean "X-Frame-Is-Forecast" header. """ job_dir = "/processed_data/" + job_id if not os.path.exists(job_dir): @@ -27,25 +25,30 @@ def get_frame(job_id: str, index: int): ) ) - # attach the radar observation timestamp if available - timestamp = get_frame_timestamp(job_dir, index) - if timestamp is not None: - response.headers["X-Frame-Timestamp"] = timestamp + # attach per-frame metadata from manifest + entry = get_frame_manifest_entry(job_dir, index) + if entry is not None: + timestamp = entry.get("timestamp") + if timestamp is not None: + response.headers["X-Frame-Timestamp"] = timestamp + is_forecast = entry.get("is_forecast", False) + response.headers["X-Frame-Is-Forecast"] = "true" if is_forecast else "false" return response -def get_frame_timestamp(job_dir: str, index: int) -> str | None: - """Read the per-frame observation timestamp from the job's manifest file. - Returns an ISO 8601 UTC string (e.g. "2024-07-07T01:22:24Z") or - None if the manifest is missing or the frame index has no entry. +def get_frame_manifest_entry(job_dir: str, index: int) -> dict | None: + """Read the per-frame metadata entry from the job's manifest.json. + Returns a dict with { "timestamp": "...", "is_forecast": bool }, + or None if the manifest is missing or the frame index has no entry. """ - manifest_path = os.path.join(job_dir, "timestamps.json") - if not os.path.exists(manifest_path): - return None - try: - with open(manifest_path) as f: - manifest = json.load(f) - return manifest.get(str(index)) - except Exception: - return None + manifest_path = os.path.join(job_dir, "manifest.json") + if os.path.exists(manifest_path): + try: + with open(manifest_path) as f: + manifest = json.load(f) + return manifest.get(str(index)) + except Exception: + return None + + return None diff --git a/nfgda_service/process_output.py b/nfgda_service/process_output.py index 20f0b97..6b5f4a7 100644 --- a/nfgda_service/process_output.py +++ b/nfgda_service/process_output.py @@ -7,7 +7,8 @@ import matplotlib.colors as mcolors from osgeo import gdal, osr from scipy.ndimage import binary_dilation -from skimage.morphology import disk +from skimage.morphology import skeletonize, disk + import os import redis import json @@ -62,8 +63,12 @@ _nexrad_boundaries = np.arange(-20, 75.1, 1) # 96 bins, matching colorlevel.py _nexrad_norm = mcolors.BoundaryNorm(boundaries=_nexrad_boundaries, ncolors=_nexrad_cmap.N) -# Gust-front overlay color (red, fully opaque) -_GF_RGBA = np.array([255, 0, 0, 255], dtype=np.uint8) +# Gust-front overlay colors +_GF_RGBA_DETECTED = np.array([255, 0, 0, 255], dtype=np.uint8) # red – observed detection +_GF_RGBA_FC_30 = np.array([255, 140, 0, 255], dtype=np.uint8) # orange – forecast ≥ 30% +_GF_RGBA_FC_50 = np.array([220, 10, 120, 255], dtype=np.uint8) # reddish-fuchsia – forecast ≥ 50% (slightly dimmed) +_GF_RGBA_FC_75 = np.array([120, 0, 255, 255], dtype=np.uint8) # vibrant violet – forecast ≥ 75% +_GF_RGBA_FORECAST = _GF_RGBA_FC_75 # alias used in detection rendering (is_forecast=True path) def generate_geotiff_output(job_id: str, redis_client: redis.Redis): @@ -77,10 +82,10 @@ def generate_geotiff_output(job_id: str, redis_client: redis.Redis): if not os.path.exists(f"/nfgda_output/{job_id}/nfgda_detection"): return "Could not find output directory" - # get list of files - files = os.listdir(f"/nfgda_output/{job_id}/nfgda_detection") - files = [f for f in files if f.endswith(".npz")] - if len(files) == 0: + # get list of detection files + det_files = os.listdir(f"/nfgda_output/{job_id}/nfgda_detection") + det_files = [f for f in det_files if f.endswith(".npz")] + if len(det_files) == 0: return "No files found in output directory" # get radar coordinates from redis @@ -94,22 +99,138 @@ def generate_geotiff_output(job_id: str, redis_client: redis.Redis): radar_lon, radar_lat = radar_coords - # process each file and collect per-frame observation timestamps - timestamps = {} - for i, file in enumerate(files): - logger.info(f'processing file {i+1} of {len(files)} into GeoTIFF format') + # --------------------------------------------------------------- + # Pass 1: process observed detection frames + # --------------------------------------------------------------- + # manifest entries: { frame_index: { "timestamp": "...", "is_forecast": bool } } + manifest: dict[int, dict] = {} + + # also track each detection file's timestamp so we can find the latest one + det_ts_to_npz: dict[str, str] = {} + for i, file in enumerate(det_files): + logger.info(f'processing detection file {i+1} of {len(det_files)} into GeoTIFF format') npz_path = os.path.join(f"/nfgda_output/{job_id}/nfgda_detection", file) ts = extract_timestamp(npz_path) if ts is not None: - timestamps[i] = ts - project_data(npz_path, radar_lat, radar_lon, out_dir, i) + manifest[i] = {"timestamp": ts, "is_forecast": False} + det_ts_to_npz[ts] = npz_path + project_data(npz_path, radar_lat, radar_lon, out_dir, i, is_forecast=False) + + # reorder detection frames so indexes are in chronological order + manifest = _reorder_frames_chronologically(manifest, out_dir) + num_detection_frames = len(manifest) + + # find the npz path for the latest detection frame (used as frozen background in forecasts) + last_det_npz: str | None = None + if manifest: + last_det_ts_str = manifest[max(manifest.keys())]["timestamp"] + last_det_npz = det_ts_to_npz.get(last_det_ts_str) + if last_det_npz: + logger.info(f"Last detection frame npz: {os.path.basename(last_det_npz)} ({last_det_ts_str})") + + # --------------------------------------------------------------- + # Pass 2: process forecast frames from forecast-summary/ + # Only include frames whose valid time falls within [last_detection_ts, + # last_detection_ts + 1 hour). Frames with earlier timestamps are + # redundant (covered by actual detections), and frames beyond one hour + # are too speculative to display. + # --------------------------------------------------------------- + forecast_dir = f"/nfgda_output/{job_id}/forecast-summary" + forecast_files = [] + if os.path.exists(forecast_dir): + # derive the last detection timestamp from the reordered manifest + last_det_ts: np.datetime64 | None = None + if manifest: + last_det_entry = manifest[max(manifest.keys())] + try: + last_det_ts = np.datetime64(last_det_entry["timestamp"].rstrip("Z"), "s") + except Exception: + pass + + if last_det_ts is not None: + window_end = last_det_ts + np.timedelta64(3600, "s") # +1 hour + candidate_files = sorted([ + f for f in os.listdir(forecast_dir) if f.endswith(".npz") + ]) + for fname in candidate_files: + npz_path = os.path.join(forecast_dir, fname) + ts_str = extract_timestamp(npz_path) + if ts_str is None: + continue + try: + fc_ts = np.datetime64(ts_str.rstrip("Z"), "s") + except Exception: + continue + if last_det_ts <= fc_ts <= window_end: + forecast_files.append(fname) + logger.info( + f"Forecast window: [{last_det_ts}Z, {window_end}Z] — " + f"{len(forecast_files)} of {len(candidate_files)} file(s) selected" + ) + else: + logger.warning("Could not determine last detection timestamp; skipping forecast pass") + + if forecast_files: + logger.info(f"Processing {len(forecast_files)} forecast frame(s)") + for j, file in enumerate(forecast_files): + frame_idx = num_detection_frames + j + logger.info(f'processing forecast file {j+1} of {len(forecast_files)} into GeoTIFF format') + npz_path = os.path.join(forecast_dir, file) + ts = extract_timestamp(npz_path) + if ts is not None: + manifest[frame_idx] = {"timestamp": ts, "is_forecast": True} + project_forecast(npz_path, radar_lat, radar_lon, out_dir, frame_idx, + background_det_npz=last_det_npz) + else: + logger.info("No forecast-summary frames found in window; skipping forecast pass") - # write manifest so the API can serve observation timestamps per frame - manifest_path = os.path.join(out_dir, "timestamps.json") + # write manifest so the API can serve per-frame metadata + manifest_path = os.path.join(out_dir, "manifest.json") with open(manifest_path, "w") as f: - json.dump(timestamps, f) - logger.info(f"Wrote timestamp manifest with {len(timestamps)} entries to {manifest_path}") - + json.dump(manifest, f) + logger.info(f"Wrote manifest with {len(manifest)} entries to {manifest_path}") + + +def _reorder_frames_chronologically(manifest: dict[int, dict], out_dir: str) -> dict[int, dict]: + """Sort frame GeoTIFFs so that frame index 0 is the earliest observation. + + manifest maps current frame index → { timestamp, is_forecast }. + Renames frame_.tif files so that indexes are in chronological order, + and returns a new manifest keyed by the corrected indexes. + """ + if not manifest: + return manifest + + # build a list of (current_index, entry) sorted by timestamp + sorted_pairs = sorted(manifest.items(), key=lambda kv: kv[1]["timestamp"]) + + # if already in order, skip the renaming process + already_ordered = all(sorted_pairs[j][0] == j for j in range(len(sorted_pairs))) + if already_ordered: + logger.info("Frames are already in chronological order; skipping rename.") + return manifest + + logger.info("Reordering frames to chronological order: %s", + [(old_idx, entry["timestamp"]) for old_idx, entry in sorted_pairs]) + + # rename every frame_.tif to frame_.tif.tmp first to avoid collisions + for old_idx, _ in sorted_pairs: + src = os.path.join(out_dir, f"frame_{old_idx}.tif") + tmp = os.path.join(out_dir, f"frame_{old_idx}.tif.tmp") + if os.path.exists(src): + os.rename(src, tmp) + + # rename frame_.tif.tmp to frame_.tif + new_manifest: dict[int, dict] = {} + for new_idx, (old_idx, entry) in enumerate(sorted_pairs): + tmp = os.path.join(out_dir, f"frame_{old_idx}.tif.tmp") + dst = os.path.join(out_dir, f"frame_{new_idx}.tif") + if os.path.exists(tmp): + os.rename(tmp, dst) + new_manifest[new_idx] = entry + + return new_manifest + def get_radar_coords(station_id: str, redis_client: redis.Redis) -> tuple[float, float]: station_json = redis_client.hget("stations", station_id) @@ -123,12 +244,13 @@ def get_radar_coords(station_id: str, redis_client: redis.Redis) -> tuple[float, return None -def _reflectivity_to_rgba(refl: np.ndarray, nfout: np.ndarray) -> np.ndarray: +def _reflectivity_to_rgba(refl: np.ndarray, nfout: np.ndarray, is_forecast: bool = False) -> np.ndarray: """Convert a 2-D reflectivity array + boolean gust-front mask to RGBA uint8. * Valid reflectivity pixels are colored with the NEXRAD Zhh colormap. * NaN pixels become fully transparent (alpha = 0). - * Gust-front detections (`nfout == True`) are drawn as dark pixels. + * Detected gust-fronts (is_forecast=False) are drawn red. + * Forecast gust-fronts (is_forecast=True) are drawn bright purple. """ ny, nx = refl.shape rgba = np.zeros((ny, nx, 4), dtype=np.uint8) # default: fully transparent @@ -142,26 +264,28 @@ def _reflectivity_to_rgba(refl: np.ndarray, nfout: np.ndarray) -> np.ndarray: rgba[valid] = mapped - # Overlay gust-front detections (dilated slightly for visibility) + # Overlay gust-front skeleton pixels (disk(1) dilation for visibility) if nfout is not None and np.any(nfout): - gf_mask = binary_dilation(nfout, structure=disk(2)) - gf_draw = gf_mask & valid - rgba[gf_draw] = _GF_RGBA + gf_color = _GF_RGBA_FORECAST if is_forecast else _GF_RGBA_DETECTED + gf_dilated = binary_dilation(nfout.astype(bool), structure=disk(1)) + gf_draw = gf_dilated & valid + rgba[gf_draw] = gf_color return rgba def extract_timestamp(npz_path: str) -> str | None: - """Return the radar observation time from a detection .npz as an ISO 8601 UTC string, - or None if the key is absent or unparseable.""" + """Return the observation/valid time from a .npz as an ISO 8601 UTC string, + or None if the key is absent or unparseable. + + Works for both detection files and forecast-summary files — both store + timestamps as numpy.datetime64 scalars. + """ try: data = np.load(npz_path, allow_pickle=True) if "timestamp" not in data: return None ts = data["timestamp"] - # numpy.datetime64 (0-d array) -> Python datetime -> ISO string - # .item() is required to get the Python scalar; .astype(object) keeps it - # as a 0-d ndarray which has no .strftime() ts_dt = ts.astype("datetime64[s]").item() return ts_dt.strftime("%Y-%m-%dT%H:%M:%SZ") except Exception as e: @@ -169,25 +293,62 @@ def extract_timestamp(npz_path: str) -> str | None: return None -def project_data(npz_path: str, radar_lat: float, radar_lon: float, out_dir: str, index: int) -> None: - - # --------------------------- - # Parameters - # --------------------------- - ae_tif = os.path.join(out_dir, f"radar_reflectivity_ae_{index}.tif") +def _write_geotiff(rgba: np.ndarray, radar_lat: float, radar_lon: float, + out_dir: str, index: int) -> None: + """Shared GeoTIFF writing logic: AE projection → reproject to EPSG:3857 → COG.""" + ae_tif = os.path.join(out_dir, f"radar_reflectivity_ae_{index}.tif") final_tif = os.path.join(out_dir, f"frame_{index}.tif") - pixel_size_m = 500.0 # 500 m spacing + pixel_size_m = 500.0 + ny, nx = rgba.shape[:2] + + # spatial references – azimuthal equidistant centered on the radar + ae_srs = osr.SpatialReference() + ae_srs.SetAE(radar_lat, radar_lon, 0.0, 0.0) + ae_srs.SetWellKnownGeogCS("WGS84") + + # geotransform (centered on radar) + origin_x = -(nx / 2) * pixel_size_m + origin_y = (ny / 2) * pixel_size_m + geotransform = (origin_x, pixel_size_m, 0.0, origin_y, 0.0, -pixel_size_m) + + # write RGBA Azimuthal Equidistant GeoTIFF + driver = gdal.GetDriverByName("GTiff") + ae_ds = driver.Create(ae_tif, nx, ny, 4, gdal.GDT_Byte, + options=["COMPRESS=LZW", "TILED=YES"]) + ae_ds.SetGeoTransform(geotransform) + ae_ds.SetProjection(ae_srs.ExportToWkt()) + for band_idx in range(4): + ae_ds.GetRasterBand(band_idx + 1).WriteArray(rgba[:, :, band_idx]) + ae_ds.GetRasterBand(4).SetColorInterpretation(gdal.GCI_AlphaBand) + ae_ds = None + + # reproject to EPSG:3857 for Leaflet + warped_ds = gdal.Warp("", ae_tif, dstSRS="EPSG:3857", + resampleAlg=gdal.GRA_NearestNeighbour, + format="MEM", dstAlpha=False) + + # write final Cloud-Optimized GeoTIFF + driver = gdal.GetDriverByName("COG") + driver.CreateCopy(final_tif, warped_ds, + options=["COMPRESS=DEFLATE", "OVERVIEWS=IGNORE_EXISTING"]) + + warped_ds = None + logger.info(f"Wrote {final_tif}") + os.remove(ae_tif) + + +def project_data(npz_path: str, radar_lat: float, radar_lon: float, + out_dir: str, index: int, is_forecast: bool = False) -> None: + """Project a detection .npz (inputNF + nfout) into a frame GeoTIFF.""" channel_index = 1 # channel 1 = reflectivity (0-based) - # --------------------------- - # Load data - # --------------------------- + # load data data = np.load(npz_path) array = data['inputNF'] nfout = data['nfout'] if 'nfout' in data else None - # Flip vertically + # flip vertically array = np.flipud(array) if nfout is not None: nfout = np.flipud(nfout) @@ -195,9 +356,7 @@ def project_data(npz_path: str, radar_lat: float, radar_lon: float, out_dir: str refl = array[:, :, channel_index].astype(np.float64) ny, nx = refl.shape - # --------------------------- - # Log data range for debugging - # --------------------------- + # log data range for debugging valid_mask = ~np.isnan(refl) nan_count = np.count_nonzero(~valid_mask) logger.info( @@ -210,89 +369,64 @@ def project_data(npz_path: str, radar_lat: float, radar_lon: float, out_dir: str if nfout is not None: logger.info(f"Gust-front pixels: {np.count_nonzero(nfout)}") - # --------------------------- - # Render to RGBA - # --------------------------- - rgba = _reflectivity_to_rgba(refl, nfout) - - # --------------------------- - # Spatial references - # Azimuthal Equidistant centered on the radar - # --------------------------- - ae_srs = osr.SpatialReference() - ae_srs.SetAE( - radar_lat, - radar_lon, - 0.0, - 0.0 - ) - ae_srs.SetWellKnownGeogCS("WGS84") - - # --------------------------- - # GeoTransform (centered on radar) - # --------------------------- - origin_x = -(nx / 2) * pixel_size_m - origin_y = (ny / 2) * pixel_size_m + # render to RGBA then write GeoTIFF + rgba = _reflectivity_to_rgba(refl, nfout, is_forecast=is_forecast) + _write_geotiff(rgba, radar_lat, radar_lon, out_dir, index) - geotransform = ( - origin_x, - pixel_size_m, - 0.0, - origin_y, - 0.0, - -pixel_size_m - ) - # --------------------------- - # Write RGBA Azimuthal Equidistant GeoTIFF - # --------------------------- - driver = gdal.GetDriverByName("GTiff") - ae_ds = driver.Create( - ae_tif, - nx, - ny, - 4, # R, G, B, A - gdal.GDT_Byte, - options=["COMPRESS=LZW", "TILED=YES"] - ) - - ae_ds.SetGeoTransform(geotransform) - ae_ds.SetProjection(ae_srs.ExportToWkt()) - - for band_idx in range(4): - band = ae_ds.GetRasterBand(band_idx + 1) - band.WriteArray(rgba[:, :, band_idx]) - - # Set alpha band interpretation - ae_ds.GetRasterBand(4).SetColorInterpretation(gdal.GCI_AlphaBand) - - ae_ds = None - - # --------------------------- - # Reproject to EPSG:3857 for Leaflet - # --------------------------- - warped_ds = gdal.Warp( - "", - ae_tif, - dstSRS="EPSG:3857", - resampleAlg=gdal.GRA_NearestNeighbour, - format="MEM", - dstAlpha=False, # keep our existing alpha band - ) - - # --------------------------- - # Write final Cloud-Optimized GeoTIFF - # --------------------------- - driver = gdal.GetDriverByName("COG") - driver.CreateCopy( - final_tif, - warped_ds, - options=["COMPRESS=DEFLATE", "OVERVIEWS=IGNORE_EXISTING"] - ) - - warped_ds = None - logger.info(f"Wrote {final_tif}") - - # prune ae tile - os.remove(ae_tif) +def project_forecast(npz_path: str, radar_lat: float, radar_lon: float, + out_dir: str, index: int, + background_det_npz: str | None = None) -> None: + """Project a forecast-summary .npz (nfproxy probability grid) into a frame GeoTIFF. + The background is the frozen radar reflectivity from the most recent + detection frame (background_det_npz). Pixels in the nfproxy map at or + above the 30% threshold are treated as gust-front likelihood and drawn in + bright purple on top of the reflectivity background, with no dilation. + """ + data = np.load(npz_path, allow_pickle=True) + nfproxy = data['nfproxy'].astype(np.float64) + nfproxy = np.flipud(nfproxy) + ny, nx = nfproxy.shape + + # start from the frozen reflectivity background of the last detection frame + if background_det_npz is not None: + try: + bg_data = np.load(background_det_npz) + bg_array = np.flipud(bg_data['inputNF']) + bg_refl = bg_array[:, :, 1].astype(np.float64) # channel 1 = reflectivity + rgba = _reflectivity_to_rgba(bg_refl, nfout=None) + logger.info(f"Forecast frame {index}: using frozen reflectivity background") + except Exception as e: + logger.warning(f"Could not load background reflectivity from {background_det_npz}: {e}") + rgba = np.zeros((ny, nx, 4), dtype=np.uint8) + else: + logger.warning(f"Forecast frame {index}: no background detection npz provided, using transparent background") + rgba = np.zeros((ny, nx, 4), dtype=np.uint8) + + # Draw three contour levels, lowest first so higher-confidence rings overwrite. + # Each level is skeletonized independently to produce a thin medial-axis line: + # 30% (orange) — gust front may pass here + # 50% (reddish-fuchsia) — moderate confidence + # 75% (purple) — high confidence core + forecast_levels = [ + (30.0, _GF_RGBA_FC_30), + (50.0, _GF_RGBA_FC_50), + (75.0, _GF_RGBA_FC_75), + ] + any_drawn = False + for threshold, color in forecast_levels: + mask = nfproxy >= threshold + if np.any(mask): + skel = skeletonize(mask) + if np.any(skel): + # disk(1) dilation for visibility — applied after skeletonize so we + # don't dilate the original blob, only the thin skeleton spine + dilated = binary_dilation(skel, structure=disk(1)) + rgba[dilated] = color + logger.info(f"Forecast contour level >={threshold:.0f}%: {np.count_nonzero(skel)} skeleton pixels") + any_drawn = True + if not any_drawn: + logger.info("Forecast frame has no gust-front probability >= 30%") + + _write_geotiff(rgba, radar_lat, radar_lon, out_dir, index) diff --git a/request_test_script.py b/request_test_script.py index d41db56..108edf1 100644 --- a/request_test_script.py +++ b/request_test_script.py @@ -113,7 +113,8 @@ def main(): resp = requests.get(url) if resp.status_code == 200: ts = resp.headers.get("X-Frame-Timestamp", "no timestamp") - print(f" ✓ frame {index}: {resp.status_code} ({len(resp.content)} bytes) timestamp={ts}") + is_forecast = resp.headers.get("X-Frame-Is-Forecast", "unknown") + print(f" ✓ frame {index}: {resp.status_code} ({len(resp.content)} bytes) timestamp={ts} forecast={is_forecast}") else: print(f" ✗ frame {index}: {resp.status_code} — {resp.text[:200]}")