Skip to content

Commit 1022e4f

Browse files
committed
compute worker updates
1 parent 66457a9 commit 1022e4f

2 files changed

Lines changed: 63 additions & 42 deletions

File tree

compute_worker/compute_worker.py

Lines changed: 60 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@
4242
class ProgramKind:
4343
INGESTION_PROGRAM = "ingestion_program"
4444
SCORING_PROGRAM = "scoring_program"
45+
SUBMISSION = "submission"
4546

4647

4748
# -----------------------------------------------
@@ -970,13 +971,13 @@ async def _run_container_engine_cmd(self, container, kind):
970971
"data": logs_Unified[0],
971972
"stream": logs_Unified[0],
972973
"continue": True,
973-
"location": self.stdout if kind == "program" else self.ingestion_stdout,
974+
"location": self.stdout if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stdout,
974975
},
975976
"stderr": {
976977
"data": logs_Unified[1],
977978
"stream": logs_Unified[1],
978979
"continue": True,
979-
"location": self.stderr if kind == "program" else self.ingestion_stderr,
980+
"location": self.stderr if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stderr,
980981
},
981982
}
982983

@@ -1005,7 +1006,7 @@ def _get_host_path(self, *paths):
10051006

10061007
return path
10071008

1008-
async def _run_program_directory(self, program_dir, kind):
1009+
async def _rrun_program_directory(self, program_dir, kind):
10091010
"""
10101011
Function responsible for running program directory
10111012
@@ -1235,63 +1236,87 @@ async def _run_program_directory(self, program_dir, kind):
12351236
logger.exception("Program directory execution failed")
12361237
raise SubmissionException(str(e))
12371238

1238-
async def _run_ingestion_program_directory(self, program_dir):
1239+
async def _run_program_directory(self, kind, program_dir):
12391240
"""
1240-
Run ingestion program directory.
1241+
Function responsible for running
1242+
- ingestion program
1243+
- scoring program
1244+
- submission
12411245
12421246
Args:
1243-
program_dir: path to ingestion program
1247+
kind: `ingestion_program` or `scoring_program` or `submission`
1248+
program_dir: path to the program to run
12441249
"""
12451250
# Return if directory does not exist
12461251
if not os.path.exists(program_dir):
1247-
logger.warning(f"{program_dir} not found, no program to execute")
1252+
logger.warning(f"{program_dir} for {kind} not found, no program to execute")
12481253
# Communicate that the program is closing
12491254
self.completed_program_counter += 1
12501255
return
12511256

1252-
# Find metadata file. Raise error if metadata is not founc
1257+
# Find metadata file.
1258+
# Raise error if metadata is not found for ingestion or scoring
12531259
if os.path.exists(os.path.join(program_dir, "metadata.yaml")):
12541260
metadata_path = "metadata.yaml"
12551261
elif os.path.exists(os.path.join(program_dir, "metadata")):
12561262
metadata_path = "metadata"
12571263
else:
1258-
raise SubmissionException(
1259-
"Ingestion program directory missing 'metadata.yaml/metadata'"
1260-
)
1264+
if kind in [ProgramKind.INGESTION_PROGRAM, ProgramKind.SCORING_PROGRAM]:
1265+
error_message = f"{program_dir} for {kind} missing 'metadata.yaml/metadata' file."
1266+
logger.error(error_message)
1267+
raise SubmissionException(error_message)
1268+
else:
1269+
logger.warning(f"{program_dir} for {kind} missing 'metadata.yaml/metadata' file. Assuming it is going to be handled by ingestion or scoring")
12611270

1271+
# Metadata file is found
12621272
logger.info(f"Metadata path is {os.path.join(program_dir, metadata_path)}")
1273+
1274+
# Reading metadata file to find command.
1275+
# Raise error if command is not found for ingestion or scoring
12631276
with open(os.path.join(program_dir, metadata_path), "r") as metadata_file:
1264-
# try to find a command in the metadata, in other cases set metadata to None
1277+
command = None
12651278
try:
12661279
metadata = yaml.load(metadata_file.read(), Loader=yaml.FullLoader)
12671280
logger.info(f"Metadata contains:\n {metadata}")
12681281
if isinstance(metadata, dict):
12691282
command = metadata.get("command")
1270-
else:
1271-
command = None
12721283
except yaml.YAMLError as e:
12731284
logger.error("Error parsing YAML file: ", e)
1274-
print("Error parsing YAML file: ", e)
1275-
command = None
12761285

1277-
if not command:
1286+
if not command and kind in [ProgramKind.INGESTION_PROGRAM, ProgramKind.SCORING_PROGRAM]:
12781287
raise SubmissionException(
12791288
"Missing 'command' in metadata or metadata format is not correct!"
12801289
)
1290+
else:
1291+
logger.warning(
1292+
"Missing 'command' in metadata or metadata format is not correct! Continuing anyway assuming it is going to be handled by ingestion or scoring"
1293+
)
12811294

1295+
# Setting volume host and volumes config.
1296+
# To be used by `_create_container` function
12821297
volumes_host = [
12831298
self._get_host_path(program_dir),
12841299
self._get_host_path(self.output_dir),
12851300
self.data_dir,
1286-
self._get_host_path(self.root_dir, "program")
1301+
self._get_host_path(self.root_dir, "submission")
12871302
]
12881303
volumes_config = {
12891304
volumes_host[0]: {"bind": "/app/program", "mode": "z"},
12901305
volumes_host[1]: {"bind": "/app/output", "mode": "z"},
12911306
volumes_host[2]: {"bind": "/app/data", "mode": "ro"},
1292-
volumes_host[3]: {"bind": "/app/ingested_program"}
1307+
volumes_host[3]: {"bind": "/app/ingested_program", "mode": "ro"},
12931308
}
12941309

1310+
if kind == ProgramKind.SCORING_PROGRAM:
1311+
# For scoring program, we want to have a shared directory just in case we have an ingestion program.
1312+
volumes_host.extend([self._get_host_path(self.root_dir, "shared")])
1313+
volumes_config.update({volumes_host[-1]: {"bind": "/app/shared"}})
1314+
1315+
# Input dir for scoring program
1316+
volumes_host.extend([self._get_host_path(self.root_dir, "input")])
1317+
volumes_config.update({volumes_host[-1]: {"bind": "/app/input"}})
1318+
1319+
# NOTE: self.input_data is valid when running an ingestion program and competition task has input data
12951320
if self.input_data:
12961321
volumes_host.append(self._get_host_path(self.root_dir, "input_data"))
12971322
volumes_config.update({volumes_host[-1]: {"bind": "/app/input_data"}})
@@ -1317,7 +1342,7 @@ async def _run_ingestion_program_directory(self, program_dir):
13171342
pprint(volumes_config)
13181343
# This runs the container engine command and asynchronously passes data back via websocket
13191344
try:
1320-
return await self._run_container_engine_cmd(container, kind=ProgramKind.INGESTION_PROGRAM)
1345+
return await self._run_container_engine_cmd(container, kind=kind)
13211346
except Exception as e:
13221347
logger.exception("Program directory execution failed")
13231348
raise SubmissionException(str(e))
@@ -1456,14 +1481,13 @@ def prepare(self):
14561481
self._get_container_image(self.container_image)
14571482

14581483
def start(self):
1459-
# program_dir = os.path.join(self.root_dir, "program")
1460-
# ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program")
1461-
submission_dir = os.path.join(self.root_dir, "submission")
1462-
scoring_program_dir = os.path.join(self.root_dir, "scoring_program")
1463-
ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program")
14641484

1465-
# logger.info("Running scoring program, and then ingestion program")
1466-
logger.info(f"Starting run: {ProgramKind.SCORING_PROGRAM if self.is_scoring else ProgramKind.INGESTION_PROGRAM}")
1485+
logger.info(f"Preparing to run: {ProgramKind.SCORING_PROGRAM if self.is_scoring else ProgramKind.INGESTION_PROGRAM}")
1486+
1487+
# Define directories for ingestion, scoring and submission
1488+
ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program")
1489+
scoring_program_dir = os.path.join(self.root_dir, "scoring_program")
1490+
submission_dir = os.path.join(self.root_dir, "submission")
14671491

14681492
loop = asyncio.new_event_loop()
14691493
# Set the event loop for the gather
@@ -1473,31 +1497,26 @@ def start(self):
14731497
if self.is_scoring:
14741498
# During scoring, run scoring program directory
14751499
tasks.append(
1476-
self._run_scoring_program_directory(scoring_program_dir)
1500+
self._run_program_directory(kind=ProgramKind.SCORING_PROGRAM, program_dir=scoring_program_dir)
14771501
)
14781502

1479-
# If ingestion_only_during_scoring is true, we also run ingestion program directory
1503+
# If ingestion_only_during_scoring is true, we also run ingestion program directory in parallel to scoring program
14801504
if self.ingestion_only_during_scoring:
14811505
tasks.append(
1482-
self._run_ingestion_program_directory(ingestion_program_dir)
1506+
self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir)
14831507
)
14841508

1485-
tasks.append(
1486-
self.watch_detailed_results()
1487-
)
1509+
# During scoring we watch for detailed results
1510+
# tasks.append(
1511+
# self.watch_detailed_results()
1512+
# )
14881513
else:
14891514
# During ingestion we run ingestion program directory and submission directory
14901515
tasks.extend([
1491-
self._run_ingestion_program_directory(ingestion_program_dir),
1492-
self._run_submission_directory(submission_dir)
1516+
self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir),
1517+
self._run_program_directory(kind=ProgramKind.SUBMISSION, program_dir=submission_dir)
14931518
])
14941519

1495-
# gathered_tasks = asyncio.gather(
1496-
# self._run_program_directory(program_dir, kind="program"),
1497-
# self._run_program_directory(ingestion_program_dir, kind="ingestion"),
1498-
# self.watch_detailed_results(),
1499-
# return_exceptions=True,
1500-
# )
15011520
gathered_tasks = asyncio.gather(*tasks, return_exceptions=True)
15021521

15031522
task_results = [] # will store results/exceptions from gather

src/apps/competitions/tasks.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -175,7 +175,9 @@ def _send_to_compute_worker(submission, is_scoring):
175175

176176
run_args['ingestion_only_during_scoring'] = task.ingestion_only_during_scoring
177177

178-
run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name)
178+
if is_scoring:
179+
run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name)
180+
179181
run_args['submission_data'] = make_url_sassy(path=submission.data.data_file.name)
180182

181183
if not is_scoring:

0 commit comments

Comments
 (0)