Skip to content

Commit 4de1e28

Browse files
fix(workflow): reject incoherent ResultGenerator output across all steps
Production deployment of the agent-framework 1.3.0 upgrade surfaced a crash chain: Analysis "succeeded" with a self-contradictory result (result=True, is_hard_terminated=False, output=None), and Design then crashed at `task_param.output.process_id`. The root cause is the ResultGenerator returning an empty shell when participants never produced useful content.

Fixes:
* `groupchat_orchestrator.run_stream` now validates ResultGenerator output before constructing OrchestrationResult. If the result is not hard-terminated but carries no `output` / `termination_output` payload, the orchestrator now reports success=False with a descriptive error. This is generic across all four step models (Analysis uses `output`; Design/Convert/Documentation use `termination_output`).
* All four step executors gained a defense-in-depth guard that raises a clear `<Step>Executor failed: ... produced no <X>Output. Reason: ...` exception when the same incoherent shape is observed. This stops the broken value at the boundary instead of propagating it downstream.
* The silent `except Exception: pass` around Coordinator JSON parsing in `groupchat_orchestrator` was replaced with `logger.debug(... exc_info=...)`, so loop-detection failures become visible during debugging instead of being swallowed.

Tests:
* Updated each executor's existing soft-completion test to provide a valid output (the previous setup encoded the broken shape we now reject).
* Added a new guard test per executor asserting that the new exception fires for the incoherent shape (success=True + output=None + not hard-terminated).
* Full unit suite: 829 passed (was 825; +4 new guard tests).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 76e9c17 commit 4de1e28

9 files changed

Lines changed: 409 additions & 9 deletions

File tree

src/processor/src/libs/agent_framework/groupchat_orchestrator.py

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -643,17 +643,48 @@ async def run_stream(
643643
f"[RESULT] Skipping result generation - result_format: {result_format}, agent exists: {result_generator_name in self.agents}"
644644
)
645645

646+
# Validate that ResultGenerator produced a coherent output. The LLM can
647+
# sometimes return is_hard_terminated=False with output=None ("success
648+
# but no actual output"), which causes downstream steps to crash with
649+
# NoneType errors. Treat such self-contradictory results as failures so
650+
# the workflow surfaces a clear error rather than propagating an empty
651+
# shell to the next step.
652+
generated_error: str | None = None
653+
if final_analysis is not None and not bool(
654+
getattr(final_analysis, "is_hard_terminated", False)
655+
):
656+
# Step result models use either ``output`` (Analysis) or
657+
# ``termination_output`` (Design, Convert, Documentation). Treat
658+
# both equivalently: if neither holds a non-None payload, the
659+
# ResultGenerator returned an incoherent shell.
660+
has_output_attr = hasattr(final_analysis, "output") or hasattr(
661+
final_analysis, "termination_output"
662+
)
663+
payload = getattr(final_analysis, "output", None) or getattr(
664+
final_analysis, "termination_output", None
665+
)
666+
if has_output_attr and payload is None:
667+
reason = (
668+
getattr(final_analysis, "reason", "") or "<no reason given>"
669+
)
670+
generated_error = (
671+
"ResultGenerator produced incoherent output: "
672+
"is_hard_terminated=False but output=None. "
673+
f"Reason from result: {reason}"
674+
)
675+
logger.error("[RESULT] %s", generated_error)
676+
646677
# Calculate execution time
647678
execution_time = (datetime.now() - start_time).total_seconds()
648679

649680
# Build result
650681
result = OrchestrationResult[TOutput](
651-
success=True,
682+
success=generated_error is None,
652683
conversation=conversation,
653684
agent_responses=self.agent_responses,
654685
tool_usage=self.agent_tool_usage,
655686
result=final_analysis,
656-
error=None,
687+
error=generated_error,
657688
execution_time_seconds=execution_time,
658689
)
659690

@@ -1154,9 +1185,23 @@ async def _complete_agent_response(
11541185
):
11551186
# Record invocation time for non-termination coordinator selections
11561187
self._agent_invoked_at[selected] = completed_at
1157-
except Exception:
1158-
# If the Coordinator didn't emit valid JSON, ignore.
1159-
pass
1188+
except Exception as exc:
1189+
# If the Coordinator didn't emit valid JSON we silently drop
1190+
# loop-detection and termination handling for this turn. Log at
1191+
# debug so the silence is visible if loop detection ever appears
1192+
# to misfire (previously this was a bare ``pass`` which made the
1193+
# failure invisible).
1194+
preview = (
1195+
complete_message[:200]
1196+
if isinstance(complete_message, str)
1197+
else str(type(complete_message))
1198+
)
1199+
logger.debug(
1200+
"Coordinator JSON parse failed; skipping loop detection for "
1201+
"this turn. Raw message preview: %r",
1202+
preview,
1203+
exc_info=exc,
1204+
)
11601205

11611206
# Invoke callback with complete response
11621207
if callback:

src/processor/src/steps/analysis/workflow/analysis_executor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ async def handle_execute(
6565
error_msg = result.error or "Analysis orchestration failed with no output"
6666
raise Exception(f"AnalysisExecutor failed: {error_msg}")
6767

68+
if not result.result.is_hard_terminated and result.result.output is None:
69+
reason = result.result.reason or "<no reason given>"
70+
raise Exception(
71+
"AnalysisExecutor failed: orchestration reported success but produced "
72+
f"no AnalysisOutput. Reason: {reason}"
73+
)
74+
6875
if result.result:
6976
if not result.result.is_hard_terminated:
7077
await ctx.send_message(result.result)

src/processor/src/steps/convert/workflow/yaml_convert_executor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ async def handle_execute(
4545
)
4646
raise Exception(f"YamlConvertExecutor failed: {error_msg}")
4747

48+
if not result.result.is_hard_terminated and result.result.termination_output is None:
49+
reason = result.result.reason or "<no reason given>"
50+
raise Exception(
51+
"YamlConvertExecutor failed: orchestration reported success but "
52+
f"produced no YAML conversion output. Reason: {reason}"
53+
)
54+
4855
if result.result:
4956
if not result.result.is_hard_terminated:
5057
await ctx.send_message(result.result)

src/processor/src/steps/design/workflow/design_executor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ async def handle_execute(
4242
error_msg = result.error or "Design orchestration failed with no output"
4343
raise Exception(f"DesignExecutor failed: {error_msg}")
4444

45+
if not result.result.is_hard_terminated and result.result.termination_output is None:
46+
reason = result.result.reason or "<no reason given>"
47+
raise Exception(
48+
"DesignExecutor failed: orchestration reported success but produced "
49+
f"no DesignOutput. Reason: {reason}"
50+
)
51+
4552
if result.result:
4653
if not result.result.is_hard_terminated:
4754
await ctx.send_message(result.result)

src/processor/src/steps/documentation/workflow/documentation_executor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,11 @@ async def handle_execute(
4747
)
4848
raise Exception(f"DocumentationExecutor failed: {error_msg}")
4949

50+
if not result.result.is_hard_terminated and result.result.termination_output is None:
51+
reason = result.result.reason or "<no reason given>"
52+
raise Exception(
53+
"DocumentationExecutor failed: orchestration reported success but "
54+
f"produced no DocumentationOutput. Reason: {reason}"
55+
)
56+
5057
await ctx.yield_output(result.result)

src/processor/src/tests/unit/steps/analysis/test_analysis_executor.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,47 @@
66
import asyncio
77

88
from libs.agent_framework.groupchat_orchestrator import OrchestrationResult
9-
from steps.analysis.models.step_output import Analysis_BooleanExtendedResult
9+
from steps.analysis.models.step_output import (
10+
AnalysisOutput,
11+
Analysis_BooleanExtendedResult,
12+
ComplexityAnalysis,
13+
FileType,
14+
MigrationReadiness,
15+
)
1016
from steps.analysis.models.step_param import Analysis_TaskParam
1117
from steps.analysis.workflow.analysis_executor import AnalysisExecutor
1218

1319

20+
def _make_analysis_output(process_id: str) -> AnalysisOutput:
    """Build a fully populated ``AnalysisOutput`` fixture for ``process_id``.

    Every field other than ``process_id`` is a fixed placeholder value; the
    fixture exists so tests can hand the executor a result that carries a
    non-None ``output`` payload.
    """
    # Nested models are built in the same order the final constructor
    # consumes them.
    discovered = [
        FileType(
            filename="app.yaml",
            type="Deployment",
            complexity="Low",
            azure_mapping="AKS Deployment",
        )
    ]
    complexity = ComplexityAnalysis(
        network_complexity="Low",
        security_complexity="Low",
        storage_complexity="Low",
        compute_complexity="Low",
    )
    readiness = MigrationReadiness(
        overall_score="A",
        concerns=[],
        recommendations=[],
    )

    return AnalysisOutput(
        process_id=process_id,
        platform_detected="EKS",
        confidence_score="95%",
        files_discovered=discovered,
        complexity_analysis=complexity,
        migration_readiness=readiness,
        summary="ok",
        expert_insights=[],
        analysis_file="analysis.md",
    )
48+
49+
1450
class _FakeTelemetry:
1551
def __init__(self):
1652
self.transitions: list[tuple[str, str, str]] = []
@@ -59,6 +95,7 @@ async def execute(self, task_param=None):
5995
result=True,
6096
is_hard_terminated=False,
6197
process_id=task_param.process_id,
98+
output=_make_analysis_output(task_param.process_id),
6299
),
63100
)
64101

@@ -143,3 +180,63 @@ async def execute(self, task_param=None):
143180
assert isinstance(ctx.yielded[0], Analysis_BooleanExtendedResult)
144181

145182
asyncio.run(_run())
183+
184+
185+
def test_analysis_executor_raises_when_soft_completion_has_no_output(monkeypatch):
    """AnalysisExecutor must raise on a soft completion that has no output.

    The stubbed orchestrator reports ``success=True`` while its result is not
    hard-terminated and carries no ``output``. The executor is expected to
    raise an exception mentioning "produced no AnalysisOutput" and to neither
    send nor yield anything through the workflow context.
    """

    class _FakeOrchestrator:
        # Stand-in for AnalysisOrchestrator that always returns the
        # incoherent "success without output" shape.
        def __init__(self, _app_context):
            pass

        async def execute(self, task_param=None):
            empty_shell = Analysis_BooleanExtendedResult(
                result=True,
                is_hard_terminated=False,
                process_id=task_param.process_id,
                reason="agents never produced output",
            )
            return OrchestrationResult(
                success=True,
                conversation=[],
                agent_responses=[],
                tool_usage={},
                result=empty_shell,
            )

    async def _run():
        import pytest

        telemetry = _FakeTelemetry()
        app_context = _FakeAppContext(telemetry)
        ctx = _FakeCtx()

        # Patch the names the executor module resolves at call time.
        target_module = "steps.analysis.workflow.analysis_executor"
        monkeypatch.setattr(
            f"{target_module}.text2art",
            lambda _s: "ART",
            raising=False,
        )
        monkeypatch.setattr(
            f"{target_module}.AnalysisOrchestrator",
            _FakeOrchestrator,
        )

        executor = AnalysisExecutor(id="analysis", app_context=app_context)
        task = Analysis_TaskParam(
            process_id="p1",
            container_name="c1",
            source_file_folder="p1/source",
            workspace_file_folder="p1/workspace",
            output_file_folder="p1/output",
        )

        with pytest.raises(Exception, match="produced no AnalysisOutput"):
            await executor.handle_execute(task, ctx)  # type: ignore[arg-type]

        # Nothing may leak downstream once the guard has fired.
        assert len(ctx.sent) == 0
        assert len(ctx.yielded) == 0

    asyncio.run(_run())

src/processor/src/tests/unit/steps/convert/test_yaml_convert_executor.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,63 @@
66
import asyncio
77

88
from libs.agent_framework.groupchat_orchestrator import OrchestrationResult
9-
from steps.convert.models.step_output import Yaml_ExtendedBooleanResult
9+
from steps.convert.models.step_output import (
10+
ConvertedFile,
11+
ConversionMetrics,
12+
ConversionQuality,
13+
DimensionalAnalysis,
14+
MultiDimensionalAnalysis,
15+
YamlOutput,
16+
Yaml_ExtendedBooleanResult,
17+
)
1018
from steps.convert.workflow.yaml_convert_executor import YamlConvertExecutor
1119
from steps.design.models.step_output import Design_ExtendedBooleanResult
1220

1321

22+
def _make_yaml_output() -> YamlOutput:
    """Build a fully populated ``YamlOutput`` fixture with placeholder values.

    A single ``DimensionalAnalysis`` instance is reused for all four analysis
    dimensions, so the network, security, storage and compute entries refer
    to the same object.
    """
    shared_dimension = DimensionalAnalysis(
        complexity="Low",
        converted_components=["pod"],
        azure_optimizations="none",
        concerns=[],
        success_rate="100%",
    )

    # Nested models are built in the same order the final constructor
    # consumes them.
    files = [
        ConvertedFile(
            source_file="a.yaml",
            converted_file="a-azure.yaml",
            conversion_status="Success",
            accuracy_rating="100%",
            concerns=[],
            azure_enhancements=[],
        )
    ]
    analysis = MultiDimensionalAnalysis(
        network_analysis=shared_dimension,
        security_analysis=shared_dimension,
        storage_analysis=shared_dimension,
        compute_analysis=shared_dimension,
    )
    metrics = ConversionMetrics(
        total_files=1,
        successful_conversions=1,
        failed_conversions=0,
        overall_accuracy="100%",
        azure_compatibility="100%",
    )
    quality = ConversionQuality(
        azure_best_practices="ok",
        security_hardening="ok",
        performance_optimization="ok",
        production_readiness="ok",
    )

    return YamlOutput(
        converted_files=files,
        multi_dimensional_analysis=analysis,
        overall_conversion_metrics=metrics,
        conversion_quality=quality,
        summary="ok",
        expert_insights=[],
        conversion_report_file="report.md",
    )
64+
65+
1466
class _FakeTelemetry:
1567
def __init__(self):
1668
self.transitions: list[tuple[str, str, str]] = []
@@ -59,6 +111,7 @@ async def execute(self, task_param=None):
59111
result=True,
60112
is_hard_terminated=False,
61113
process_id=task_param.process_id,
114+
termination_output=_make_yaml_output(),
62115
),
63116
)
64117

@@ -118,3 +171,47 @@ async def execute(self, task_param=None):
118171
assert isinstance(ctx.yielded[0], Yaml_ExtendedBooleanResult)
119172

120173
asyncio.run(_run())
174+
175+
176+
def test_yaml_convert_executor_raises_when_soft_completion_has_no_output(monkeypatch):
    """YamlConvertExecutor must raise on a soft completion with no payload.

    The stubbed orchestrator reports ``success=True`` while its result is not
    hard-terminated and carries no ``termination_output``. The executor is
    expected to raise an exception mentioning "produced no YAML conversion
    output" and to neither send nor yield anything through the context.
    """

    class _FakeOrchestrator:
        # Stand-in for YamlConvertOrchestrator that always returns the
        # incoherent "success without output" shape.
        def __init__(self, _app_context):
            pass

        async def execute(self, task_param=None):
            empty_shell = Yaml_ExtendedBooleanResult(
                result=True,
                is_hard_terminated=False,
                process_id=task_param.process_id,
                reason="agents never produced output",
            )
            return OrchestrationResult(
                success=True,
                conversation=[],
                agent_responses=[],
                tool_usage={},
                result=empty_shell,
            )

    async def _run():
        import pytest

        telemetry = _FakeTelemetry()
        app_context = _FakeAppContext(telemetry)
        ctx = _FakeCtx()

        # Patch the orchestrator name the executor module resolves.
        monkeypatch.setattr(
            "steps.convert.workflow.yaml_convert_executor.YamlConvertOrchestrator",
            _FakeOrchestrator,
        )

        executor = YamlConvertExecutor(id="yaml", app_context=app_context)
        upstream = Design_ExtendedBooleanResult(process_id="p1")

        with pytest.raises(Exception, match="produced no YAML conversion output"):
            await executor.handle_execute(upstream, ctx)  # type: ignore[arg-type]

        # Nothing may leak downstream once the guard has fired.
        assert len(ctx.sent) == 0
        assert len(ctx.yielded) == 0

    asyncio.run(_run())

0 commit comments

Comments
 (0)