88from threading import Thread
99from typing import TYPE_CHECKING , NamedTuple
1010
11+ import logistro
12+
1113from .kaleido import Kaleido
1214
1315if TYPE_CHECKING :
1416 from typing import Any , Callable
1517
18+ _logger = logistro .getLogger (__name__ )
19+
1620
1721class Task (NamedTuple ):
1822 fn : str
@@ -31,9 +35,10 @@ async def _server(self, *args, **kwargs):
3135 async with Kaleido (* args , ** kwargs ) as k : # multiple processor? Enable GPU?
3236 while True :
3337 task = self ._task_queue .get () # thread dies if main thread dies
38+ _logger .debug (f"Got task for kaleido_sync_server: { task !s} " )
3439 if task is None :
35- self . _task_queue . task_done ( )
36- return
40+ _logger . debug ( "Task was none." )
41+ break
3742 if not hasattr (k , task .fn ):
3843 raise _BadFunctionName (f"Kaleido has no attribute { task .fn } " )
3944 try :
@@ -45,6 +50,9 @@ async def _server(self, *args, **kwargs):
4550
4651 self ._task_queue .task_done ()
4752
53+ self ._task_queue .task_done ()
54+ return # noqa: PLR1711 useless return, but readability
55+
4856 def __new__ (cls ):
4957 # Create the singleton on first instantiation
5058 if cls ._instance is None :
@@ -73,23 +81,38 @@ def open(self, *args: Any, silence_warnings=False, **kwargs: Any) -> None:
7381 )
7482 self ._task_queue : Queue [Task | None ] = Queue ()
7583 self ._return_queue : Queue [Any ] = Queue ()
84+ _logger .debug ("Starting kaleido_sync_server thread." )
7685 self ._thread .start ()
7786 self ._initialized = True
78- close = partial (self .close , silence_warnings = True )
87+ close = partial (self .close , silence_warnings = True , _atexit = True )
88+ _logger .debug ("Registering close with atexit." )
7989 atexit .register (close )
8090
81- def close (self , * , silence_warnings = False ):
91+ # python bug
92+ from time import sleep # noqa: PLC0415 import at top, is hack
93+
94+ sleep (0.1 )
95+ # python seems to sometimes not like calling atext.register
96+ # too close to the end of a program
97+
98+ def close (self , * , silence_warnings = False , _atexit = False ):
8299 """Reset the singleton back to an uninitialized state."""
100+ if _atexit :
101+ _logger .debug ("atexit trying to close kaleido_sync_server" )
83102 if not self .is_running ():
103+ _logger .debug ("Can't close kaleido_sync_server: not running." )
84104 if not silence_warnings :
85105 warnings .warn (
86106 "Server already closed." ,
87107 RuntimeWarning ,
88108 stacklevel = 2 ,
89109 )
90110 return
111+ _logger .debug ("Putting None to thread queue to end." )
91112 self ._task_queue .put (None )
113+ _logger .debug ("Signaled thread to end, now going to join." )
92114 self ._thread .join ()
115+ _logger .debug ("Thread joined." )
93116 del self ._thread
94117 del self ._task_queue
95118 del self ._return_queue
0 commit comments