55import time
66import tempfile
77import unittest
8- from objwatch . sinks . consumer import DynamicRoutingConsumer
8+
99from objwatch .sinks .zmq_sink import ZeroMQSink
10+ from objwatch .sinks .consumer import DynamicRoutingConsumer
1011
1112
1213class TestDynamicRoutingConsumer (unittest .TestCase ):
@@ -28,16 +29,17 @@ def tearDown(self):
2829 """
2930 # Clean up test output files
3031 if os .path .exists (self .temp_dir ):
32+ import logging
3133 for filename in os .listdir (self .temp_dir ):
3234 filepath = os .path .join (self .temp_dir , filename )
3335 try :
3436 os .remove (filepath )
35- except Exception :
36- pass
37+ except Exception as e :
38+ logging . debug ( f"Failed to remove { filepath } : { e } " )
3739 try :
3840 os .rmdir (self .temp_dir )
39- except Exception :
40- pass
41+ except Exception as e :
42+ logging . debug ( f"Failed to remove directory { self . temp_dir } : { e } " )
4143
4244 def test_dynamic_routing_basic (self ):
4345 """
@@ -88,7 +90,7 @@ def test_dynamic_routing_basic(self):
8890 "output_file" : output1 ,
8991 "process_id" : os .getpid (),
9092 }
91-
93+
9294 sink .emit (event1 )
9395 sink .emit (event2 )
9496 sink .emit (event3 )
@@ -114,18 +116,18 @@ def test_dynamic_routing_basic(self):
114116 # Check that at least some messages were received
115117 self .assertTrue (len (content1 ) > 0 , "Output file 1 should contain messages" )
116118 self .assertTrue (len (content2 ) > 0 , "Output file 2 should contain messages" )
117-
119+
118120 # Check for presence of expected messages (may not be all due to ZeroMQ async nature)
119121 if "Message to output1" in content1 :
120122 print ("✓ Received 'Message to output1'" )
121123 else :
122124 print ("✗ Did not receive 'Message to output1' (may be due to ZeroMQ timing)" )
123-
125+
124126 if "Another message to output1" in content1 :
125127 print ("✓ Received 'Another message to output1'" )
126128 else :
127129 print ("✗ Did not receive 'Another message to output1' (may be due to ZeroMQ timing)" )
128-
130+
129131 if "Message to output2" in content2 :
130132 print ("✓ Received 'Message to output2'" )
131133 else :
@@ -172,13 +174,13 @@ def test_file_handle_lru_cache(self):
172174 Test LRU cache for file handles.
173175 """
174176 max_open_files = 3
175-
177+
176178 # Create ZeroMQSink first and bind to endpoint
177179 sink = ZeroMQSink (endpoint = self .endpoint , topic = "" )
178-
180+
179181 # Wait a bit for sink to be ready
180182 time .sleep (0.1 )
181-
183+
182184 consumer = DynamicRoutingConsumer (
183185 endpoint = self .endpoint ,
184186 auto_start = True ,
@@ -226,9 +228,7 @@ def test_consumer_lifecycle(self):
226228 Test proper lifecycle management of DynamicRoutingConsumer.
227229 """
228230 # Create consumer
229- consumer = DynamicRoutingConsumer (
230- endpoint = self .endpoint , auto_start = False , allowed_directories = [self .temp_dir ]
231- )
231+ consumer = DynamicRoutingConsumer (endpoint = self .endpoint , auto_start = False , allowed_directories = [self .temp_dir ])
232232
233233 # Start consumer
234234 consumer .start ()
@@ -284,10 +284,10 @@ def test_process_id_in_output(self):
284284
285285 # Create ZeroMQSink first and bind to endpoint
286286 sink = ZeroMQSink (endpoint = self .endpoint , topic = "" )
287-
287+
288288 # Wait a bit for sink to be ready
289289 time .sleep (0.1 )
290-
290+
291291 # Create and start consumer
292292 consumer = DynamicRoutingConsumer (
293293 endpoint = self .endpoint , auto_start = True , daemon = True , allowed_directories = [self .temp_dir ]
@@ -321,10 +321,10 @@ def test_process_id_in_output(self):
321321 if os .path .exists (output_file ):
322322 with open (output_file , "r" ) as f :
323323 content = f .read ()
324-
324+
325325 # Check that at least some messages were received
326326 self .assertTrue (len (content ) > 0 , "Output file should contain messages" )
327-
327+
328328 # Check for process ID (may not be present if no messages were received)
329329 if "PID:12345" in content :
330330 print ("✓ Process ID found in output" )
0 commit comments