1- from fastapi import APIRouter , HTTPException , status , Query
1+ from fastapi import APIRouter , HTTPException , status , Query , Response , Request
22from typing import Dict , Optional , Any
3+ import hashlib
34from ..models .schemas import (
45 JobListResponse , JobResponse , JobStatus , ProblemType , JobUpdateRequest ,
56 DeployRequest , DeployResponse , PreprocessingInfo , JobSummary
1314
1415
1516@router .get ("/{job_id}" , response_model = JobResponse )
16- async def get_job_status (job_id : str ) -> JobResponse :
17+ async def get_job_status (job_id : str , response : Response , request : Request ) -> JobResponse :
1718 """
18- Get the status and results of a training job
19+ Get the status and results of a training job.
20+ Implements ETag-based caching with must-revalidate for accurate state after deploy/undeploy.
1921 """
2022 try :
21- job = dynamodb_service .get_job (job_id )
23+ # Use consistent read to ensure we generate ETag from the absolute latest state
24+ job = dynamodb_service .get_job (job_id , consistent_read = True )
2225 if not job :
2326 raise HTTPException (
2427 status_code = status .HTTP_404_NOT_FOUND ,
@@ -51,7 +54,7 @@ async def get_job_status(job_id: str) -> JobResponse:
5154 target_mapping = job ['preprocessing_info' ].get ('target_mapping' )
5255 )
5356
54- response = JobResponse (
57+ job_response = JobResponse (
5558 job_id = job ['job_id' ],
5659 dataset_id = job .get ('dataset_id' , '' ),
5760 status = JobStatus (job ['status' ]),
@@ -77,7 +80,7 @@ async def get_job_status(job_id: str) -> JobResponse:
7780 # Extract bucket and key from s3:// path
7881 model_path = job ['model_path' ].replace ('s3://' , '' )
7982 bucket , key = model_path .split ('/' , 1 )
80- response .model_download_url = s3_service .generate_presigned_download_url (
83+ job_response .model_download_url = s3_service .generate_presigned_download_url_cached (
8184 bucket = bucket ,
8285 key = key
8386 )
@@ -86,7 +89,7 @@ async def get_job_status(job_id: str) -> JobResponse:
8689 if job .get ('onnx_model_path' ):
8790 onnx_path = job ['onnx_model_path' ].replace ('s3://' , '' )
8891 bucket , key = onnx_path .split ('/' , 1 )
89- response .onnx_model_download_url = s3_service .generate_presigned_download_url (
92+ job_response .onnx_model_download_url = s3_service .generate_presigned_download_url_cached (
9093 bucket = bucket ,
9194 key = key
9295 )
@@ -96,20 +99,45 @@ async def get_job_status(job_id: str) -> JobResponse:
9699 if eda_path :
97100 report_path = eda_path .replace ('s3://' , '' )
98101 bucket , key = report_path .split ('/' , 1 )
99- url = s3_service .generate_presigned_download_url (bucket = bucket , key = key )
100- response .report_download_url = url # Backward compatibility
101- response .eda_report_download_url = url
102+ url = s3_service .generate_presigned_download_url_cached (bucket = bucket , key = key )
103+ job_response .report_download_url = url # Backward compatibility
104+ job_response .eda_report_download_url = url
102105
103106 # Training Report
104107 if job .get ('training_report_path' ):
105108 training_path = job ['training_report_path' ].replace ('s3://' , '' )
106109 bucket , key = training_path .split ('/' , 1 )
107- response .training_report_download_url = s3_service .generate_presigned_download_url (
110+ job_response .training_report_download_url = s3_service .generate_presigned_download_url_cached (
108111 bucket = bucket ,
109112 key = key
110113 )
111114
112- return response
115+ # ============================================================================
116+ # HTTP Cache Strategy with ETag for accurate state after deploy/undeploy
117+ # ============================================================================
118+
119+ # 1. Generate ETag based on mutable fields (updated_at, deployed, deployed_at)
120+ # This changes whenever the job state changes (including deploy/undeploy)
121+ etag_source = f"{ job .get ('updated_at' , '' )} -{ job .get ('deployed' , False )} -{ job .get ('deployed_at' , '' )} "
122+ etag = f'"{ hashlib .md5 (etag_source .encode ()).hexdigest ()} "'
123+ response .headers ["ETag" ] = etag
124+
125+ # 2. Check If-None-Match header for conditional requests (304 Not Modified)
126+ if_none_match = request .headers .get ("If-None-Match" )
127+ if if_none_match == etag :
128+ # Resource hasn't changed - return 304 (browser will use cached version)
129+ response .status_code = 304
130+ return job_response
131+
132+ # 3. Always force revalidation
133+ # We used to have adaptive TTLs, but deployment status changes need to be reflected immediately.
134+ # max-age=0 + must-revalidate ensures the browser ALWAYS validates the ETag with the server.
135+ # Server (consistent read) -> Calculates ETag -> 304 if same, 200 if changed.
136+ # This is the most robust way to handle state changes like 'Deployed' vs 'Undeployed'.
137+ response .headers ["Cache-Control" ] = "private, max-age=0, must-revalidate"
138+ response .headers ["Vary" ] = "Authorization" # Vary by auth header if auth is added later
139+
140+ return job_response
113141
114142 except HTTPException :
115143 raise
@@ -121,7 +149,7 @@ async def get_job_status(job_id: str) -> JobResponse:
121149
122150
123151@router .delete ("/{job_id}" )
124- async def delete_job (job_id : str , delete_data : bool = True ) -> Dict [str , Any ]:
152+ async def delete_job (job_id : str , response : Response , delete_data : bool = True ) -> Dict [str , Any ]:
125153 """
126154 Delete a training job and optionally all associated data (model, report, dataset)
127155 """
@@ -189,6 +217,12 @@ async def delete_job(job_id: str, delete_data: bool = True) -> Dict[str, Any]:
189217 # Delete job record from DynamoDB
190218 dynamodb_service .delete_job (job_id )
191219
220+ # Ensure client caches are invalidated immediately
221+ response .headers ["Cache-Control" ] = "no-store, no-cache, must-revalidate, max-age=0"
222+
223+ # Ensure client caches are invalidated immediately
224+ response .headers ["Cache-Control" ] = "no-store, no-cache, must-revalidate, max-age=0"
225+
192226 return {
193227 "message" : "Job deleted successfully" ,
194228 "job_id" : job_id ,
@@ -205,7 +239,7 @@ async def delete_job(job_id: str, delete_data: bool = True) -> Dict[str, Any]:
205239
206240
207241@router .patch ("/{job_id}" , response_model = JobResponse )
208- async def update_job_metadata (job_id : str , request : JobUpdateRequest ) -> JobResponse :
242+ async def update_job_metadata (job_id : str , update_request : JobUpdateRequest , response : Response , request : Request ) -> JobResponse :
209243 """
210244 Update job metadata (tags and notes) for experiment tracking.
211245 Tags can be used to categorize jobs (e.g., "experiment-1", "baseline", "production").
@@ -221,14 +255,14 @@ async def update_job_metadata(job_id: str, request: JobUpdateRequest) -> JobResp
221255 )
222256
223257 # Validate tags if provided
224- if request .tags is not None :
225- if len (request .tags ) > 10 :
258+ if update_request .tags is not None :
259+ if len (update_request .tags ) > 10 :
226260 raise HTTPException (
227261 status_code = status .HTTP_400_BAD_REQUEST ,
228262 detail = "Maximum 10 tags allowed per job"
229263 )
230264 # Validate individual tag length
231- for tag in request .tags :
265+ for tag in update_request .tags :
232266 if not tag .strip ():
233267 raise HTTPException (
234268 status_code = status .HTTP_400_BAD_REQUEST ,
@@ -241,7 +275,7 @@ async def update_job_metadata(job_id: str, request: JobUpdateRequest) -> JobResp
241275 )
242276
243277 # Validate notes length if provided (defense-in-depth, Pydantic also validates)
244- if request .notes is not None and len (request .notes ) > 1000 :
278+ if update_request .notes is not None and len (update_request .notes ) > 1000 :
245279 raise HTTPException (
246280 status_code = status .HTTP_400_BAD_REQUEST ,
247281 detail = "Notes must be 1000 characters or less"
@@ -250,12 +284,12 @@ async def update_job_metadata(job_id: str, request: JobUpdateRequest) -> JobResp
250284 # Update job metadata in DynamoDB
251285 dynamodb_service .update_job_metadata (
252286 job_id = job_id ,
253- tags = request .tags ,
254- notes = request .notes
287+ tags = update_request .tags ,
288+ notes = update_request .notes
255289 )
256290
257- # Return updated job
258- return await get_job_status (job_id )
291+ # Return updated job (pass response and request for HTTP headers + ETag)
292+ return await get_job_status (job_id , response , request )
259293
260294 except HTTPException :
261295 raise
@@ -298,6 +332,14 @@ async def deploy_model(job_id: str, request: DeployRequest) -> DeployResponse:
298332 # Update deployed status
299333 dynamodb_service .update_job_deployed (job_id , request .deploy )
300334
335+ # IMPORTANT: Invalidate HTTP cache for this job
336+ # Force clients to fetch fresh data with updated deployed/deployed_at fields
337+ # Note: This does NOT invalidate S3 presigned URL cache (those remain valid)
338+ from fastapi import Response
339+ response = Response ()
340+ response .headers ["Cache-Control" ] = "no-cache, no-store, must-revalidate"
341+ response .headers ["X-Cache-Invalidated" ] = "deploy-status-changed"
342+
301343 action = "deployed" if request .deploy else "undeployed"
302344 return DeployResponse (
303345 job_id = job_id ,
@@ -340,7 +382,8 @@ async def list_jobs(
340382 # Convert to JobSummary (lightweight) instead of full JobResponse
341383 jobs = []
342384 for job in raw_jobs :
343- metrics = job .get ('metrics' , {})
385+ # Safely handle None/null metrics (happens when jobs fail before completion)
386+ metrics = job .get ('metrics' ) or {}
344387
345388 # Extract primary metric (accuracy for classification, r2_score for regression)
346389 problem_type = job .get ('problem_type' )
@@ -350,7 +393,7 @@ async def list_jobs(
350393 elif problem_type == 'regression' and metrics .get ('r2_score' ):
351394 primary_metric = float (metrics ['r2_score' ])
352395
353- # Extract training time and best estimator
396+ # Extract training time and best estimator (safely handle None)
354397 training_time = float (metrics ['training_time' ]) if metrics .get ('training_time' ) else None
355398 best_estimator = str (metrics ['best_estimator' ]) if metrics .get ('best_estimator' ) else None
356399
0 commit comments