Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions nfgda_service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import asyncio
import redis
import logging
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
from nfgda_service import NfgdaService
from process_output import generate_geotiff_output


logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
Expand Down Expand Up @@ -66,25 +67,57 @@ async def process_job(job_id: str) -> None:
def process_geotiff_output(job_id: str) -> None:
"""Process the output of the NFGDA algorithm for a given job
into a stack of GeoTIFFs for final display on the frontend."""

if redis_client.hget(f"job:{job_id}", "status") == "FAILED":
return

logger.info("generating GeoTIFF series for job %s", job_id)
result = generate_geotiff_output(job_id, redis_client)

if result is not None:
logger.error("failed to generate GeoTIFF series for job %s. Error message: %s", job_id, result)
redis_client.hset(f"job:{job_id}", mapping={"status": "FAILED", "error_message": result})
else:
logger.info("successfully generated GeoTIFF series for job %s", job_id)
redis_client.hset(f"job:{job_id}", mapping={"status": "COMPLETED", "num_frames": len(os.listdir(f"/processed_data/{job_id}")) - 1}) # subtract 1 for the timestamps.json file



def _wipe_job_output(job_id: str) -> None:
"""Helper function to wipe the output of a job for rerunning if frames are <= 2"""
for base in ("/nfgda_output", "/processed_data"):
path = os.path.join(base, job_id)
if os.path.isdir(path):
shutil.rmtree(path)

def _expand_start_utc(start_str: str, end_str: str) -> str:
"""Helper to compute the expanded time window for a job"""
fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.strptime(start_str, fmt).replace(tzinfo=timezone.utc)
end = datetime.strptime(end_str, fmt).replace(tzinfo=timezone.utc)
duration = end - start
max_duration = timedelta(minutes=int(os.getenv("MAX_JOB_DURATION", "180")))
new_start = max(start - duration, end - max_duration)
return new_start.strftime(fmt)

async def run_and_release_job(job_id: str) -> None:
"""Run a job and release the semaphore when finished."""
try:
await process_job(job_id)
process_geotiff_output(job_id)
# Silently rerun if there is less than 2 frames in a completed job
# I do not know how this will integrate with the new feature of forecasting
num_frames = int(redis_client.hget(f"job:{job_id}", "num_frames") or 0)
if num_frames <= 2:
logger.warning("retrying the job with a larger time window")
job_fields = redis_client.hgetall(f"job:{job_id}")
new_start = _expand_start_utc(job_fields["startUtc"], job_fields["endUtc"])
redis_client.hset(f"job:{job_id}", mapping={"startUtc": new_start, "status": "PENDING"})
_wipe_job_output(job_id)
await process_job(job_id)
process_geotiff_output(job_id)


finally:
# they took my jerb!
job_semaphore.release()
Expand Down Expand Up @@ -136,4 +169,4 @@ def main():
asyncio.run(listen_for_jobs())

if __name__ == "__main__":
main()
main()
Loading