22import glob
33import hashlib
44import json
5- import logging
65import os
76import shutil
87import signal
2625from kombu import Queue , Exchange
2726from urllib3 import Retry
2827
29- logger = logging .getLogger ()
28+ # This is only needed for the pytests to pass
29+ import sys
30+ sys .path .append ('/app/src/settings/' )
3031
32+ from celery import signals
33+ import logging
34+ logger = logging .getLogger (__name__ )
35+ from logs_loguru import configure_logging
3136
3237# -----------------------------------------------
3338# Celery + Rabbit MQ
3439# -----------------------------------------------
40+ @signals .setup_logging .connect
41+ def setup_celery_logging (** kwargs ):
42+ pass
3543# Init celery + rabbit queue definitions
3644app = Celery ()
3745app .config_from_object ('celery_config' ) # grabs celery_config.py
4048 Queue ('compute-worker' , Exchange ('compute-worker' ), routing_key = 'compute-worker' , queue_arguments = {'x-max-priority' : 10 }),
4149]
4250
43-
51+ # -----------------------------------------------
52+ # Logging
53+ # -----------------------------------------------
54+ configure_logging (os .environ .get ("LOG_LEVEL" , "INFO" ),os .environ .get ("SERIALIZED" , 'false' ))
4455# -----------------------------------------------
4556# Directories
4657# -----------------------------------------------
@@ -98,7 +109,6 @@ class DockerImagePullException(Exception):
98109class ExecutionTimeLimitExceeded (Exception ):
99110 pass
100111
101-
102112# -----------------------------------------------------------------------------
103113# The main compute worker entrypoint; this is how a job is run at the highest
104114# level.
@@ -337,7 +347,7 @@ def _update_submission(self, data):
337347 if resp .status_code == 200 :
338348 logger .info ("Submission updated successfully!" )
339349 else :
340- logger .info (f"Submission patch failed with status = { resp .status_code } , and response = \n { resp .content } " )
350+ logger .error (f"Submission patch failed with status = { resp .status_code } , and response = \n { resp .content } " )
341351 raise SubmissionException ("Failure updating submission data." )
342352
343353 def _update_status (self , status , extra_information = None ):
@@ -370,7 +380,7 @@ def _get_container_image(self, image_name):
370380 retries += 1
371381 if retries >= max_retries :
372382 error_message = f"Pull for image: { image_name } returned a non-zero exit code! Check if the docker image exists on docker hub. { pull_error } "
373- logger .info (error_message )
383+ logger .error (error_message )
374384 # Prepare data to be sent to submissions api
375385 docker_pull_fail_data = {
376386 "type" : "Docker_Image_Pull_Fail" ,
@@ -383,7 +393,7 @@ def _get_container_image(self, image_name):
383393 asyncio .run (self ._send_data_through_socket (error_message ))
384394 raise DockerImagePullException (f"Pull for { image_name } failed!" )
385395 else :
386- logger .info ("Failed. Retrying in 5 seconds..." )
396+ logger .warning ("Failed. Retrying in 5 seconds..." )
387397 time .sleep (5 ) # Wait 5 seconds before retrying
388398
389399 async def _send_data_through_socket (self , error_message ):
@@ -413,7 +423,7 @@ async def _send_data_through_socket(self, error_message):
413423
414424 except websocket_errors :
415425 # handle websocket errors
416- logger .info (f"Error sending failed through websocket" )
426+ logger .error (f"Error sending failed through websocket" )
417427 try :
418428 await websocket .close ()
419429 except Exception as e :
@@ -467,7 +477,7 @@ def _get_bundle(self, url, destination, cache=True):
467477 if retries >= max_retries :
468478 raise # Re-raise the last caught BadZipFile exception
469479 else :
470- logger .info ("Failed. Retrying in 60 seconds..." )
480+ logger .warning ("Failed. Retrying in 60 seconds..." )
471481 time .sleep (60 ) # Wait 60 seconds before retrying
472482 # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it
473483 return bundle_file
@@ -544,7 +554,7 @@ async def _readline_or_chunk(stream):
544554 except asyncio .TimeoutError :
545555 continue
546556 except websocket_errors :
547- logger .debug ("\n \n Websocket error (line 538)\n \n " )
557+ logger .error ("\n \n Websocket error (line 538)\n \n " )
548558 try :
549559 # do we need to await websocket.close() on the old socket? before making a new one probably not?
550560 await websocket .close ()
@@ -557,7 +567,7 @@ async def _readline_or_chunk(stream):
557567 tries = 0
558568 while tries < 3 and not websocket .open :
559569 try :
560- logger .debug (f"\n \n Attempting to reconnect in 2 seconds (attempt { tries + 1 } /3)" )
570+ logger .warning (f"\n \n Attempting to reconnect in 2 seconds (attempt { tries + 1 } /3)" )
561571 websocket = await websockets .connect (websocket_url )
562572 logger .debug (f"\n \n Successfully reconnected to { websocket_url } " )
563573 except websocket_errors :
@@ -606,7 +616,7 @@ async def _run_program_directory(self, program_dir, kind):
606616 """
607617 # If the directory doesn't even exist, move on
608618 if not os .path .exists (program_dir ):
609- logger .info (f"{ program_dir } not found, no program to execute" )
619+ logger .error (f"{ program_dir } not found, no program to execute" )
610620
611621 # Communicate that the program is closing
612622 self .completed_program_counter += 1
@@ -619,7 +629,7 @@ async def _run_program_directory(self, program_dir, kind):
619629 else :
620630 # Display a warning in logs when there is no metadata file in submission/program dir
621631 if kind == "program" :
622- logger .info (
632+ logger .warning (
623633 "Program directory missing metadata, assuming it's going to be handled by ingestion"
624634 )
625635 return
@@ -636,12 +646,13 @@ async def _run_program_directory(self, program_dir, kind):
636646 else :
637647 command = None
638648 except yaml .YAMLError as e :
649+ logger .error ("Error parsing YAML file: " , e )
639650 print ("Error parsing YAML file: " , e )
640651 command = None
641652 if not command and kind == "ingestion" :
642653 raise SubmissionException ("Program directory missing 'command' in metadata" )
643654 elif not command :
644- logger .info (
655+ logger .warning (
645656 f"Warning: { program_dir } has no command in metadata, continuing anyway "
646657 f"(may be meant to be consumed by an ingestion program)"
647658 )
@@ -852,7 +863,7 @@ def start(self):
852863 loop .run_until_complete (gathered_tasks )
853864 except ExecutionTimeLimitExceeded :
854865 error_message = f"Execution Time Limit exceeded. Limit was { self .execution_time_limit } seconds"
855- logger .info (error_message )
866+ logger .error (error_message )
856867 # Prepare data to be sent to submissions api
857868 execution_time_limit_exceeded_data = {
858869 "type" : "Execution_Time_Limit_Exceeded" ,
@@ -873,14 +884,14 @@ def start(self):
873884 elapsed_time = self .execution_time_limit
874885 return_code = logs ["proc" ].returncode
875886 if return_code is None :
876- logger .info ('No return code from Process. Killing it' )
887+ logger .warning ('No return code from Process. Killing it' )
877888 if kind == 'ingestion' :
878889 program_to_kill = self .ingestion_container_name
879890 else :
880891 program_to_kill = self .program_container_name
881892 # Try and stop the program. If stop does not succeed
882893 kill_code = subprocess .call ([CONTAINER_ENGINE_EXECUTABLE , 'stop' , str (program_to_kill )])
883- logger .info (f'Kill process returned { kill_code } ' )
894+ logger .warning (f'Kill process returned { kill_code } ' )
884895 if kind == 'program' :
885896 self .program_exit_code = return_code
886897 self .program_elapsed_time = elapsed_time
@@ -966,7 +977,7 @@ def push_output(self):
966977
967978 def clean_up (self ):
968979 if os .environ .get ("CODALAB_IGNORE_CLEANUP_STEP" ):
969- logger .info (f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: { self .root_dir } " )
980+ logger .warning (f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: { self .root_dir } " )
970981 return
971982
972983 logger .info (f"Destroying submission temp dir: { self .root_dir } " )
0 commit comments