Skip to content

Commit b94529c

Browse files
authored
Merge pull request #2223 from codalab/fix-worker-logs
Compute worker - Improve status update and logs
2 parents 61d352a + 646aa33 commit b94529c

File tree

3 files changed

+172
-93
lines changed

3 files changed

+172
-93
lines changed

compute_worker/compute_worker.py

Lines changed: 141 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import hashlib
44
import json
55
import os
6+
import traceback
67
import shutil
78
import signal
89
import socket
@@ -18,7 +19,6 @@
1819
from rich.progress import Progress
1920
from rich.pretty import pprint
2021
import requests
21-
2222
import websockets
2323
import yaml
2424
from billiard.exceptions import SoftTimeLimitExceeded
@@ -89,22 +89,31 @@
8989

9090
def show_progress(line, progress):
9191
try:
92-
if "Status: Image is up to date" in line["status"]:
93-
logger.info(line["status"])
92+
status = line.get("status") or ""
93+
layer_id = line.get("id")
94+
detail = line.get("progressDetail") or {}
95+
current = detail.get("current")
96+
total = detail.get("total")
97+
98+
if "Status: Image is up to date" in status:
99+
logger.info(status)
100+
101+
if not layer_id:
102+
return
94103

95104
completed = False
96-
if line["status"] == "Download complete":
105+
if status == "Download complete":
97106
description = (
98-
f"[blue][Download complete, waiting for extraction {line['id']}]"
107+
f"[blue][Download complete, waiting for extraction {layer_id}]"
99108
)
100109
completed = True
101-
elif line["status"] == "Downloading":
102-
description = f"[bold][Downloading {line['id']}]"
103-
elif line["status"] == "Pull complete":
104-
description = f"[green][Extraction complete {line['id']}]"
110+
elif status == "Downloading":
111+
description = f"[bold][Downloading {layer_id}]"
112+
elif status == "Pull complete":
113+
description = f"[green][Extraction complete {layer_id}]"
105114
completed = True
106-
elif line["status"] == "Extracting":
107-
description = f"[blue][Extracting {line['id']}]"
115+
elif status == "Extracting":
116+
description = f"[blue][Extracting {layer_id}]"
108117

109118
else:
110119
# skip other statuses, but show extraction progress
@@ -121,7 +130,7 @@ def show_progress(line, progress):
121130
)
122131
else:
123132
tasks[task_id] = progress.add_task(
124-
description, total=line["progressDetail"]["total"]
133+
description, total=total
125134
)
126135
else:
127136
if completed:
@@ -134,12 +143,12 @@ def show_progress(line, progress):
134143
else:
135144
progress.update(
136145
tasks[task_id],
137-
completed=line["progressDetail"]["current"],
138-
total=line["progressDetail"]["total"],
146+
completed=current,
147+
total=total,
139148
)
140149
except Exception as e:
141-
logger.error("There was an error showing the progress bar")
142-
logger.error(e)
150+
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
151+
logger.exception("There was an error showing the progress bar")
143152

144153

145154
# -----------------------------------------------
@@ -244,20 +253,44 @@ def run_wrapper(run_args):
244253
run_args.update(secret=str(run_args["secret"]))
245254
logger.info(f"Received run arguments: \n {colorize_run_args(json.dumps(run_args))}")
246255
run = Run(run_args)
247-
248256
try:
249257
run.prepare()
250258
run.start()
251259
if run.is_scoring:
252260
run.push_scores()
253261
run.push_output()
254262
except DockerImagePullException as e:
255-
run._update_status(STATUS_FAILED, str(e))
256-
except SubmissionException as e:
257-
run._update_status(STATUS_FAILED, str(e))
263+
msg = str(e).strip()
264+
if msg:
265+
msg = f"Docker image pull failed: {msg}"
266+
else:
267+
msg = "Docker image pull failed."
268+
run._update_status(STATUS_FAILED, extra_information=msg)
269+
raise
258270
except SoftTimeLimitExceeded:
259-
run._update_status(STATUS_FAILED, "Soft time limit exceeded!")
271+
run._update_status(
272+
STATUS_FAILED,
273+
extra_information="Execution time limit exceeded.",
274+
)
275+
raise
276+
except SubmissionException as e:
277+
msg = str(e).strip()
278+
if msg:
279+
msg = f"Submission failed: {msg}. See logs for more details."
280+
else:
281+
msg = "Submission failed. See logs for more details."
282+
run._update_status(STATUS_FAILED, extra_information=msg)
283+
raise
284+
except Exception as e:
285+
# Catch any exception to avoid getting stuck in Running status
286+
run._update_status(STATUS_FAILED, extra_information=traceback.format_exc())
287+
raise
260288
finally:
289+
try:
290+
# Try to push logs before cleanup
291+
run.push_logs()
292+
except Exception:
293+
logger.exception("push_logs failed")
261294
run.clean_up()
262295

263296

@@ -446,6 +479,22 @@ async def watch_detailed_results(self):
446479
if file_path:
447480
await self.send_detailed_results(file_path)
448481

482+
def push_logs(self):
483+
"""Upload any collected logs, even in case of crash.
484+
"""
485+
try:
486+
for kind, logs in (self.logs or {}).items():
487+
for stream_key in ("stdout", "stderr"):
488+
entry = logs.get(stream_key) if isinstance(logs, dict) else None
489+
if not entry:
490+
continue
491+
location = entry.get("location")
492+
data = entry.get("data") or b""
493+
if location:
494+
self._put_file(location, raw_data=data)
495+
except Exception as e:
496+
logger.exception(f"Failed best-effort log upload: {e}")
497+
449498
def get_detailed_results_file_path(self):
450499
default_detailed_results_path = os.path.join(
451500
self.output_dir, "detailed_results.html"
@@ -467,7 +516,7 @@ async def send_detailed_results(self, file_path):
467516
)
468517
websocket_url = f"{self.websocket_url}?kind=detailed_results"
469518
logger.info(f"Connecting to {websocket_url} for detailed results")
470-
# Wrap this with a Try ... Except otherwise a failure here will make the submission get stuck on Running
519+
# Wrap this with a Try block to avoid getting stuck on Running
471520
try:
472521
websocket = await asyncio.wait_for(
473522
websockets.connect(websocket_url), timeout=30.0
@@ -480,14 +529,8 @@ async def send_detailed_results(self, file_path):
480529
)
481530
)
482531
except Exception as e:
483-
logger.error(
484-
f"This error might result in a Execution Time Exceeded error: {e}"
485-
)
486-
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
487-
logger.exception(e)
488-
raise SubmissionException(
489-
"Could not connect to instance to update detailed result"
490-
)
532+
logger.exception(e)
533+
return
491534

492535
def _get_stdout_stderr_file_names(self, run_args):
493536
# run_args should be the run_args argument passed to __init__ from the run_wrapper.
@@ -513,7 +556,7 @@ def _update_submission(self, data):
513556

514557
logger.info(f"Updating submission @ {url} with data = {data}")
515558

516-
resp = self.requests_session.patch(url, data, timeout=150)
559+
resp = self.requests_session.patch(url, data=data, timeout=150)
517560
if resp.status_code == 200:
518561
logger.info("Submission updated successfully!")
519562
else:
@@ -523,23 +566,17 @@ def _update_submission(self, data):
523566
raise SubmissionException("Failure updating submission data.")
524567

525568
def _update_status(self, status, extra_information=None):
569+
# Update submission status
526570
if status not in AVAILABLE_STATUSES:
527571
raise SubmissionException(
528572
f"Status '{status}' is not in available statuses: {AVAILABLE_STATUSES}"
529573
)
530-
531-
data = {
532-
"status": status,
533-
"status_details": extra_information,
534-
}
535-
536-
# TODO: figure out if we should pull this task code later(submission.task should always be set)
537-
# When we start
538-
# if status == STATUS_SCORING:
539-
# data.update({
540-
# "task_pk": self.task_pk,
541-
# })
542-
self._update_submission(data)
574+
data = {"status": status, "status_details": extra_information}
575+
try:
576+
self._update_submission(data)
577+
except Exception as e:
578+
# Always catch exception and never raise error
579+
logger.exception(f"Failed to update submission status to {status}: {e}")
543580

544581
def _get_container_image(self, image_name):
545582
logger.info("Running pull for image: {}".format(image_name))
@@ -549,6 +586,8 @@ def _get_container_image(self, image_name):
549586
with Progress() as progress:
550587
resp = client.pull(image_name, stream=True, decode=True)
551588
for line in resp:
589+
if isinstance(line, dict) and line.get("error"):
590+
raise DockerImagePullException(line["error"])
552591
show_progress(line, progress)
553592
break # Break if the loop is successful to exit "with Progress() as progress"
554593

@@ -684,8 +723,13 @@ async def _run_container_engine_cmd(self, container, kind):
684723
# Creating this and setting 2 values to None in case there is not enough time for the worker to get logs, otherwise we will have errors later on
685724
logs_Unified = [None, None]
686725

726+
# To store on-going logs and avoid empty logs returning to the platform
727+
stdout_chunks = []
728+
stderr_chunks = []
729+
687730
# Create a websocket to send the logs in real time to the codabench instance
688731
# We need to set a timeout for the websocket connection otherwise the program will get stuck if he websocket does not connect.
732+
websocket = None
689733
try:
690734
websocket_url = f"{self.websocket_url}?kind={kind}"
691735
logger.debug(
@@ -732,21 +776,27 @@ async def _run_container_engine_cmd(self, container, kind):
732776
"Show the logs and stream them to codabench " + container.get("Id")
733777
)
734778
for log in container_LogsDemux:
735-
if str(log[0]) != "None":
779+
# Output
780+
if log[0] is not None:
781+
stdout_chunks.append(log[0])
736782
logger.info(log[0].decode())
737783
try:
738-
await websocket.send(
739-
json.dumps({"kind": kind, "message": log[0].decode()})
740-
)
784+
if websocket is not None:
785+
await websocket.send(
786+
json.dumps({"kind": kind, "message": log[0].decode()})
787+
)
741788
except Exception as e:
742789
logger.error(e)
743-
744-
elif str(log[1]) != "None":
790+
791+
# Errors
792+
elif log[1] is not None:
793+
stderr_chunks.append(log[1])
745794
logger.error(log[1].decode())
746795
try:
747-
await websocket.send(
748-
json.dumps({"kind": kind, "message": log[1].decode()})
749-
)
796+
if websocket is not None:
797+
await websocket.send(
798+
json.dumps({"kind": kind, "message": log[1].decode()})
799+
)
750800
except Exception as e:
751801
logger.error(e)
752802

@@ -762,12 +812,17 @@ async def _run_container_engine_cmd(self, container, kind):
762812
# Get the return code of the competition container once done
763813
try:
764814
# Gets the logs of the container, sperating stdout and stderr (first and second position) thanks for demux=True
765-
logs_Unified = client.attach(container, logs=True, demux=True)
766815
return_Code = client.wait(container)
816+
logs_Unified = (b"".join(stdout_chunks), b"".join(stderr_chunks))
767817
logger.debug(
768818
f"WORKER_MARKER: Disconnecting from {websocket_url}, program counter = {self.completed_program_counter}"
769819
)
770-
await websocket.close()
820+
if websocket is not None:
821+
try:
822+
await websocket.close()
823+
await websocket.wait_closed()
824+
except Exception as e:
825+
logger.error(e)
771826
client.remove_container(container, force=True)
772827

773828
logger.debug(
@@ -785,6 +840,13 @@ async def _run_container_engine_cmd(self, container, kind):
785840
logger.error(e)
786841
return_Code = {"StatusCode": 1}
787842

843+
finally:
844+
try:
845+
# Last chance of removing container
846+
client.remove_container(container_id, force=True)
847+
except Exception:
848+
pass
849+
788850
self.logs[kind] = {
789851
"returncode": return_Code["StatusCode"],
790852
"start": start,
@@ -1055,9 +1117,8 @@ async def _run_program_directory(self, program_dir, kind):
10551117
try:
10561118
return await self._run_container_engine_cmd(container, kind=kind)
10571119
except Exception as e:
1058-
logger.error(e)
1059-
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
1060-
logger.exception(e)
1120+
logger.exception("Program directory execution failed")
1121+
raise SubmissionException(str(e))
10611122

10621123
def _put_dir(self, url, directory):
10631124
"""Zip the directory and send it to the given URL using _put_file."""
@@ -1099,7 +1160,7 @@ def _put_file(self, url, file=None, raw_data=None, content_type="application/zip
10991160
logger.info("Putting file %s in %s" % (file, url))
11001161
data = open(file, "rb")
11011162
headers["Content-Length"] = str(os.path.getsize(file))
1102-
elif raw_data:
1163+
elif raw_data is not None:
11031164
logger.info("Putting raw data %s in %s" % (raw_data, url))
11041165
data = raw_data
11051166
else:
@@ -1196,10 +1257,12 @@ def start(self):
11961257
task_results = [] # will store results/exceptions from gather
11971258
signal.signal(signal.SIGALRM, alarm_handler)
11981259
signal.alarm(self.execution_time_limit)
1260+
11991261
try:
12001262
# run tasks
12011263
# keep what gather returned so we can detect async errors later
12021264
task_results = loop.run_until_complete(gathered_tasks) or []
1265+
12031266
except ExecutionTimeLimitExceeded:
12041267
error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
12051268
logger.error(error_message)
@@ -1233,8 +1296,25 @@ def start(self):
12331296
# Send error through web socket to the frontend
12341297
asyncio.run(self._send_data_through_socket(error_message))
12351298
raise SubmissionException(error_message)
1299+
12361300
finally:
1301+
signal.alarm(0)
12371302
self.watch = False
1303+
1304+
# Cancel any remaining pending tasks before closing the loop
1305+
pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
1306+
for task in pending:
1307+
task.cancel()
1308+
if pending:
1309+
try:
1310+
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
1311+
except Exception:
1312+
pass
1313+
1314+
# Close loop
1315+
asyncio.set_event_loop(None)
1316+
loop.close()
1317+
12381318
for kind, logs in self.logs.items():
12391319
if logs["end"] is not None:
12401320
elapsed_time = logs["end"] - logs["start"]
@@ -1292,7 +1372,7 @@ def start(self):
12921372
program_results, BaseException
12931373
) and not isinstance(program_results, asyncio.CancelledError)
12941374
program_rc = getattr(self, "program_exit_code", None)
1295-
failed_rc = program_rc not in (0, None)
1375+
failed_rc = (program_rc is None) or (program_rc != 0)
12961376
if had_async_exc or failed_rc:
12971377
self._update_status(
12981378
STATUS_FAILED,
@@ -1301,6 +1381,7 @@ def start(self):
13011381
# Raise so upstream marks failed immediately
13021382
raise SubmissionException("Child task failed or non-zero return code")
13031383
self._update_status(STATUS_FINISHED)
1384+
13041385
else:
13051386
self._update_status(STATUS_SCORING)
13061387

0 commit comments

Comments
 (0)