diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 573e72a94..bceb6eaca 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -20,6 +20,7 @@ from rich.pretty import pprint import requests import websockets +from websockets.exceptions import InvalidStatusCode import yaml from billiard.exceptions import SoftTimeLimitExceeded from celery import Celery, shared_task, utils @@ -84,10 +85,7 @@ # ----------------------------------------------- # Show Progress bar on downloading images # ----------------------------------------------- -tasks = {} - - -def show_progress(line, progress): +def show_progress(line, progress, tasks): try: status = line.get("status") or "" layer_id = line.get("id") @@ -585,6 +583,7 @@ def _update_status(self, status, extra_information=None): logger.exception(f"Failed to update submission status to {status}: {e}") def _get_container_image(self, image_name): + tasks = {} logger.info("Running pull for image: {}".format(image_name)) retries, max_retries = (0, 3) while retries < max_retries: @@ -594,7 +593,7 @@ def _get_container_image(self, image_name): for line in resp: if isinstance(line, dict) and line.get("error"): raise DockerImagePullException(line["error"]) - show_progress(line, progress) + show_progress(line, progress, tasks) break # Break if the loop is successful to exit "with Progress() as progress" except (docker.errors.APIError, Exception) as pull_error: @@ -753,6 +752,12 @@ async def _run_container_engine_cmd(self, container, kind): + "for container " + str(container.get("Id")) ) + except InvalidStatusCode as e: + logger.error( + f"There was an error trying to connect to the websocket on the codabench instance: {type(e)}" + ) + logger.exception(e) + websocket = None except Exception as e: logger.error( f"There was an error trying to connect to the websocket on the codabench instance: {e}" @@ -884,6 +889,9 @@ def _get_host_path(self, *paths): # Take our list of paths and smash 'em together path = os.path.join(*paths) + # Create if necessary + os.makedirs(path, exist_ok=True) + # pull front of path, which points to the location inside the container path = path[len(BASE_DIR) :] @@ -891,9 +899,6 @@ def _get_host_path(self, *paths): # can be seen properly path = os.path.join(HOST_DIRECTORY, path) - # Create if necessary - os.makedirs(path, exist_ok=True) - return path async def _run_program_directory(self, program_dir, kind): @@ -1251,6 +1256,13 @@ def start(self): ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") logger.info("Running scoring program, and then ingestion program") + try: + loop = asyncio.get_running_loop() + loop.close() + except RuntimeError as e: + logger.error("Error while closing running event loop: No event loop.") + # logger.exception(e) + loop = asyncio.new_event_loop() # Set the event loop for the gather asyncio.set_event_loop(loop)