Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from rich.pretty import pprint
import requests
import websockets
from websockets.exceptions import InvalidStatusCode
import yaml
from billiard.exceptions import SoftTimeLimitExceeded
from celery import Celery, shared_task, utils
Expand Down Expand Up @@ -84,10 +85,7 @@
# -----------------------------------------------
# Show Progress bar on downloading images
# -----------------------------------------------
tasks = {}


def show_progress(line, progress):
def show_progress(line, progress, tasks):
try:
status = line.get("status") or ""
layer_id = line.get("id")
Expand Down Expand Up @@ -585,6 +583,7 @@ def _update_status(self, status, extra_information=None):
logger.exception(f"Failed to update submission status to {status}: {e}")

def _get_container_image(self, image_name):
tasks = {}
logger.info("Running pull for image: {}".format(image_name))
retries, max_retries = (0, 3)
while retries < max_retries:
Expand All @@ -594,7 +593,7 @@ def _get_container_image(self, image_name):
for line in resp:
if isinstance(line, dict) and line.get("error"):
raise DockerImagePullException(line["error"])
show_progress(line, progress)
show_progress(line, progress, tasks)
break # Break if the loop is successful to exit "with Progress() as progress"

except (docker.errors.APIError, Exception) as pull_error:
Expand Down Expand Up @@ -753,6 +752,12 @@ async def _run_container_engine_cmd(self, container, kind):
+ "for container "
+ str(container.get("Id"))
)
except InvalidStatusCode as e:
logger.error(
f"There was an error trying to connect to the websocket on the codabench instance: {type(e)}"
)
logger.exception(e)
websocket = None
except Exception as e:
logger.error(
f"There was an error trying to connect to the websocket on the codabench instance: {e}"
Expand Down Expand Up @@ -884,16 +889,16 @@ def _get_host_path(self, *paths):
# Take our list of paths and smash 'em together
path = os.path.join(*paths)

# Create if necessary
os.makedirs(path, exist_ok=True)

# pull front of path, which points to the location inside the container
path = path[len(BASE_DIR) :]

# add host to front, so when we run commands in the container on the host they
# can be seen properly
path = os.path.join(HOST_DIRECTORY, path)

# Create if necessary
os.makedirs(path, exist_ok=True)

return path

async def _run_program_directory(self, program_dir, kind):
Expand Down Expand Up @@ -1251,6 +1256,13 @@ def start(self):
ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program")

logger.info("Running scoring program, and then ingestion program")
try:
loop = asyncio.get_running_loop()
loop.close()
except RuntimeError as e:
logger.error("Error while closing running event loop: No event loop.")
# logger.exception(e)

loop = asyncio.new_event_loop()
# Set the event loop for the gather
asyncio.set_event_loop(loop)
Expand Down