Skip to content

Commit 6af6fb6

Browse files
committed
Check error_response before processing other ServerToAgent fields per OpAMP spec
1 parent 62cefc5 commit 6af6fb6

2 files changed

Lines changed: 62 additions & 26 deletions

File tree

opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -234,25 +234,7 @@ def _run_worker(self) -> None:
234234
break
235235

236236
if message is not None:
237-
if (
238-
message.flags
239-
& opamp_pb2.ServerToAgentFlags_ReportFullState
240-
):
241-
logger.debug("Server requested full state report")
242-
payload = self._client.build_full_state_message()
243-
self.send(payload)
244-
245-
msg_data = MessageData.from_server_message(message)
246-
_safe_invoke(
247-
self._callbacks.on_message, self, self._client, msg_data
248-
)
249-
if message.HasField("error_response"):
250-
_safe_invoke(
251-
self._callbacks.on_error,
252-
self,
253-
self._client,
254-
message.error_response,
255-
)
237+
self._process_message(message)
256238

257239
try:
258240
if job.callback is not None:
@@ -262,6 +244,29 @@ def _run_worker(self) -> None:
262244
finally:
263245
self._queue.task_done()
264246

247+
def _process_message(self, message: opamp_pb2.ServerToAgent) -> None:
248+
if message.HasField("error_response"):
249+
_safe_invoke(
250+
self._callbacks.on_error,
251+
self,
252+
self._client,
253+
message.error_response,
254+
)
255+
return
256+
257+
if message.flags & opamp_pb2.ServerToAgentFlags_ReportFullState:
258+
logger.debug("Server requested full state report")
259+
payload = self._client.build_full_state_message()
260+
self.send(payload)
261+
262+
msg_data = MessageData.from_server_message(message)
263+
_safe_invoke(
264+
self._callbacks.on_message,
265+
self,
266+
self._client,
267+
msg_data,
268+
)
269+
265270
def stop(self, timeout: float | None = None) -> None:
266271
"""
267272
Signal server we are disconnecting and then threads to exit

opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def test_agent_send_enqueues_job():
193193
assert cb.on_message.call_count == 2
194194

195195

196-
def test_on_message_and_on_error_both_called():
196+
def test_on_error_called_without_on_message_for_error_response():
197197
cb = mock.create_autospec(Callbacks, instance=True)
198198
client_mock = mock.Mock()
199199

@@ -217,9 +217,9 @@ def test_on_message_and_on_error_both_called():
217217
sleep(0.1)
218218
agent.stop()
219219

220-
# on_message called for both connection and our message
221-
assert cb.on_message.call_count == 2
222-
# on_error called only for the message with error_response
220+
# on_message called only for connection (not for error_response message)
221+
assert cb.on_message.call_count == 1
222+
# on_error called for the message with error_response
223223
cb.on_error.assert_called_once_with(agent, client_mock, error_response)
224224

225225

@@ -243,8 +243,8 @@ def test_on_error_not_called_without_error_response():
243243
cb.on_error.assert_not_called()
244244

245245

246-
def test_dispatch_order():
247-
"""Verify the opamp-client dispatch order: on_connect -> on_message -> on_error."""
246+
def test_dispatch_order_with_error():
247+
"""Verify that error_response skips on_message: on_connect -> on_error."""
248248
call_order = []
249249
client_mock = mock.Mock()
250250

@@ -276,7 +276,38 @@ def on_error(self, agent, client, error_response):
276276
sleep(0.1)
277277
agent.stop()
278278

279-
assert call_order == ["on_connect", "on_message", "on_error"]
279+
assert call_order == ["on_connect", "on_error"]
280+
281+
282+
def test_dispatch_order_without_error():
283+
"""Verify normal dispatch order: on_connect -> on_message."""
284+
call_order = []
285+
client_mock = mock.Mock()
286+
287+
server_msg = opamp_pb2.ServerToAgent()
288+
289+
class OrderTrackingCallbacks(Callbacks):
290+
def on_connect(self, agent, client):
291+
call_order.append("on_connect")
292+
293+
def on_message(self, agent, client, message):
294+
call_order.append("on_message")
295+
296+
def on_error(self, agent, client, error_response):
297+
call_order.append("on_error")
298+
299+
client_mock.send.side_effect = [
300+
server_msg, # connection message, no error
301+
mock.Mock(), # disconnect
302+
]
303+
agent = OpAMPAgent(
304+
interval=30, client=client_mock, callbacks=OrderTrackingCallbacks()
305+
)
306+
agent.start()
307+
sleep(0.1)
308+
agent.stop()
309+
310+
assert call_order == ["on_connect", "on_message"]
280311

281312

282313
def test_report_full_state_flag_triggers_full_state_send():

0 commit comments

Comments
 (0)