Skip to content

Commit 582f8d6

Browse files
authored
Merge pull request #183 from rostilos/1.5.7-rc
feat: Enhance error handling and result validation in CommandQueueCon…
2 parents 7a2edbc + 647caf2 commit 582f8d6

4 files changed

Lines changed: 290 additions & 12 deletions

File tree

python-ecosystem/inference-orchestrator/src/server/command_queue_consumer.py

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ class CommandQueueConsumer:
1818
using the CommandService. Events and final results are pushed back
1919
to a job-specific Redis event queue.
2020
"""
21+
22+
EMPTY_RESULT_SENTINELS = {
23+
"null",
24+
"none",
25+
"no output generated",
26+
"failed to generate summary",
27+
"i couldn't generate an answer. please try rephrasing your question.",
28+
}
2129

2230
def __init__(self, command_service: CommandService):
2331
self.command_service = command_service
@@ -114,18 +122,45 @@ def event_callback(event: Dict[str, Any]):
114122
result = await self.command_service.process_ask(request_dto, event_callback)
115123
else:
116124
raise ValueError(f"Unknown command type: {command_type}")
117-
125+
126+
if self._has_error(result):
127+
error_message = self._get_result_value(result, "error", "AI command failed")
128+
await self._publish_event(event_queue_key, {
129+
"type": "error",
130+
"message": str(error_message)
131+
})
132+
logger.warning(f"Command Job ID {job_id} failed: {error_message}")
133+
return
134+
118135
# Format output correctly depending on command type based on their DTO responses
119136
final_payload = {}
120137
if command_type == "summarize":
138+
summary = self._get_result_value(result, "summary")
139+
if not self._has_usable_text(summary):
140+
await self._publish_event(event_queue_key, {
141+
"type": "error",
142+
"message": "AI service returned an empty summary"
143+
})
144+
logger.warning(f"Command Job ID {job_id} failed: empty summarize result")
145+
return
146+
121147
final_payload = {
122-
"summary": result.summary if hasattr(result, "summary") else result.get("summary"),
123-
"diagram": result.diagram if hasattr(result, "diagram") else result.get("diagram"),
124-
"diagramType": result.diagramType if hasattr(result, "diagramType") else result.get("diagramType", "MERMAID")
148+
"summary": str(summary),
149+
"diagram": self._string_or_empty(self._get_result_value(result, "diagram")),
150+
"diagramType": self._string_or_empty(self._get_result_value(result, "diagramType", "MERMAID")) or "MERMAID"
125151
}
126152
elif command_type == "ask":
153+
answer = self._get_result_value(result, "answer")
154+
if not self._has_usable_text(answer):
155+
await self._publish_event(event_queue_key, {
156+
"type": "error",
157+
"message": "AI service returned an empty answer"
158+
})
159+
logger.warning(f"Command Job ID {job_id} failed: empty ask result")
160+
return
161+
127162
final_payload = {
128-
"answer": result.answer if hasattr(result, "answer") else result.get("answer")
163+
"answer": str(answer)
129164
}
130165

131166
event_callback({"type": "final", "result": final_payload})
@@ -156,3 +191,27 @@ async def _publish_event(self, key: str, event: Dict[str, Any]):
156191
await self._redis.lpush(key, event_json)
157192
except Exception as e:
158193
logger.error(f"Failed to publish event to {key}: {e}")
194+
195+
@staticmethod
196+
def _get_result_value(result: Any, key: str, default: Any = None) -> Any:
197+
if isinstance(result, dict):
198+
return result.get(key, default)
199+
if hasattr(result, key):
200+
return getattr(result, key)
201+
return default
202+
203+
@classmethod
204+
def _has_error(cls, result: Any) -> bool:
205+
error = cls._get_result_value(result, "error")
206+
return error is not None and str(error).strip() != ""
207+
208+
@staticmethod
209+
def _has_usable_text(value: Any) -> bool:
210+
if value is None:
211+
return False
212+
text = str(value).strip()
213+
return bool(text) and text.lower() not in CommandQueueConsumer.EMPTY_RESULT_SENTINELS
214+
215+
@staticmethod
216+
def _string_or_empty(value: Any) -> str:
217+
return "" if value is None else str(value)

python-ecosystem/inference-orchestrator/src/service/command/command_service.py

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ class CommandService:
3030
# Hard timeout ceiling for commands (seconds). Configurable via .env
3131
COMMAND_TIMEOUT_SECONDS = int(os.environ.get("COMMAND_TIMEOUT_SECONDS", "600"))
3232

33+
EMPTY_RESULT_SENTINELS = {
34+
"null",
35+
"none",
36+
"no output generated",
37+
"failed to generate summary",
38+
"i couldn't generate an answer. please try rephrasing your question.",
39+
}
40+
3341
def __init__(self):
3442
load_dotenv(interpolate=False)
3543
self.default_jar_path = os.environ.get(
@@ -112,6 +120,11 @@ async def process_summarize(
112120
except Exception as close_err:
113121
logger.warning(f"Error closing MCP sessions: {close_err}")
114122

123+
result = self._normalize_summarize_result(result, supports_mermaid=False)
124+
if "error" in result:
125+
self._emit_event(event_callback, {"type": "error", "message": result["error"]})
126+
return result
127+
115128
self._emit_event(event_callback, {
116129
"type": "status",
117130
"state": "completed",
@@ -222,6 +235,11 @@ async def process_ask(
222235
except Exception as close_err:
223236
logger.warning(f"Error closing MCP sessions: {close_err}")
224237

238+
result = self._normalize_ask_result(result)
239+
if "error" in result:
240+
self._emit_event(event_callback, {"type": "error", "message": result["error"]})
241+
return result
242+
225243
self._emit_event(event_callback, {
226244
"type": "status",
227245
"state": "completed",
@@ -241,7 +259,49 @@ async def process_ask(
241259
sanitized_msg = create_user_friendly_error(e)
242260
self._emit_event(event_callback, {"type": "error", "message": sanitized_msg})
243261
return {"error": sanitized_msg}
244-
262+
263+
def _normalize_summarize_result(self, result: Any, supports_mermaid: bool) -> Dict[str, Any]:
264+
"""Validate summarize output before the queue consumer publishes a final event."""
265+
if not isinstance(result, dict):
266+
return {"error": "AI service returned an invalid summarize result"}
267+
if result.get("error"):
268+
return {"error": str(result["error"])}
269+
270+
summary = result.get("summary")
271+
if not self._has_usable_text(summary):
272+
return {"error": "AI service returned an empty summary"}
273+
274+
diagram_type = result.get("diagramType") or ("MERMAID" if supports_mermaid else "ASCII")
275+
return {
276+
"summary": str(summary),
277+
"diagram": self._string_or_empty(result.get("diagram")),
278+
"diagramType": str(diagram_type),
279+
}
280+
281+
def _normalize_ask_result(self, result: Any) -> Dict[str, Any]:
282+
"""Validate ask output before the queue consumer publishes a final event."""
283+
if not isinstance(result, dict):
284+
return {"error": "AI service returned an invalid ask result"}
285+
if result.get("error"):
286+
return {"error": str(result["error"])}
287+
288+
answer = result.get("answer")
289+
if not self._has_usable_text(answer):
290+
return {"error": "AI service returned an empty answer"}
291+
292+
return {"answer": str(answer)}
293+
294+
@classmethod
295+
def _has_usable_text(cls, value: Any) -> bool:
296+
if value is None:
297+
return False
298+
text = str(value).strip()
299+
return bool(text) and text.lower() not in cls.EMPTY_RESULT_SENTINELS
300+
301+
@staticmethod
302+
def _string_or_empty(value: Any) -> str:
303+
return "" if value is None else str(value)
304+
245305
def _build_platform_jvm_props(self, request) -> Dict[str, str]:
246306
"""Build JVM properties for Platform MCP server (API + VCS access)."""
247307
props = {
@@ -701,11 +761,7 @@ async def _execute_summarize(
701761
"diagramType": "MERMAID" if supports_mermaid else "ASCII"
702762
}
703763
else:
704-
return {
705-
"summary": "Failed to generate summary",
706-
"diagram": "",
707-
"diagramType": "MERMAID" if supports_mermaid else "ASCII"
708-
}
764+
return {"error": "AI service returned an empty summary"}
709765

710766
except Exception as e:
711767
logger.error(f"Summarize agent error: {e}", exc_info=True)
@@ -821,7 +877,7 @@ async def _execute_ask(
821877
else:
822878
return {"answer": final_result}
823879
else:
824-
return {"answer": "I couldn't generate an answer. Please try rephrasing your question."}
880+
return {"error": "AI service returned an empty answer"}
825881

826882
except Exception as e:
827883
logger.error(f"Ask agent error: {e}", exc_info=True)
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import asyncio
2+
import json
3+
from unittest.mock import AsyncMock, MagicMock
4+
5+
import pytest
6+
7+
from server.command_queue_consumer import CommandQueueConsumer
8+
9+
10+
class FakeRedis:
11+
def __init__(self):
12+
self.events = []
13+
14+
async def lpush(self, key, value):
15+
self.events.append((key, json.loads(value)))
16+
17+
18+
def _ask_request():
19+
return {
20+
"projectId": 1,
21+
"projectVcsWorkspace": "ws",
22+
"projectVcsRepoSlug": "repo",
23+
"projectWorkspace": "workspace",
24+
"projectNamespace": "namespace",
25+
"aiProvider": "OPENAI_COMPATIBLE",
26+
"aiModel": "model",
27+
"aiApiKey": "key",
28+
"question": "describe this PR",
29+
"pullRequestId": 7,
30+
}
31+
32+
33+
def _summarize_request():
34+
return {
35+
"projectId": 1,
36+
"projectVcsWorkspace": "ws",
37+
"projectVcsRepoSlug": "repo",
38+
"projectWorkspace": "workspace",
39+
"projectNamespace": "namespace",
40+
"aiProvider": "OPENAI_COMPATIBLE",
41+
"aiModel": "model",
42+
"aiApiKey": "key",
43+
"pullRequestId": 7,
44+
}
45+
46+
47+
def _payload(command_type, request):
48+
return json.dumps({
49+
"job_id": f"job-{command_type}",
50+
"command_type": command_type,
51+
"request": request,
52+
})
53+
54+
55+
def _consumer(command_service):
56+
consumer = CommandQueueConsumer(command_service)
57+
consumer._redis = FakeRedis()
58+
return consumer
59+
60+
61+
async def _handle_and_collect_events(consumer, payload):
62+
await consumer._handle_job(payload)
63+
await asyncio.sleep(0)
64+
await asyncio.sleep(0)
65+
return [event for _, event in consumer._redis.events]
66+
67+
68+
@pytest.mark.asyncio(loop_scope="function")
69+
async def test_error_result_is_published_as_error_without_final():
70+
command_service = MagicMock()
71+
command_service.process_ask = AsyncMock(return_value={"error": "provider failed"})
72+
consumer = _consumer(command_service)
73+
74+
events = await _handle_and_collect_events(consumer, _payload("ask", _ask_request()))
75+
76+
assert any(event["type"] == "error" and event["message"] == "provider failed" for event in events)
77+
assert not any(event["type"] == "final" for event in events)
78+
79+
80+
@pytest.mark.asyncio(loop_scope="function")
81+
async def test_empty_ask_answer_is_published_as_error_without_final():
82+
command_service = MagicMock()
83+
command_service.process_ask = AsyncMock(return_value={"answer": None})
84+
consumer = _consumer(command_service)
85+
86+
events = await _handle_and_collect_events(consumer, _payload("ask", _ask_request()))
87+
88+
assert any(
89+
event["type"] == "error" and event["message"] == "AI service returned an empty answer"
90+
for event in events
91+
)
92+
assert not any(event["type"] == "final" for event in events)
93+
94+
95+
@pytest.mark.asyncio(loop_scope="function")
96+
async def test_successful_ask_answer_is_published_as_final():
97+
command_service = MagicMock()
98+
command_service.process_ask = AsyncMock(return_value={"answer": "42"})
99+
consumer = _consumer(command_service)
100+
101+
events = await _handle_and_collect_events(consumer, _payload("ask", _ask_request()))
102+
103+
assert {"type": "final", "result": {"answer": "42"}} in events
104+
assert not any(event["type"] == "error" for event in events)
105+
106+
107+
@pytest.mark.asyncio(loop_scope="function")
108+
async def test_empty_summarize_result_is_published_as_error_without_final():
109+
command_service = MagicMock()
110+
command_service.process_summarize = AsyncMock(return_value={"summary": "No output generated"})
111+
consumer = _consumer(command_service)
112+
113+
events = await _handle_and_collect_events(consumer, _payload("summarize", _summarize_request()))
114+
115+
assert any(
116+
event["type"] == "error" and event["message"] == "AI service returned an empty summary"
117+
for event in events
118+
)
119+
assert not any(event["type"] == "final" for event in events)

python-ecosystem/inference-orchestrator/tests/test_command_service.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,50 @@ def test_with_newlines(self, service):
310310
assert "line1" in result
311311

312312

313+
# -- _normalize_*_result -----------------------------------------
314+
315+
class TestNormalizeSummarizeResult:
316+
def test_preserves_provider_error(self, service):
317+
result = service._normalize_summarize_result({"error": "provider failed"}, supports_mermaid=False)
318+
assert result == {"error": "provider failed"}
319+
320+
def test_rejects_non_dict_result(self, service):
321+
result = service._normalize_summarize_result(None, supports_mermaid=False)
322+
assert result == {"error": "AI service returned an invalid summarize result"}
323+
324+
@pytest.mark.parametrize("summary", [None, "", " ", "null", "No output generated", "none"])
325+
def test_rejects_empty_summary_values(self, service, summary):
326+
result = service._normalize_summarize_result({"summary": summary}, supports_mermaid=False)
327+
assert result == {"error": "AI service returned an empty summary"}
328+
329+
def test_defaults_missing_diagram_fields(self, service):
330+
result = service._normalize_summarize_result({"summary": "Summary", "diagram": None}, supports_mermaid=False)
331+
assert result == {
332+
"summary": "Summary",
333+
"diagram": "",
334+
"diagramType": "ASCII",
335+
}
336+
337+
338+
class TestNormalizeAskResult:
339+
def test_preserves_provider_error(self, service):
340+
result = service._normalize_ask_result({"error": "provider failed"})
341+
assert result == {"error": "provider failed"}
342+
343+
def test_rejects_non_dict_result(self, service):
344+
result = service._normalize_ask_result(None)
345+
assert result == {"error": "AI service returned an invalid ask result"}
346+
347+
@pytest.mark.parametrize("answer", [None, "", " ", "null", "No output generated", "none"])
348+
def test_rejects_empty_answer_values(self, service, answer):
349+
result = service._normalize_ask_result({"answer": answer})
350+
assert result == {"error": "AI service returned an empty answer"}
351+
352+
def test_accepts_answer(self, service):
353+
result = service._normalize_ask_result({"answer": "The PR updates auth handling."})
354+
assert result == {"answer": "The PR updates auth handling."}
355+
356+
313357
# ── _create_mcp_client ───────────────────────────────────────────
314358

315359
class TestCreateMcpClient:

0 commit comments

Comments
 (0)