diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 147ed2d2f..28a0f4fc2 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -117,11 +117,9 @@ def to_bool(val): # ----------------------------------------------- # Program Kind # ----------------------------------------------- -# NOTE: This is not used, to be used in next PR class ProgramKind: INGESTION_PROGRAM = "ingestion_program" SCORING_PROGRAM = "scoring_program" - SUBMISSION = "submission" # ----------------------------------------------- @@ -355,7 +353,7 @@ def replace_legacy_metadata_command( command, kind, is_scoring, ingestion_only_during_scoring=False ): vars_to_replace = [ - ("$input", "/app/input_data" if kind == "ingestion" else "/app/input"), + ("$input", "/app/input_data" if kind == ProgramKind.INGESTION_PROGRAM else "/app/input"), ("$output", "/app/output"), ( "$program", @@ -476,24 +474,24 @@ def __init__(self, run_args): self.stdout, self.stderr, self.ingestion_stdout, self.ingestion_stderr = ( self._get_stdout_stderr_file_names(run_args) ) - self.ingestion_container_name = f"ingestion_{self.run_related_name}" - self.program_container_name = f"scoring_{self.run_related_name}" - self.program_data = run_args.get("program_data") - self.ingestion_program_data = run_args.get("ingestion_program") + # Setting up container names for ingestion, scoring and submission + self.ingestion_program_container_name = f"ingestion_{self.run_related_name}" + self.scoring_program_container_name = f"scoring_{self.run_related_name}" + + # Setting up ingestion, scoring and submission data + self.ingestion_program_data = run_args.get("ingestion_program_data") + self.scoring_program_data = run_args.get("scoring_program_data") + self.submission_data = run_args.get("submission_data") + self.input_data = run_args.get("input_data") self.reference_data = run_args.get("reference_data") - self.ingestion_only_during_scoring = run_args.get( - "ingestion_only_during_scoring" - ) + self.ingestion_only_during_scoring = run_args.get("ingestion_only_during_scoring") self.detailed_results_url = run_args.get("detailed_results_url") - # During prediction program will be the submission program, during scoring it will be the - # scoring program - self.program_exit_code = None self.ingestion_program_exit_code = None - - self.program_elapsed_time = None - self.ingestion_elapsed_time = None + self.ingestion_program_elapsed_time = None + self.scoring_program_exit_code = None + self.scoring_program_elapsed_time = None # Socket connection to stream output of submission submission_api_url_parsed = urlparse(self.submissions_api_url) @@ -522,7 +520,10 @@ async def watch_detailed_results(self): start = time.time() expiration_seconds = 60 - while self.watch and self.completed_program_counter < 2: + # When running scoring program, we have at least one program to run i.e. scoring_program + # Sometimes when ingestion_only_during_scoring is True, we have two programs to run + expected_completed_program_counters = 1 + int(bool(self.ingestion_only_during_scoring)) + while self.watch and self.completed_program_counter < expected_completed_program_counters: if file_path: new_time = os.path.getmtime(file_path) if new_time != last_modified_time: @@ -774,6 +775,98 @@ def _get_bundle(self, url, destination, cache=True): # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file + def _create_container( + self, + container_name: str, + command: str, + volumes_host: list, + volumes_config: dict + ): + """ + Helper to create and configure a container for ingestion, scoring, or submission. + Returns the container object. + """ + + logger.info( + "Creating Container with: " + f"Container Name: {container_name} \n" + f"Command: {command} \n" + "Volumes config:" + ) + pprint(volumes_config) + + cap_drop_list = [ + "AUDIT_WRITE", + "CHOWN", + "DAC_OVERRIDE", + "FOWNER", + "FSETID", + "KILL", + "MKNOD", + "NET_BIND_SERVICE", + "NET_RAW", + "SETFCAP", + "SETGID", + "SETPCAP", + "SETUID", + "SYS_CHROOT", + ] + + # Configure whether or not we use the GPU. Also setting auto_remove to False because + if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.DOCKER: + security_options = ["no-new-privileges"] + else: + security_options = ["label=disable"] + + # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given + device_id = [Settings.GPU_DEVICE] + if Settings.USE_GPU: + logger.info("Running the container with GPU capabilities") + host_config = client.create_host_config( + auto_remove=False, + cap_drop=cap_drop_list, + binds=volumes_config, + userns_mode="host", + security_opt=security_options, + device_requests=[ + { + "Driver": "cdi", + "DeviceIDs": device_id, + }, + ], + ) + else: + host_config = client.create_host_config( + auto_remove=False, + cap_drop=cap_drop_list, + binds=volumes_config, + userns_mode="host", + security_opt=security_options, + ) + + # Creating container + # COMPETITION_CONTAINER_NETWORK_DISABLED: Disable or not the competition container access to Internet (False by default) + # HTTP and HTTPS proxy for the competition container if needed + container = client.create_container( + self.container_image, + name=container_name, + host_config=host_config, + detach=False, + volumes=volumes_host, + command=command, + working_dir="/app/program", + environment=[ + "PYTHONUNBUFFERED=1", + "http_proxy=" + Settings.COMPETITION_CONTAINER_HTTP_PROXY, + "https_proxy=" + Settings.COMPETITION_CONTAINER_HTTPS_PROXY, + ], + network_disabled=Settings.COMPETITION_CONTAINER_NETWORK_DISABLED, + ) + + logger.debug("Created container: " + str(container)) + + return container + async def _run_container_engine_cmd(self, container, kind): """This runs a command and asynchronously writes the data to both a storage file and a socket @@ -902,13 +995,13 @@ async def _run_container_engine_cmd(self, container, kind): "data": logs_Unified[0], "stream": logs_Unified[0], "continue": True, - "location": self.stdout if kind == "program" else self.ingestion_stdout, + "location": self.stdout if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stdout, }, "stderr": { "data": logs_Unified[1], "stream": logs_Unified[1], "continue": True, - "location": self.stderr if kind == "program" else self.ingestion_stderr, + "location": self.stderr if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stderr, }, } @@ -937,129 +1030,85 @@ def _get_host_path(self, *paths): return path - async def _run_program_directory(self, program_dir, kind): + async def _run_program_directory(self, kind, program_dir): """ - Function responsible for running program directory + Function responsible for running + - ingestion program + - scoring program Args: - - program_dir : can be either ingestion program or program/submission - - kind : either `program` or `ingestion` + kind: `ingestion_program` or `scoring_program` + program_dir: path to the program to run """ - # If the directory doesn't even exist, move on + # Return if directory does not exist if not os.path.exists(program_dir): - logger.warning(f"{program_dir} not found, no program to execute") + logger.warning(f"{program_dir} for {kind} not found, no program to execute") # Communicate that the program is closing self.completed_program_counter += 1 return + # Find metadata file. + # Raise error if metadata is not found if os.path.exists(os.path.join(program_dir, "metadata.yaml")): metadata_path = "metadata.yaml" elif os.path.exists(os.path.join(program_dir, "metadata")): metadata_path = "metadata" else: - # Display a warning in logs when there is no metadata file in submission/program dir - if kind == "program": - logger.warning( - "Program directory missing metadata, assuming it's going to be handled by ingestion" - ) - # Copy submission files into prediction output - # This is useful for results submissions but wrongly uses storage - shutil.copytree(program_dir, self.output_dir) - return - else: - raise SubmissionException( - "Program directory missing 'metadata.yaml/metadata'" - ) + error_message = f"{program_dir} for {kind} missing 'metadata.yaml/metadata' file." + logger.error(error_message) + raise SubmissionException(error_message) + # Metadata file is found logger.info(f"Metadata path is {os.path.join(program_dir, metadata_path)}") + + # Reading metadata file to find command. + # Raise error if command is not found for ingestion or scoring with open(os.path.join(program_dir, metadata_path), "r") as metadata_file: - try: # try to find a command in the metadata, in other cases set metadata to None + command = None + try: metadata = yaml.safe_load(metadata_file.read()) logger.info(f"Metadata contains:\n {metadata}") - if isinstance(metadata, dict): # command found + if isinstance(metadata, dict): command = metadata.get("command") - else: - command = None + except yaml.YAMLError as e: + logger.error(f"Error parsing YAML file: {e}") - print(f"Error parsing YAML file: {e}") - command = None - if not command and kind == "ingestion": - raise SubmissionException( - "Program directory missing 'command' in metadata" - ) - elif not command: - logger.warning( - f"Warning: {program_dir} has no command in metadata, continuing anyway " - f"(may be meant to be consumed by an ingestion program)" - ) - return + + if not command: + raise SubmissionException(f"Missing 'command' for {kind} in metadata or metadata format is not correct!") + + # Prepare volumes_host and volumes_config volumes_host = [ self._get_host_path(program_dir), self._get_host_path(self.output_dir), self.data_dir, + self._get_host_path(self.root_dir, "submission") ] volumes_config = { - volumes_host[0]: { - "bind": "/app/program", - "mode": "z", - }, - volumes_host[1]: { - "bind": "/app/output", - "mode": "z", - }, - volumes_host[2]: { - "bind": "/app/data", - "mode": "ro", - }, + volumes_host[0]: {"bind": "/app/program", "mode": "z"}, + volumes_host[1]: {"bind": "/app/output", "mode": "z"}, + volumes_host[2]: {"bind": "/app/data", "mode": "ro"}, + volumes_host[3]: {"bind": "/app/ingested_program", "mode": "ro"}, } - if kind == "ingestion": - # program here is either scoring program or submission, depends on if this ran during Prediction or Scoring - if self.ingestion_only_during_scoring and self.is_scoring: - # submission program moved to 'input/res' with shutil.move() above - ingested_program_location = "input/res" - else: - ingested_program_location = "program" - volumes_host.extend( - [self._get_host_path(self.root_dir, ingested_program_location)] - ) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/ingested_program", - } - } - volumes_config.update(tempvolumeConfig) - - if self.is_scoring: - # For scoring programs, we want to have a shared directory just in case we have an ingestion program. - # This will add the share dir regardless of ingestion or scoring, as long as we're `is_scoring` + # During Scoring: Add `shared` and `/app/input` to volumes_host and update volumes_config + if kind == ProgramKind.SCORING_PROGRAM: + # For scoring program, we want to have a shared directory just in case we have an ingestion program. volumes_host.extend([self._get_host_path(self.root_dir, "shared")]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/shared", - } - } - volumes_config.update(tempvolumeConfig) + volumes_config.update({volumes_host[-1]: {"bind": "/app/shared"}}) - # Input from submission (or submission + ingestion combo) + # Input dir for scoring program volumes_host.extend([self._get_host_path(self.input_dir)]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/input", - } - } - volumes_config.update(tempvolumeConfig) - - if self.input_data: - volumes_host.extend([self._get_host_path(self.root_dir, "input_data")]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/input_data", - } - } - volumes_config.update(tempvolumeConfig) + volumes_config.update({volumes_host[-1]: {"bind": "/app/input"}}) + + # During Ingestion: Add `/app/input_data` to volumes_host and update volumes_config + if kind == ProgramKind.INGESTION_PROGRAM: + # NOTE: self.input_data is valid when running an ingestion program and competition task has input data + if self.input_data: + volumes_host.extend([self._get_host_path(self.root_dir, "input_data")]) + volumes_config.update({volumes_host[-1]: {"bind": "/app/input_data"}}) # Handle Legacy competitions by replacing anything in the run command command = replace_legacy_metadata_command( @@ -1069,84 +1118,15 @@ async def _run_program_directory(self, program_dir, kind): ingestion_only_during_scoring=self.ingestion_only_during_scoring, ) - cap_drop_list = [ - "AUDIT_WRITE", - "CHOWN", - "DAC_OVERRIDE", - "FOWNER", - "FSETID", - "KILL", - "MKNOD", - "NET_BIND_SERVICE", - "NET_RAW", - "SETFCAP", - "SETGID", - "SETPCAP", - "SETUID", - "SYS_CHROOT", - ] - - # Configure whether or not we use the GPU. Also setting auto_remove to False because - if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.DOCKER: - security_options = ["no-new-privileges"] - else: - security_options = ["label=disable"] - - # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given - device_id = [Settings.GPU_DEVICE] - if Settings.USE_GPU: - logger.info("Running the container with GPU capabilities") - host_config = client.create_host_config( - auto_remove=False, - cap_drop=cap_drop_list, - binds=volumes_config, - userns_mode="host", - security_opt=security_options, - device_requests=[ - { - "Driver": "cdi", - "DeviceIDs": device_id, - }, - ], - ) - else: - host_config = client.create_host_config( - auto_remove=False, - cap_drop=cap_drop_list, - binds=volumes_config, - userns_mode="host", - security_opt=security_options, - ) - - logger.info("Running container with command " + command) - container_name = ( - self.ingestion_container_name - if kind == "ingestion" - else self.program_container_name - ) - - # Creating container - # COMPETITION_CONTAINER_NETWORK_DISABLED: Disable or not the competition container access to Internet (False by default) - # HTTP and HTTPS proxy for the competition container if needed - container = client.create_container( - self.container_image, - name=container_name, - host_config=host_config, - detach=False, - volumes=volumes_host, + # Create container + container_name = self.ingestion_program_container_name if kind == ProgramKind.INGESTION_PROGRAM else self.scoring_program_container_name + container = self._create_container( + container_name=container_name, command=command, - working_dir="/app/program", - environment=[ - "PYTHONUNBUFFERED=1", - "http_proxy=" + Settings.COMPETITION_CONTAINER_HTTP_PROXY, - "https_proxy=" + Settings.COMPETITION_CONTAINER_HTTPS_PROXY, - ], - network_disabled=Settings.COMPETITION_CONTAINER_NETWORK_DISABLED, + volumes_host=volumes_host, + volumes_config=volumes_config ) - logger.debug("Created container: " + str(container)) - logger.info("Volume configuration of the container: ") - pprint(volumes_config) # This runs the container engine command and asynchronously passes data back via websocket try: return await self._run_container_engine_cmd(container, kind=kind) @@ -1221,6 +1201,30 @@ def _prep_cache_dir(self, max_size=Settings.MAX_CACHE_DIR_SIZE_GB): else: logger.info("Cache directory does not need to be pruned!") + def _copy_submission_to_input_res(self): + """ + Temporary backward-compatibility function. + + Earlier, scoring programs expected submission files in ingestion output: + /app/input/res/ + + Newer changes expose submission under: + /app/ingested_program/ + + To avoid breaking older scoring programs, we copy the submission + directory into input/res + """ + + submission_directory = os.path.join(self.root_dir, "submission") + ingestion_res_directory = os.path.join(self.root_dir, "input/res") + + # copy from submission_directory ingestion_res_directory + try: + shutil.copytree(submission_directory, ingestion_res_directory, dirs_exist_ok=True) + logger.info("Copied submission files to input/res successfully") + except Exception as e: + logger.error(f"Failed to copy submission to input/res: {e}") + def prepare(self): hostname = utils.nodenames.gethostname() if self.is_scoring: @@ -1236,8 +1240,9 @@ def prepare(self): # sub folder. bundles = [ # (url to file, relative folder destination) - (self.program_data, "program"), (self.ingestion_program_data, "ingestion_program"), + (self.scoring_program_data, "scoring_program"), + (self.submission_data, "submission"), (self.input_data, "input_data"), (self.reference_data, "input/ref"), ] @@ -1251,8 +1256,8 @@ def prepare(self): cache_this_bundle = path in ("input_data", "input/ref") zip_file = self._get_bundle(url, path, cache=cache_this_bundle) - # TODO: When we have `is_scoring_only` this needs to change... - if url == self.program_data and not self.is_scoring: + # Computing checksum of the submission file during ingestion run + if url == self.submission_data and not self.is_scoring: # We want to get a checksum of submissions so we can check if they are # a solution, or maybe match them against other submissions later logger.info(f"Beginning MD5 checksum of submission: {zip_file}") @@ -1260,6 +1265,11 @@ def prepare(self): logger.info(f"Checksum result: {checksum}") self._update_submission({"md5": checksum}) + # During scoring: copy submission files into "input/res" + if self.is_scoring: + # NOTE: Temporary compatibility hook (To be removed in the future) + self._copy_submission_to_input_res() + # For logging purposes let's dump file names for filename in glob.iglob(self.root_dir + "**/*.*", recursive=True): logger.info(filename) @@ -1270,19 +1280,42 @@ def prepare(self): self._update_status(SubmissionStatus.RUNNING) def start(self): - program_dir = os.path.join(self.root_dir, "program") + + logger.info(f"Preparing to run: {ProgramKind.SCORING_PROGRAM if self.is_scoring else ProgramKind.INGESTION_PROGRAM}") + + # Define directories for ingestion, scoring and submission ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") + scoring_program_dir = os.path.join(self.root_dir, "scoring_program") - logger.info("Running scoring program, and then ingestion program") loop = asyncio.new_event_loop() # Set the event loop for the gather asyncio.set_event_loop(loop) - gathered_tasks = asyncio.gather( - self._run_program_directory(program_dir, kind="program"), - self._run_program_directory(ingestion_program_dir, kind="ingestion"), - self.watch_detailed_results(), - return_exceptions=True, - ) + + tasks = [] + if self.is_scoring: + # During scoring, run scoring program directory + tasks.append( + self._run_program_directory(kind=ProgramKind.SCORING_PROGRAM, program_dir=scoring_program_dir) + ) + + # If ingestion_only_during_scoring is true, we also run ingestion program directory in parallel to scoring program + if self.ingestion_only_during_scoring: + tasks.append( + self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir) + ) + + # During scoring we watch for detailed results + tasks.append( + self.watch_detailed_results() + ) + else: + # During ingestion we run ingestion program directory and submission directory + tasks.extend([ + self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir), + ]) + + gathered_tasks = asyncio.gather(*tasks, return_exceptions=True) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) @@ -1301,25 +1334,24 @@ def start(self): "error_message": error_message, "is_scoring": self.is_scoring, } + # Cleanup containers containers_to_kill = [ self.ingestion_container_name, self.program_container_name ] - logger.debug( - "Trying to kill and remove container " + str(containers_to_kill) - ) + logger.debug("Trying to kill and remove container " + str(containers_to_kill)) + for container in containers_to_kill: try: client.remove_container(str(container), force=True) except docker.errors.APIError as e: logger.error(e) except Exception as e: - logger.error( - f"There was a problem killing {containers_to_kill}: {e}" - ) + logger.error(f"There was a problem killing {containers_to_kill}: {e}") if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) + # Send data to be written to ingestion/scoring std_err self._update_submission(execution_time_limit_exceeded_data) # Send error through web socket to the frontend @@ -1357,10 +1389,10 @@ def start(self): ) if return_code is None: logger.warning("No return code from Process. Killing it") - if kind == "ingestion": - containers_to_kill = self.ingestion_container_name + if kind == ProgramKind.INGESTION_PROGRAM: + containers_to_kill = self.ingestion_program_container_name else: - containers_to_kill = self.program_container_name + containers_to_kill = self.scoring_program_container_name try: client.kill(containers_to_kill) client.remove_container(containers_to_kill, force=True) @@ -1372,12 +1404,12 @@ def start(self): ) if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: logger.exception(e) - if kind == "program": - self.program_exit_code = return_code - self.program_elapsed_time = elapsed_time - elif kind == "ingestion": + if kind == ProgramKind.SCORING_PROGRAM: + self.scoring_program_exit_code = return_code + self.scoring_program_elapsed_time = elapsed_time + elif kind == ProgramKind.INGESTION_PROGRAM: self.ingestion_program_exit_code = return_code - self.ingestion_elapsed_time = elapsed_time + self.ingestion_program_elapsed_time = elapsed_time logger.info(f"[exited with {logs['returncode']}]") for key, value in logs.items(): if key not in ["stdout", "stderr"]: @@ -1392,12 +1424,17 @@ def start(self): if self.is_scoring: # Check if scoring program failed - program_results, _, _ = task_results + # We have try except here because when running scoring program we can have 2 or 3 gathered tasks + # 3 gathered tasks in case when `ingestion_only_during_scoring` is True + try: + program_results, _, _ = task_results + except Exception: + program_results, _ = task_results # Gather returns either normal values or exception instances when return_exceptions=True had_async_exc = isinstance( program_results, BaseException ) and not isinstance(program_results, asyncio.CancelledError) - program_rc = getattr(self, "program_exit_code", None) + program_rc = getattr(self, "scoring_program_exit_code", None) failed_rc = (program_rc is None) or (program_rc != 0) if had_async_exc or failed_rc: self._update_status( @@ -1452,11 +1489,11 @@ def push_output(self): """Output is pushed at the end of both prediction and scoring steps.""" # V1.5 compatibility, write program statuses to metadata file prog_status = { - "exitCode": self.program_exit_code, + "exitCode": self.scoring_program_exit_code, # for v1.5 compat, send `ingestion_elapsed_time` if no `program_elapsed_time` - "elapsedTime": self.program_elapsed_time or self.ingestion_elapsed_time, + "elapsedTime": self.scoring_program_elapsed_time or self.ingestion_program_elapsed_time, "ingestionExitCode": self.ingestion_program_exit_code, - "ingestionElapsedTime": self.ingestion_elapsed_time, + "ingestionElapsedTime": self.ingestion_program_elapsed_time, } logger.info(f"Metadata output: {prog_status}") diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 623ac2522..8c6b2aa62 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -165,7 +165,7 @@ def _send_to_compute_worker(submission, is_scoring): if task.ingestion_program: if (task.ingestion_only_during_scoring and is_scoring) or (not task.ingestion_only_during_scoring and not is_scoring): - run_args['ingestion_program'] = make_url_sassy(task.ingestion_program.data_file.name) + run_args['ingestion_program_data'] = make_url_sassy(task.ingestion_program.data_file.name) if task.input_data and (not is_scoring or task.ingestion_only_during_scoring): run_args['input_data'] = make_url_sassy(task.input_data.data_file.name) @@ -175,9 +175,10 @@ def _send_to_compute_worker(submission, is_scoring): run_args['ingestion_only_during_scoring'] = task.ingestion_only_during_scoring - run_args['program_data'] = make_url_sassy( - path=submission.data.data_file.name if not is_scoring else task.scoring_program.data_file.name - ) + if is_scoring: + run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name) + + run_args['submission_data'] = make_url_sassy(path=submission.data.data_file.name) if not is_scoring: detail_names = SubmissionDetails.DETAILED_OUTPUT_NAMES_PREDICTION