Skip to content

Commit b835d7d

Browse files
daggum
1 parent fec833f commit b835d7d

5 files changed

Lines changed: 70 additions & 44 deletions

File tree

README.md

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,31 @@ NFGDA Service directory structure:
3030
- /nfgda_service contains the NFGDA service logic
3131
- responsible for all NFGDA execution, output processing, and file management
3232

33-
# Todo (before MSU handoff)
34-
35-
- Guard against short jobs that run forever for some reason
36-
- Figure out zoom level / blank frame issue on frontend
37-
- Switching to a new station view pauses slide deck playthrough
38-
- Convert geotiff output to cloud-optimized-geotiffs
39-
- Code cleanup / add comments where necessary
33+
And then there's a redis instance living at port 6379 where all the job status and asset information is stored.
34+
35+
# Todo before MSU handoff
36+
37+
- [ ] Figure out zoom level / blank frame issue on frontend
38+
- [ ] Switching to a new station view pauses slide deck playthrough
39+
- [ ] Can we pretty up the landing page? Put a title on it somewhere before the research celebration?
40+
- [ ] Set opacity slider on frontend
41+
- [ ] Enhance resolution of output on frontend
42+
- [ ] Add a "clear" button to the map that clears all job assets from the map
43+
- [ ] Deliver frame time-stamps to the frontend
44+
- [ ] Switch to cloud-optimized geotiffs
45+
- [ ] Make some stuff environment variables instead of random variables everywhere
46+
- [ ] Discuss pixel-width of gust fronts written to output file next team meeting
47+
- [ ] Diff the NFGDA code used in nfgda_service with the original NFGDA code, see if there are any useful features we're missing out on or bugs we introduced
48+
- [ ] Backend code cleanup / add comments where necessary
4049

4150
# "Nice to have" features
4251

52+
- There a should probably be a warning that shows up for small numbers of assets per job (2 frames produced or less). Maybe if not enough assets are produced, the job request could automatically re-run with a larger time window?
4353
- Average time to job completion estimator (small addition: new counter in redis, average out)
44-
- Serve tiles instead of individual GeoTIFFs (big refactor)
54+
- Serve tiles instead of individual GeoTIFFs (big refactor, honestly might not be worth at as Cloud-optimized-geotiffs are kinda the future anyway)
4555
- Hash job IDs to make them unguessable, so resources can't be directly accessed via URL (little development effort, likely med/large refactor effort)
56+
57+
# Todo after MSU handoff (futures devs read this pls)
58+
59+
- Check that automatic asset deletion occurs within the timeframe specified (should be 24 hours)
60+
- Familiarize with the .env file and environment variables, and what they do

frontend/vite.config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export default defineConfig({
77
plugins: [react(), tailwindcss()],
88
server: {
99
proxy: {
10-
"/APIs": "http://backend:8001", //backend to work with docker, localhost w/o
10+
"/apis": "http://backend:8001", //backend to work with docker, localhost w/o
1111
},
1212
},
1313
});

nfgda_service/nfgda_runner.py

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@
66

77
logger = logging.getLogger(__name__)
88

9-
MAX_NO_DATA_POLLS = int(os.getenv("MAX_NO_DATA_POLLS", "10"))
10-
119
class NfgdaRunner:
12-
"""Executes the NFGDA algorithm for a given run request."""
10+
""" executes the NFGDA algorithm for a given run request. """
1311

1412
def __init__(self, station_id: str, start_utc: str, end_utc: str, job_id: str, out_dir: str) -> None:
1513
"""
16-
Initialize the NfgdaRunner with the given parameters.
14+
initialize the NfgdaRunner with the given parameters.
1715
1816
Args:
1917
station_id (str): The station code.
@@ -42,20 +40,25 @@ async def run(self):
4240
Returns:
4341
bool: True if the NFGDA process completed successfully, False otherwise.
4442
"""
45-
43+
44+
# get the number of consecutive no data polls to allow before killing the process
45+
no_data_polls = int(os.getenv("MAX_NO_DATA_POLLS", "10"))
46+
4647
logger.info(f"timebox parameters set to start_utc: {self.start_utc}, end_utc: {self.end_utc}")
4748

49+
# create a temporary config file for the algorithm
4850
config_path = self.create_temp_config(self.out_dir)
4951
logger.info("setting environment variable NFGDA_CONFIG_PATH to %s", config_path)
5052

53+
# if the config file couldn't be created for some reason return False
5154
if config_path is None:
5255
return False, "Failed to create config file"
5356

5457
logger.info("running algorithm for job %s", self.job_id)
5558
state = {"no_data_count": 0, "fatal_error_count": 0}
5659

5760
try:
58-
# Build an env dict with the per-job config path
61+
# build an env dict with the per-job config path
5962
env = os.environ.copy()
6063
env["NFGDA_CONFIG_PATH"] = config_path
6164

@@ -67,41 +70,41 @@ async def run(self):
6770
stderr=asyncio.subprocess.PIPE,
6871
)
6972

70-
# Stream stdout and stderr from algorithm subprocesses line-by-line in real time
73+
# stream stdout and stderr from algorithm subprocesses line-by-line in real time
7174
stream_tasks = [
7275
asyncio.create_task(self.stream_pipe(proc.stdout, "stdout")),
7376
asyncio.create_task(self.monitored_stream(proc.stderr, "stderr", proc, state)),
7477
]
7578

76-
# Wait for the algorithm to complete, with a timeout
79+
# wait for the algorithm to complete, with a timeout
7780
try:
7881
await asyncio.wait_for(proc.wait(), timeout=self.algo_timeout_seconds)
7982
except asyncio.TimeoutError:
80-
logger.error("NFGDA algorithm timed out — killing process")
83+
logger.error("NFGDA algorithm timed out. Killing process")
8184
proc.kill()
8285
await proc.wait()
8386
return False, "NFGDA algorithm timed out"
8487
finally:
8588
# flush remaining buffered output
8689
await asyncio.gather(*stream_tasks)
8790

88-
# Check if the algorithm was killed due to a data gap
89-
if state["no_data_count"] >= MAX_NO_DATA_POLLS:
91+
# check if the algorithm was killed due to a lack of data in the nexrad S3 bucket
92+
if state["no_data_count"] >= no_data_polls:
9093
logger.error(
91-
"NFGDA process killed due to data gap — no scans found after %d consecutive polls",
92-
MAX_NO_DATA_POLLS,
94+
"NFGDA process killed due to data gap. No scans found after %d consecutive polls",
95+
no_data_polls,
9396
)
94-
return False, f"No radar data found after {MAX_NO_DATA_POLLS} polls"
97+
return False, f"No radar data found after {no_data_polls} polls"
9598

96-
# Check if the algorithm exited with a non-zero return code
99+
# check if the algorithm exited with a non-zero return code
97100
if proc.returncode != 0:
98101
logger.error(
99102
"NFGDA algorithm exited with code %d",
100103
proc.returncode,
101104
)
102105
return False, f"an error occurred processing the algorithm. Error code: {proc.returncode}"
103106

104-
# Check if fatal errors were logged during processing
107+
# check if fatal errors were logged during processing
105108
if state["fatal_error_count"] > 0:
106109
logger.error(
107110
"NFGDA algorithm reported %d fatal error(s) during processing",
@@ -118,15 +121,19 @@ async def run(self):
118121

119122
@staticmethod
120123
def iso_to_csv_time(iso_str: str) -> str:
121-
"""Convert an ISO 8601 timestamp (e.g. '2024-07-07T01:22:24Z') to the
124+
""" convert an ISO 8601 timestamp (e.g. '2024-07-07T01:22:24Z') to the
122125
comma-separated format that NFGDA config asks for (e.g. 'year,month,day,hour,minute,second').
126+
127+
Why tf did they format their timestamps like this???
123128
"""
124-
dt = datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
125-
return f"{dt.year},{dt.month},{dt.day},{dt.hour},{dt.minute},{dt.second}"
129+
# convert the iso string to a datetime object
130+
date_time = datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
131+
return f"{date_time.year},{date_time.month},{date_time.day},{date_time.hour},{date_time.minute},{date_time.second}"
126132

127133
def create_temp_config(self, out_dir: str) -> str:
128-
"""Create a temporary NFGDA config file. Returns the path to the file."""
134+
""" create a temporary NFGDA config file. Returns the path to the file."""
129135

136+
# brief check against the config file shipped w/ the original algo for SnG
130137
if not os.path.exists("/app/scripts/NFGDA.ini"):
131138
logger.warning("NFGDA.ini default config not found (proceeding anyway)")
132139

@@ -135,6 +142,7 @@ def create_temp_config(self, out_dir: str) -> str:
135142
logger.info("config times: start=%s -> %s, end=%s -> %s",
136143
self.start_utc, csv_start, self.end_utc, csv_end)
137144

145+
# write the config file out
138146
with tempfile.NamedTemporaryFile("w", delete=False, suffix=".ini", prefix="nfgda_") as f:
139147
f.write(f"""[Settings]
140148
radar_id = {self.station_id}
@@ -165,13 +173,15 @@ async def stream_pipe(stream, label: str):
165173

166174
@staticmethod
167175
async def monitored_stream(stream, label: str, proc, state: dict):
168-
"""Read lines and, for stderr, count consecutive no-data polls.
176+
""" read logs, monitor for no data and fatal errors. Kill wonky processes
169177
170178
Args:
171179
stream: asyncio subprocess stream (stdout or stderr).
172-
MAX_NO_DATA_POLLS: Kill the process after this many consecutive
180+
no_data_polls: Kill the process after this many consecutive
173181
"no new scans found" messages.
174182
"""
183+
184+
# read the log stream lines and check for patterns that indicate error
175185
while True:
176186
line = await stream.readline()
177187
if not line:
@@ -180,11 +190,11 @@ async def monitored_stream(stream, label: str, proc, state: dict):
180190
logger.info("[NFGDA_Host %s] %s", label, text)
181191

182192
if label == "stderr":
183-
if "no new scans found" in text:
193+
if "no new scans found" in text:
184194
state["no_data_count"] += 1
185-
if state["no_data_count"] >= MAX_NO_DATA_POLLS:
195+
if state["no_data_count"] >= no_data_polls:
186196
logger.error(
187-
"no data found after %d consecutive polls — killing process",
197+
"no data found after %d consecutive polls. Killing process",
188198
state["no_data_count"],
189199
)
190200
proc.kill()

nfgda_service/nfgda_service.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66

77

88
class NfgdaService:
9-
"""High-level service that orchestrates a single NFGDA run, including
10-
job lifecycle updates in Redis."""
9+
""" high-level service that orchestrates a single NFGDA run, including job lifecycle updates in Redis. """
1110

1211
def __init__(self, redis_client, job_id: str, job_fields: dict, out_dir: str) -> None:
1312
self.redis_client = redis_client
@@ -17,7 +16,7 @@ def __init__(self, redis_client, job_id: str, job_fields: dict, out_dir: str) ->
1716
self.out_dir = out_dir
1817

1918
async def run(self) -> None:
20-
"""Execute the NFGDA algorithm and update job status in Redis."""
19+
""" execute the NFGDA algorithm and update job status in Redis. """
2120
try:
2221
self.redis_client.hset(self.job_key, mapping={"status": "PROCESSING"})
2322

@@ -34,13 +33,15 @@ async def run(self) -> None:
3433
)
3534
success, message = await runner.run()
3635

36+
# update job status in redis
3737
if success:
38-
logger.info("Algorithm processing for job %s completed successfully", self.job_id)
38+
logger.info("algorithm processing for job %s completed successfully", self.job_id)
3939
else:
40+
# no, this is patrick
4041
self.redis_client.hset(self.job_key, mapping={"status": "FAILED", "error_message": message})
41-
logger.warning("Job %s failed (runner returned falsy)", self.job_id)
42-
logger.warning("Error message: %s", message)
42+
logger.warning("job %s failed (runner returned falsy)", self.job_id)
43+
logger.warning("error message: %s", message)
4344

4445
except Exception as e:
4546
self.redis_client.hset(self.job_key, mapping={"status": "FAILED"})
46-
logger.exception("Job %s failed with exception: %s", self.job_id, e)
47+
logger.exception("job %s failed with exception: %s", self.job_id, e)

nfgda_service/process_output.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,13 @@ def project_data(npz_path: str, radar_lat: float, radar_lon: float, out_dir: str
252252
)
253253

254254
# ---------------------------
255-
# Write final GeoTIFF
255+
# Write final Cloud-Optimized GeoTIFF
256256
# ---------------------------
257-
driver = gdal.GetDriverByName("GTiff")
257+
driver = gdal.GetDriverByName("COG")
258258
driver.CreateCopy(
259259
final_tif,
260260
warped_ds,
261-
options=["COMPRESS=DEFLATE", "TILED=YES"]
261+
options=["COMPRESS=DEFLATE", "OVERVIEWS=IGNORE_EXISTING"]
262262
)
263263

264264
warped_ds = None

0 commit comments

Comments
 (0)