@@ -302,7 +302,9 @@ def get_detailed_results_file_path(self):
302302 async def send_detailed_results (self , file_path ):
303303 logger .info (f"Updating detailed results { file_path } - { self .detailed_results_url } " )
304304 self ._put_file (self .detailed_results_url , file = file_path , content_type = 'text/html' )
305- async with websockets .connect (self .websocket_url ) as websocket :
305+ websocket_url = f"{ self .websocket_url } ?kind=detailed_results"
306+ logger .info (f"Connecting to { websocket_url } for detailed results" )
307+ async with websockets .connect (websocket_url ) as websocket :
306308 await websocket .send (json .dumps ({
307309 "kind" : 'detailed_result_update' ,
308310 }))
@@ -390,10 +392,14 @@ async def _send_data_through_socket(self, error_message):
390392 - Docker image pull failure logs
391393 - Execution time limit exceeded logs
392394 """
393- logger .info (f"Connecting to { self .websocket_url } to send docker image pull error" )
395+ # Create a unique websocket URL for error messages
396+ websocket_url = f"{ self .websocket_url } ?kind=error_logs"
397+ logger .info (f"Connecting to { websocket_url } to send error message" )
398+
399+ logger .info (f"Connecting to { websocket_url } to send docker image pull error" )
394400
395401 # connect to web socket
396- websocket = await websockets .connect (self . websocket_url )
402+ websocket = await websockets .connect (websocket_url )
397403
398404 # define websocket errors
399405 websocket_errors = (socket .gaierror , websockets .WebSocketException , websockets .ConnectionClosedError , ConnectionRefusedError )
@@ -416,7 +422,7 @@ async def _send_data_through_socket(self, error_message):
416422 # no error in websocket message sending
417423 logger .info (f"Error sent successfully through websocket" )
418424
419- logger .info (f"Disconnecting from websocket { self . websocket_url } " )
425+ logger .info (f"Disconnecting from websocket { websocket_url } " )
420426
421427 # close websocket
422428 await websocket .close ()
@@ -500,8 +506,11 @@ async def _run_container_engine_cmd(self, engine_cmd, kind):
500506 }
501507
502508 # Start websocket, it will reconnect in the stdout/stderr listener loop below
503- logger .info (f"Connecting to { self .websocket_url } " )
504- websocket = await websockets .connect (self .websocket_url )
509+ # This ensures each task has its own independent WebSocket connection
510+ websocket_url = f"{ self .websocket_url } ?kind={ kind } "
511+ logger .debug (f"WORKER_MARKER: Connecting to { websocket_url } " )
512+ websocket = await websockets .connect (websocket_url )
513+ # websocket = await websockets.connect(self.websocket_url) # old BB
505514 websocket_errors = (socket .gaierror , websockets .WebSocketException , websockets .ConnectionClosedError , ConnectionRefusedError )
506515
507516 # Function to read a line, if the line is larger than the buffer size we will
@@ -522,7 +531,7 @@ async def _readline_or_chunk(stream):
522531 logs = [self .logs [kind ][key ] for key in ('stdout' , 'stderr' )]
523532 for value in logs :
524533 try :
525- out = await asyncio .wait_for (_readline_or_chunk (value ["stream" ]), timeout = .1 )
534+ out = await asyncio .wait_for (_readline_or_chunk (value ["stream" ]), timeout = 0 .1 )
526535 if out :
527536 value ["data" ] += out
528537 print ("WS: " + str (out ))
@@ -535,32 +544,36 @@ async def _readline_or_chunk(stream):
535544 except asyncio .TimeoutError :
536545 continue
537546 except websocket_errors :
547+ logger .debug ("\n \n Websocket error (line 538)\n \n " )
538548 try :
539549 # do we need to await websocket.close() on the old socket? before making a new one probably not?
540550 await websocket .close ()
541551 except Exception as e :
542552 logger .error (e )
543- logger .info (e )
544553 # TODO: catch proper exceptions here..! What can go wrong failing to close?
545554 pass
546555
547556 # try to reconnect a few times
548557 tries = 0
549558 while tries < 3 and not websocket .open :
550559 try :
551- websocket = await websockets .connect (self .websocket_url )
560+ logger .debug (f"\n \n Attempting to reconnect in 2 seconds (attempt { tries + 1 } /3)" )
561+ websocket = await websockets .connect (websocket_url )
562+ logger .debug (f"\n \n Successfully reconnected to { websocket_url } " )
552563 except websocket_errors :
564+ logger .error (f"\n \n Reconnection attempt { tries + 1 } failed: { websocket_errors } " )
553565 await asyncio .sleep (2 )
554566 tries += 1
555567
556568 self .logs [kind ]["end" ] = time .time ()
557569
558- logger .info (f"Process exited with { proc .returncode } " )
559- logger .info (f"Disconnecting from websocket { self . websocket_url } " )
570+ logger .debug (f"Process exited with { proc .returncode } " )
571+ logger .debug (f"Disconnecting from websocket { websocket_url } " )
560572
561573 # Communicate that the program is closing
562574 self .completed_program_counter += 1
563575
576+ logger .debug (f"WORKER_MARKER: Disconnecting from { websocket_url } , program counter = { self .completed_program_counter } " )
564577 await websocket .close ()
565578
566579 def _get_host_path (self , * paths ):
@@ -609,6 +622,9 @@ async def _run_program_directory(self, program_dir, kind):
609622 logger .info (
610623 "Program directory missing metadata, assuming it's going to be handled by ingestion"
611624 )
625+ # Copy submission files into prediction output
626+ # This is useful for results submissions but wrongly uses storage
627+ shutil .copytree (program_dir , self .output_dir )
612628 return
613629 else :
614630 raise SubmissionException ("Program directory missing 'metadata.yaml/metadata'" )
0 commit comments