Skip to content

Commit f5e1ce2

Browse files
committed
fix CI error
1 parent 8851e19 commit f5e1ce2

4 files changed

Lines changed: 24 additions & 13 deletions

File tree

objwatch/sinks/consumer.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,13 @@ def _connect(self) -> None:
6868
self.logger.info(f"Subscribed to topic: {self.topic.decode('utf-8') if self.topic else 'all topics'}")
6969
except zmq.ZMQError as e:
7070
self.logger.error(f"Failed to connect to ZeroMQ endpoint {self.endpoint}: {e}")
71-
raise
71+
# Clean up resources if partially initialized
72+
if self.socket:
73+
self.socket.close()
74+
self.socket = None
75+
if self.context:
76+
self.context.term()
77+
self.context = None
7278

7379
def _disconnect(self) -> None:
7480
"""
@@ -106,25 +112,23 @@ def _run(self) -> None:
106112
"""
107113
The main run loop that listens for messages and writes them to file.
108114
"""
109-
self._connect()
110-
111115
try:
112116
with open(self.output_file, 'a', encoding='utf-8') as f:
113117
self.logger.info(f"Writing events to file: {self.output_file}")
114118

115119
while self.running:
116120
try:
117121
if self.socket is None:
118-
self.logger.error("Socket is None, attempting to reconnect")
122+
self.logger.info("Attempting to connect to ZeroMQ endpoint...")
119123
self._connect()
120124
if self.socket is None:
125+
self.logger.error("Failed to establish connection, will retry")
121126
time.sleep(0.1)
122127
continue
123128

124129
# Receive multipart message [topic, payload]
125130
msg_parts = self.socket.recv_multipart()
126131
if len(msg_parts) == 2:
127-
received_topic = msg_parts[0].decode('utf-8')
128132
payload = msgpack.unpackb(msg_parts[1], raw=False)
129133

130134
# Process and write the event to file
@@ -134,6 +138,12 @@ def _run(self) -> None:
134138
except zmq.Again:
135139
# Timeout occurred, continue the loop
136140
continue
141+
except zmq.ZMQError as e:
142+
self.logger.error(f"ZeroMQ error: {e}")
143+
# Reset socket to trigger reconnection
144+
self.socket = None
145+
time.sleep(0.1)
146+
continue
137147
except Exception as e:
138148
self.logger.error(f"Error processing message: {e}")
139149
continue

objwatch/sinks/factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def get_sink(config: ObjWatchConfig) -> BaseSink:
1818
BaseSink: The configured sink instance.
1919
"""
2020
if config.output_mode == 'zmq':
21-
return ZeroMQSink(endpoint=config.zmq_endpoint)
21+
return ZeroMQSink(endpoint=config.zmq_endpoint, topic=config.zmq_topic)
2222
else:
2323
# Default to StandardSink
2424
# It handles output file internally if config.output is set

tests/test_zmq_e2e.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def setUp(self):
2222
# Use a unique port for each test to avoid conflicts
2323
self.endpoint = "tcp://127.0.0.1:5555"
2424
self.topic = "test_topic"
25-
self.consumer_output = tempfile.mktemp(suffix=".log")
25+
self.consumer_output = tempfile.NamedTemporaryFile(suffix=".log", delete=False).name
2626

2727
# Clean up any existing output file
2828
if os.path.exists(self.consumer_output):
@@ -126,7 +126,7 @@ def test_zmq_topic_filtering(self):
126126
Note: This test may fail occasionally due to ZeroMQ's asynchronous nature and SUB socket's "slow joiner" problem.
127127
"""
128128
# Simplified test: create one consumer with a specific topic and send matching messages
129-
consumer_output = tempfile.mktemp(suffix=".log")
129+
consumer_output = tempfile.NamedTemporaryFile(suffix=".log", delete=False).name
130130

131131
# Create consumer with topic "test_topic"
132132
consumer = ZeroMQFileConsumer(

tests/test_zmq_integration.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ def test_zmq_integration():
7070
with open(output, 'r') as f:
7171
content = f.read()
7272

73-
print(f"Output file contains {content.count('\n')} lines")
73+
newline = '\n'
74+
print(f"Output file contains {content.count(newline)} lines")
7475
print("First 3 lines:")
7576
for line in content.split('\n')[:3]:
7677
if line:
@@ -81,20 +82,20 @@ def test_zmq_integration():
8182
print(f"Found {test_messages_found}/5 test messages in the output file")
8283
if test_messages_found > 0:
8384
print("Test PASSED: ZeroMQ integration works correctly!")
84-
return True
85+
assert True # For pytest
8586
else:
8687
print("Test FAILED: No test messages found in the output file")
87-
return False
88+
assert False # For pytest
8889
else:
8990
print(f"Test FAILED: Output file {output} was not created")
90-
return False
91+
assert False # For pytest
9192

9293
except Exception as e:
9394
print(f"Test FAILED with exception: {e}")
9495
import traceback
9596

9697
traceback.print_exc()
97-
return False
98+
assert False # For pytest
9899

99100
finally:
100101
# Clean up

0 commit comments

Comments
 (0)