|
30 | 30 | from billiard.exceptions import SoftTimeLimitExceeded |
31 | 31 |
|
32 | 32 | from logs_loguru import configure_logging, colorize_run_args |
| 33 | +from docker_image_update_checker import DockerImageStatus, DockerImageUpdateChecker |
33 | 34 |
|
34 | 35 | logger = logging.getLogger(__name__) |
35 | 36 |
|
@@ -113,6 +114,11 @@ def to_bool(val): |
113 | 114 |
|
114 | 115 | WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "").strip() |
115 | 116 |
|
| 117 | + # Docker image config |
| 118 | + DOCKER_IMAGE_NAMESPACE = get("DOCKER_IMAGE_NAMESPACE", "codalab") |
| 119 | + DOCKER_IMAGE_REPOSITORY = get("DOCKER_IMAGE_REPOSITORY", "codabench-compute-worker") |
| 120 | + DOCKER_IMAGE_TAG = get("DOCKER_IMAGE_TAG", "latest") |
| 121 | + |
116 | 122 |
|
117 | 123 | # ----------------------------------------------- |
118 | 124 | # Program Kind |
@@ -302,12 +308,78 @@ def rewrite_bundle_url_if_needed(url): |
302 | 308 | return url |
303 | 309 |
|
304 | 310 |
|
| 311 | +def check_docker_image_update(): |
| 312 | + """ |
| 313 | + Compare local and remote compute worker Docker images and log the |
| 314 | + synchronization status along with relevant image metadata. |
| 315 | + """ |
| 316 | + checker = DockerImageUpdateChecker( |
| 317 | + namespace=Settings.DOCKER_IMAGE_NAMESPACE, |
| 318 | + repository=Settings.DOCKER_IMAGE_REPOSITORY, |
| 319 | + tag=Settings.DOCKER_IMAGE_TAG, |
| 320 | + docker_base_url=Settings.CONTAINER_SOCKET |
| 321 | + ) |
| 322 | + result = checker.compare_local_vs_remote_images() |
| 323 | + status = result["status"] |
| 324 | + |
| 325 | + log_level = logging.INFO |
| 326 | + |
| 327 | + log_lines = [ |
| 328 | + "", |
| 329 | + "=" * 60, |
| 330 | + "DOCKER IMAGE UPDATE CHECK", |
| 331 | + "=" * 60, |
| 332 | + f"Image: {result.get('image_name')}", |
| 333 | + ] |
| 334 | + |
| 335 | + remote = result.get("remote") |
| 336 | + local = result.get("local") |
| 337 | + |
| 338 | + if remote: |
| 339 | + log_lines.append(f"Remote: digest={remote.get('digest')}, date={remote.get('date')}") |
| 340 | + |
| 341 | + if local: |
| 342 | + log_lines.append(f"Local: id={local.get('id')}, date={local.get('date')}") |
| 343 | + |
| 344 | + log_lines.append("-" * 60) |
| 345 | + |
| 346 | + if status == DockerImageStatus.UP_TO_DATE: |
| 347 | + log_lines.append("Status: Local image is synchronized with remote") |
| 348 | + log_level = logging.INFO |
| 349 | + |
| 350 | + elif status == DockerImageStatus.BEHIND: |
| 351 | + log_lines.append("Status: Local image is behind remote version. For better submission processing and to avoid any submission errors, fetch the latest image!") |
| 352 | + log_level = logging.ERROR |
| 353 | + |
| 354 | + elif status == DockerImageStatus.LOCAL_MISSING: |
| 355 | + log_lines.append("Status: Local image is not present. Pull required") |
| 356 | + log_level = logging.ERROR |
| 357 | + |
| 358 | + elif status == DockerImageStatus.REMOTE_UNAVAILABLE: |
| 359 | + log_lines.append("Status: Could not fetch remote image metadata") |
| 360 | + log_level = logging.ERROR |
| 361 | + |
| 362 | + elif status == "error": |
| 363 | + log_lines.append(f"Status: Image check failed: {result.get('error')}") |
| 364 | + log_level = logging.ERROR |
| 365 | + else: |
| 366 | + log_lines.append(f"Unknown image status: {status}") |
| 367 | + log_level = logging.ERROR |
| 368 | + |
| 369 | + log_lines.append("=" * 60) |
| 370 | + |
| 371 | + logger.log(log_level, "\n".join(log_lines)) |
| 372 | + |
| 373 | + |
305 | 374 | # ----------------------------------------------------------------------------- |
306 | 375 | # The main compute worker entrypoint, this is how a job is ran at the highest |
307 | 376 | # level. |
308 | 377 | # ----------------------------------------------------------------------------- |
309 | 378 | @shared_task(name="compute_worker_run") |
310 | 379 | def run_wrapper(run_args): |
| 380 | + # Check for docker image update |
| 381 | + check_docker_image_update() |
| 382 | + |
311 | 383 | # We need to convert the UUID given by celery into a byte like object otherwise things will break |
312 | 384 | run_args.update(secret=str(run_args["secret"])) |
313 | 385 | logger.info(f"Received run arguments: \n {colorize_run_args(json.dumps(run_args))}") |
@@ -338,7 +410,7 @@ def run_wrapper(run_args): |
338 | 410 | msg = "Submission failed. See logs for more details." |
339 | 411 | run._update_status(SubmissionStatus.FAILED, extra_information=msg) |
340 | 412 | raise |
341 | | - except Exception as e: |
| 413 | + except Exception: |
342 | 414 | # Catch any exception to avoid getting stuck in Running status |
343 | 415 | run._update_status(SubmissionStatus.FAILED, extra_information=traceback.format_exc()) |
344 | 416 | raise |
@@ -1303,7 +1375,7 @@ def start(self): |
1303 | 1375 | } |
1304 | 1376 | # Cleanup containers |
1305 | 1377 | containers_to_kill = [ |
1306 | | - self.ingestion_container_name, |
| 1378 | + self.ingestion_container_name, |
1307 | 1379 | self.program_container_name |
1308 | 1380 | ] |
1309 | 1381 | logger.debug( |
|
0 commit comments