@@ -1006,236 +1006,6 @@ def _get_host_path(self, *paths):
10061006
10071007 return path
10081008
async def _rrun_program_directory(self, program_dir, kind):
    """
    Run the program found in ``program_dir`` inside a container.

    The command to execute is read from the ``command`` key of the
    ``metadata.yaml`` (or ``metadata``) file in ``program_dir``. The
    directory, the output directory, the data directory and a few
    optional directories are bind-mounted under ``/app`` in the container.

    Args:
        - program_dir : can be either ingestion program or program/submission
        - kind : either `program` or `ingestion`

    Returns:
        Whatever ``self._run_container_engine_cmd`` returns, or ``None``
        when nothing is executed (directory missing, or a `program`
        without metadata / without a command).

    Raises:
        SubmissionException: when a non-`program` directory has no
            metadata file, when an `ingestion` metadata has no command,
            or when running the container raises any exception.
    """
    # If the directory doesn't even exist, move on
    if not os.path.exists(program_dir):
        logger.warning(f"{program_dir} not found, no program to execute")

        # Communicate that the program is closing
        self.completed_program_counter += 1
        return

    # Locate the metadata file; "metadata.yaml" wins over the bare "metadata".
    metadata_path = None
    for candidate in ("metadata.yaml", "metadata"):
        if os.path.exists(os.path.join(program_dir, candidate)):
            metadata_path = candidate
            break

    if metadata_path is None:
        if kind != "program":
            raise SubmissionException(
                "Program directory missing 'metadata.yaml/metadata'"
            )
        # Display a warning in logs when there is no metadata file in submission/program dir
        logger.warning(
            "Program directory missing metadata, assuming it's going to be handled by ingestion"
        )
        # Copy submission files into prediction output
        # This is useful for results submissions but wrongly uses storage
        shutil.copytree(program_dir, self.output_dir)
        return

    metadata_file_path = os.path.join(program_dir, metadata_path)
    logger.info(f"Metadata path is {metadata_file_path}")
    with open(metadata_file_path, "r") as metadata_file:
        try:
            # NOTE(review): this file comes from an uploaded program/submission.
            # yaml.FullLoader is kept to preserve behavior, but yaml.safe_load
            # is the recommended loader for untrusted input -- consider switching.
            metadata = yaml.load(metadata_file.read(), Loader=yaml.FullLoader)
            logger.info(f"Metadata contains:\n {metadata}")
            # Only a mapping can carry a "command" entry
            command = metadata.get("command") if isinstance(metadata, dict) else None
        except yaml.YAMLError as e:
            # The exception is interpolated into the message: passing it as an
            # extra positional argument without a placeholder loses it.
            logger.error(f"Error parsing YAML file: {e}")
            print("Error parsing YAML file: ", e)
            command = None

    if not command:
        if kind == "ingestion":
            raise SubmissionException(
                "Program directory missing 'command' in metadata"
            )
        logger.warning(
            f"Warning: {program_dir} has no command in metadata, continuing anyway "
            f"(may be meant to be consumed by an ingestion program)"
        )
        return

    # volumes_host lists the host paths in mount order; volumes_config maps
    # each host path to its bind specification.
    volumes_host = []
    volumes_config = {}

    def mount(host_path, bind, mode=None):
        """Register one host path and the container path it is bound to."""
        volumes_host.append(host_path)
        spec = {"bind": bind}
        if mode is not None:
            spec["mode"] = mode
        volumes_config[host_path] = spec

    mount(self._get_host_path(program_dir), "/app/program", "z")
    mount(self._get_host_path(self.output_dir), "/app/output", "z")
    mount(self.data_dir, "/app/data", "ro")

    if kind == "ingestion":
        # program here is either scoring program or submission, depends on if this ran during Prediction or Scoring
        if self.ingestion_only_during_scoring and self.is_scoring:
            # submission program is expected under 'input/res' in this mode
            ingested_program_location = "input/res"
        else:
            ingested_program_location = "program"
        mount(
            self._get_host_path(self.root_dir, ingested_program_location),
            "/app/ingested_program",
        )

    if self.is_scoring:
        # For scoring programs, we want to have a shared directory just in case we have an ingestion program.
        # This will add the share dir regardless of ingestion or scoring, as long as we're `is_scoring`
        mount(self._get_host_path(self.root_dir, "shared"), "/app/shared")

    # Input from submission (or submission + ingestion combo)
    mount(self._get_host_path(self.input_dir), "/app/input")

    if self.input_data:
        mount(self._get_host_path(self.root_dir, "input_data"), "/app/input_data")

    # Handle Legacy competitions by replacing anything in the run command
    command = replace_legacy_metadata_command(
        command=command,
        kind=kind,
        is_scoring=self.is_scoring,
        ingestion_only_during_scoring=self.ingestion_only_during_scoring,
    )

    cap_drop_list = [
        "AUDIT_WRITE",
        "CHOWN",
        "DAC_OVERRIDE",
        "FOWNER",
        "FSETID",
        "KILL",
        "MKNOD",
        "NET_BIND_SERVICE",
        "NET_RAW",
        "SETFCAP",
        "SETGID",
        "SETPCAP",
        "SETUID",
        "SYS_CHROOT",
    ]

    # The security option depends on the configured container engine
    # executable ("docker" by default).
    if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker":
        security_options = ["no-new-privileges"]
    else:
        security_options = ["label=disable"]

    # auto_remove is explicitly disabled. NOTE(review): the original comment
    # was cut off before giving the reason; presumably the container must
    # outlive its run so logs / exit code can still be read -- confirm.
    host_config_kwargs = {
        "auto_remove": False,
        "cap_drop": cap_drop_list,
        "binds": volumes_config,
        "userns_mode": "host",
        "security_opt": security_options,
    }

    # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given
    device_id = [os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all")]
    if os.environ.get("USE_GPU", "false").lower() == "true":
        logger.info("Running the container with GPU capabilities")
        host_config_kwargs["device_requests"] = [
            {
                "Driver": "cdi",
                "DeviceIDs": device_id,
            },
        ]
    host_config = client.create_host_config(**host_config_kwargs)

    # f-string instead of concatenation so a non-string command cannot raise
    # TypeError in the log call itself.
    logger.info(f"Running container with command {command}")
    container_name = (
        self.ingestion_container_name
        if kind == "ingestion"
        else self.scoring_program_container_name
    )

    # Disable or not the competition container access to Internet (False by default)
    container_network_disabled = os.environ.get(
        "COMPETITION_CONTAINER_NETWORK_DISABLED", ""
    )

    # HTTP and HTTPS proxy for the competition container if needed
    # (the variables are passed even when empty, as "http_proxy=" / "https_proxy=")
    competition_container_proxy_http = "http_proxy=" + os.environ.get(
        "COMPETITION_CONTAINER_HTTP_PROXY", ""
    )
    competition_container_proxy_https = "https_proxy=" + os.environ.get(
        "COMPETITION_CONTAINER_HTTPS_PROXY", ""
    )

    container = client.create_container(
        self.container_image,
        name=container_name,
        host_config=host_config,
        detach=False,
        volumes=volumes_host,
        command=command,
        working_dir="/app/program",
        environment=[
            "PYTHONUNBUFFERED=1",
            competition_container_proxy_http,
            competition_container_proxy_https,
        ],
        network_disabled=container_network_disabled.lower() == "true",
    )
    logger.debug("Created container : " + str(container))
    logger.info("Volume configuration of the container: ")
    pprint(volumes_config)

    # This runs the container engine command and asynchronously passes data back via websocket
    try:
        return await self._run_container_engine_cmd(container, kind=kind)
    except Exception as e:
        logger.exception("Program directory execution failed")
        # Chain the cause so the original traceback is preserved
        raise SubmissionException(str(e)) from e
1238-
12391009 async def _run_program_directory (self , kind , program_dir ):
12401010 """
12411011 Function responsible for running
@@ -1254,7 +1024,7 @@ async def _run_program_directory(self, kind, program_dir):
12541024 self .completed_program_counter += 1
12551025 return
12561026
1257- # Find metadata file.
1027+ # Find metadata file.
12581028 # Raise error if metadata is not found for ingestion or scoring
12591029 if os .path .exists (os .path .join (program_dir , "metadata.yaml" )):
12601030 metadata_path = "metadata.yaml"
@@ -1623,23 +1393,24 @@ def start(self):
16231393
16241394 if self .is_scoring :
16251395 # Check if scoring program failed
1626- try :
1627- program_results , _ , _ = task_results
1628- except :
1629- program_results , _ = task_results
1630- # Gather returns either normal values or exception instances when return_exceptions=True
1631- had_async_exc = isinstance (
1632- program_results , BaseException
1633- ) and not isinstance (program_results , asyncio .CancelledError )
1634- program_rc = getattr (self , "program_exit_code" , None )
1635- failed_rc = (program_rc is None ) or (program_rc != 0 )
1636- if had_async_exc or failed_rc :
1637- self ._update_status (
1638- SubmissionStatus .FAILED ,
1639- extra_information = f"program_rc={ program_rc } , async={ task_results } " ,
1640- )
1641- # Raise so upstream marks failed immediately
1642- raise SubmissionException ("Child task failed or non-zero return code" )
1396+ # try:
1397+ # program_results, _, _ = task_results
1398+ # except:
1399+ # program_results, _ = task_results
1400+ # # Gather returns either normal values or exception instances when return_exceptions=True
1401+ # had_async_exc = isinstance(
1402+ # program_results, BaseException
1403+ # ) and not isinstance(program_results, asyncio.CancelledError)
1404+ # program_rc = getattr(self, "program_exit_code", None)
1405+ # failed_rc = (program_rc is None) or (program_rc != 0)
1406+ # if had_async_exc or failed_rc:
1407+ # self._update_status(
1408+ # SubmissionStatus.FAILED,
1409+ # extra_information=f"program_rc={program_rc}, async={task_results}",
1410+ # )
1411+ # # Raise so upstream marks failed immediately
1412+ # raise SubmissionException("Child task failed or non-zero return code")
1413+
16431414 self ._update_status (SubmissionStatus .FINISHED )
16441415
16451416 else :
0 commit comments