@@ -172,50 +172,62 @@ def open_dbwin_debug_channels() -> Iterator[tuple[int, int, int]]:
172172 yield (h_buffer_ready , h_data_ready , p_view )
173173
174174
175- @contextlib .contextmanager
176- def capture_debug_strings (ready : threading .Event , * , interval : int ) -> Iterator [Queue ]:
177- """Context manager to capture debug strings emitted via `OutputDebugString`.
178- Spawns a listener thread to monitor the debug channels.
179- """
180- captured = Queue ()
181- finished = threading .Event ()
182-
183- def _listener (
184- q : Queue , rdy : threading .Event , fin : threading .Event , pid : int
185- ) -> None :
186- # Create/open named events and file mapping for interprocess communication.
187- # These objects are part of the Windows Debugging API contract.
188- with open_dbwin_debug_channels () as (h_buffer_ready , h_data_ready , p_view ):
189- rdy .set () # Signal to the main thread that listener is ready.
190- while not fin .is_set (): # Loop until the main thread signals to finish.
191- _SetEvent (h_buffer_ready ) # Signal readiness to `OutputDebugString`.
192- # Wait for `OutputDebugString` to signal that data is ready.
193- if _WaitForSingleObject (h_data_ready , interval ) == WAIT_OBJECT_0 :
194- # Debug string buffer format: [4 bytes: PID][N bytes: string].
195- # Check if the process ID in the buffer matches the current PID.
196- if ctypes .cast (p_view , POINTER (DWORD )).contents .value == pid :
197- # Extract the null-terminated string, skipping the PID,
198- # and put it into the queue.
199- q .put (ctypes .string_at (p_view + 4 ).strip (b"\x00 " ))
175+ def _listen_on_dbwin_channel (
176+ interval_ms : int ,
177+ messages : Queue ,
178+ ready : threading .Event ,
179+ stop : threading .Event ,
180+ pid : int ,
181+ ) -> None :
182+ # Create/open named events and file mapping for interprocess communication.
183+ # These objects are part of the Windows Debugging API contract.
184+ with open_dbwin_debug_channels () as (h_buffer_ready , h_data_ready , p_view ):
185+ ready .set () # Signal to the main thread that listener is ready.
186+ while not stop .is_set (): # Loop until the main thread signals to finish.
187+ _SetEvent (h_buffer_ready ) # Signal readiness to `OutputDebugString`.
188+ # Wait for `OutputDebugString` to signal that data is ready.
189+ if _WaitForSingleObject (h_data_ready , interval_ms ) == WAIT_OBJECT_0 :
190+ # Debug string buffer format: [4 bytes: PID][N bytes: string].
191+ # Check if the process ID in the buffer matches the current PID.
192+ if ctypes .cast (p_view , POINTER (DWORD )).contents .value == pid :
193+ # Extract the null-terminated string, skipping the PID,
194+ # and put it into the queue.
195+ messages .put (ctypes .string_at (p_view + 4 ).strip (b"\x00 " ))
200196
197+
198+ @contextlib .contextmanager
199+ def _run_dbwin_listener (ready : threading .Event , interval_ms : int ) -> Iterator [Queue ]:
200+ messages = Queue ()
201+ stop = threading .Event ()
201202 th = threading .Thread (
202- target = _listener ,
203- args = (captured , ready , finished , _GetCurrentProcessId ()),
203+ target = _listen_on_dbwin_channel ,
204+ args = (interval_ms , messages , ready , stop , _GetCurrentProcessId ()),
204205 daemon = True ,
205206 )
206207 th .start ()
207208 try :
208- yield captured
209+ yield messages
209210 finally :
210- finished .set ()
211+ stop .set ()
211212 th .join ()
212213
213214
215+ @contextlib .contextmanager
216+ def capture_debug_strings (* , timeout : float , interval : float ) -> Iterator [Queue ]:
217+ """Context manager to capture debug strings emitted via `OutputDebugString`.
218+ Spawns a listener thread to monitor the debug channels.
219+
220+ Parameters are floats in seconds.
221+ """
222+ ready = threading .Event ()
223+ with _run_dbwin_listener (ready , int (interval * 1000 )) as messages :
224+ ready .wait (timeout = timeout ) # Wait for the listener to be ready
225+ yield messages
226+
227+
214228class Test_OutputDebugStringW (ut .TestCase ):
215229 def test (self ):
216- ready = threading .Event ()
217- with capture_debug_strings (ready , interval = 100 ) as cap :
218- ready .wait (timeout = 5 ) # Wait for the listener to be ready
230+ with capture_debug_strings (timeout = 5 , interval = 0.1 ) as cap :
219231 OutputDebugStringW ("hello world" )
220232 OutputDebugStringW ("test message" )
221233 self .assertEqual (cap .get (), b"hello world" )
@@ -224,15 +236,20 @@ def test(self):
224236
225237class Test_NTDebugHandler (ut .TestCase ):
226238 def test_emit (self ):
227- ready = threading .Event ()
228239 handler = NTDebugHandler ()
229- logger = logging .getLogger ("test_ntdebug_handler" )
240+ # Direct `Logger()` instantiation for test isolation: bypasses global
241+ # registration and prevents any side effects / cross-test pollution.
242+ # (The official 'Loggers should NEVER be instantiated directly' rule
243+ # targets production code where hierarchy and propagation matter;
244+ # here we want neither.)
245+ # https://docs.python.org/3/library/logging.html#logger-objects
246+ logger = logging .Logger ("test_ntdebug_handler" )
230247 # Clear existing handlers to prevent interference from other tests
248+ logger .propagate = False
231249 logger .handlers = []
232250 logger .addHandler (handler )
233251 logger .setLevel (logging .INFO )
234- with capture_debug_strings (ready , interval = 100 ) as cap :
235- ready .wait (timeout = 5 ) # Wait for the listener to be ready
252+ with capture_debug_strings (timeout = 5 , interval = 0.1 ) as cap :
236253 msg = "This is a test message from NTDebugHandler."
237254 logger .info (msg )
238255 logger .removeHandler (handler )
0 commit comments