55import time
66import tempfile
77import unittest
8- from objwatch . sinks . consumer import DynamicRoutingConsumer
8+
99from objwatch .sinks .zmq_sink import ZeroMQSink
10+ from objwatch .sinks .consumer import DynamicRoutingConsumer
1011
1112
1213class TestDynamicRoutingConsumer (unittest .TestCase ):
@@ -28,16 +29,17 @@ def tearDown(self):
2829 """
2930 # Clean up test output files
3031 if os .path .exists (self .temp_dir ):
32+ import logging
3133 for filename in os .listdir (self .temp_dir ):
3234 filepath = os .path .join (self .temp_dir , filename )
3335 try :
3436 os .remove (filepath )
35- except Exception :
36- pass
37+ except Exception as e :
38+ logging . debug ( f"Failed to remove { filepath } : { e } " )
3739 try :
3840 os .rmdir (self .temp_dir )
39- except Exception :
40- pass
41+ except Exception as e :
42+ logging . debug ( f"Failed to remove directory { self . temp_dir } : { e } " )
4143
4244 def test_dynamic_routing_basic (self ):
4345 """
@@ -59,7 +61,7 @@ def test_dynamic_routing_basic(self):
5961
6062 # Give consumer time to start and connect
6163 # Increase delay to handle ZeroMQ SUB socket's slow joiner problem
62- time .sleep (0.5 )
64+ time .sleep (0.1 )
6365
6466 # Send messages with different output_file
6567 # Send multiple messages to increase chance of reception
@@ -88,14 +90,14 @@ def test_dynamic_routing_basic(self):
8890 "output_file" : output1 ,
8991 "process_id" : os .getpid (),
9092 }
91-
93+
9294 sink .emit (event1 )
9395 sink .emit (event2 )
9496 sink .emit (event3 )
9597 time .sleep (0.1 )
9698
9799 # Give time for messages to be processed
98- time .sleep (0.5 )
100+ time .sleep (0.1 )
99101
100102 # Clean up
101103 consumer .stop ()
@@ -114,18 +116,18 @@ def test_dynamic_routing_basic(self):
114116 # Check that at least some messages were received
115117 self .assertTrue (len (content1 ) > 0 , "Output file 1 should contain messages" )
116118 self .assertTrue (len (content2 ) > 0 , "Output file 2 should contain messages" )
117-
119+
118120 # Check for presence of expected messages (may not be all due to ZeroMQ async nature)
119121 if "Message to output1" in content1 :
120122 print ("✓ Received 'Message to output1'" )
121123 else :
122124 print ("✗ Did not receive 'Message to output1' (may be due to ZeroMQ timing)" )
123-
125+
124126 if "Another message to output1" in content1 :
125127 print ("✓ Received 'Another message to output1'" )
126128 else :
127129 print ("✗ Did not receive 'Another message to output1' (may be due to ZeroMQ timing)" )
128-
130+
129131 if "Message to output2" in content2 :
130132 print ("✓ Received 'Message to output2'" )
131133 else :
@@ -172,13 +174,13 @@ def test_file_handle_lru_cache(self):
172174 Test LRU cache for file handles.
173175 """
174176 max_open_files = 3
175-
177+
176178 # Create ZeroMQSink first and bind to endpoint
177179 sink = ZeroMQSink (endpoint = self .endpoint , topic = "" )
178-
180+
179181 # Wait a bit for sink to be ready
180182 time .sleep (0.1 )
181-
183+
182184 consumer = DynamicRoutingConsumer (
183185 endpoint = self .endpoint ,
184186 auto_start = True ,
@@ -189,7 +191,7 @@ def test_file_handle_lru_cache(self):
189191
190192 # Give consumer time to start and connect
191193 # Increase delay to handle ZeroMQ SUB socket's slow joiner problem
192- time .sleep (0.5 )
194+ time .sleep (0.1 )
193195
194196 # Create more output files than max_open_files
195197 output_files = [os .path .join (self .temp_dir , f"output{ i } .log" ) for i in range (5 )]
@@ -209,7 +211,7 @@ def test_file_handle_lru_cache(self):
209211 time .sleep (0.05 )
210212
211213 # Give time for messages to be processed
212- time .sleep (0.5 )
214+ time .sleep (0.1 )
213215
214216 # Clean up
215217 consumer .stop ()
@@ -226,20 +228,18 @@ def test_consumer_lifecycle(self):
226228 Test proper lifecycle management of DynamicRoutingConsumer.
227229 """
228230 # Create consumer
229- consumer = DynamicRoutingConsumer (
230- endpoint = self .endpoint , auto_start = False , allowed_directories = [self .temp_dir ]
231- )
231+ consumer = DynamicRoutingConsumer (endpoint = self .endpoint , auto_start = False , allowed_directories = [self .temp_dir ])
232232
233233 # Start consumer
234234 consumer .start ()
235- time .sleep (0.2 )
235+ time .sleep (0.1 )
236236
237237 # Verify consumer is running
238238 self .assertTrue (consumer .running , "Consumer should be running after start()" )
239239
240240 # Stop consumer
241241 consumer .stop ()
242- time .sleep (0.2 )
242+ time .sleep (0.1 )
243243
244244 # Verify consumer has stopped
245245 self .assertFalse (consumer .running , "Consumer should not be running after stop()" )
@@ -254,7 +254,7 @@ def test_consumer_context_manager(self):
254254 ) as consumer :
255255 # Start consumer within context
256256 consumer .start ()
257- time .sleep (0.2 )
257+ time .sleep (0.1 )
258258 self .assertTrue (consumer .running , "Consumer should be running within context" )
259259
260260 # Verify consumer has been stopped after context exit
@@ -284,18 +284,18 @@ def test_process_id_in_output(self):
284284
285285 # Create ZeroMQSink first and bind to endpoint
286286 sink = ZeroMQSink (endpoint = self .endpoint , topic = "" )
287-
287+
288288 # Wait a bit for sink to be ready
289289 time .sleep (0.1 )
290-
290+
291291 # Create and start consumer
292292 consumer = DynamicRoutingConsumer (
293293 endpoint = self .endpoint , auto_start = True , daemon = True , allowed_directories = [self .temp_dir ]
294294 )
295295
296296 # Give consumer time to start and connect
297297 # Increase delay to handle ZeroMQ SUB socket's slow joiner problem
298- time .sleep (0.5 )
298+ time .sleep (0.1 )
299299
300300 # Send multiple messages to increase chance of reception
301301 for _ in range (5 ):
@@ -311,7 +311,7 @@ def test_process_id_in_output(self):
311311 time .sleep (0.1 )
312312
313313 # Give time for messages to be processed
314- time .sleep (0.5 )
314+ time .sleep (0.1 )
315315
316316 # Clean up
317317 consumer .stop ()
@@ -321,10 +321,10 @@ def test_process_id_in_output(self):
321321 if os .path .exists (output_file ):
322322 with open (output_file , "r" ) as f :
323323 content = f .read ()
324-
324+
325325 # Check that at least some messages were received
326326 self .assertTrue (len (content ) > 0 , "Output file should contain messages" )
327-
327+
328328 # Check for process ID (may not be present if no messages were received)
329329 if "PID:12345" in content :
330330 print ("✓ Process ID found in output" )
0 commit comments