Skip to content

Commit 6297f18

Browse files
committed
feat(sse): add progress events for enrichment nodes (RAG, Web Search, Code Analysis)
Users can now see real-time progress during the enrichment phase: - RAG context enrichment - Web search grounding - Code analysis Added ENRICHMENT_START/COMPLETE/ERROR event types to EventType enum and emit events at start, completion, and error states for all three enrichment nodes.
1 parent d097765 commit 6297f18

4 files changed

Lines changed: 185 additions & 0 deletions

File tree

backend/app/graph/nodes/code_analysis_enrich.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from langchain_core.runnables import RunnableConfig
66

77
from app.graph.state import EvaluationState
8+
from app.services.event_channel import create_sommelier_event, get_event_channel
89
from app.services.repo_clone_service import clone_and_analyze
910

1011
logger = logging.getLogger(__name__)
@@ -14,8 +15,33 @@ async def code_analysis_enrich(
1415
state: EvaluationState, config: Optional[RunnableConfig] = None
1516
) -> Dict[str, Any]:
1617
started_at = datetime.now(timezone.utc).isoformat()
18+
evaluation_id = state.get("evaluation_id")
19+
event_channel = get_event_channel()
20+
21+
if evaluation_id:
22+
event_channel.emit_sync(
23+
evaluation_id,
24+
create_sommelier_event(
25+
evaluation_id=evaluation_id,
26+
sommelier="code_analysis",
27+
event_type="enrichment_start",
28+
progress_percent=0,
29+
message="Code analysis starting...",
30+
),
31+
)
1732

1833
if existing := state.get("code_analysis"):
34+
if evaluation_id:
35+
event_channel.emit_sync(
36+
evaluation_id,
37+
create_sommelier_event(
38+
evaluation_id=evaluation_id,
39+
sommelier="code_analysis",
40+
event_type="enrichment_complete",
41+
progress_percent=100,
42+
message="Code analysis complete (cached)",
43+
),
44+
)
1945
return {"code_analysis": existing}
2046

2147
repo_url = state.get("repo_url", "")
@@ -25,6 +51,17 @@ async def code_analysis_enrich(
2551
github_token = state.get("github_token")
2652

2753
if not repo_url:
54+
if evaluation_id:
55+
event_channel.emit_sync(
56+
evaluation_id,
57+
create_sommelier_event(
58+
evaluation_id=evaluation_id,
59+
sommelier="code_analysis",
60+
event_type="enrichment_complete",
61+
progress_percent=100,
62+
message="Code analysis skipped (no repo URL)",
63+
),
64+
)
2865
return {
2966
"code_analysis": {
3067
"status": "skipped",
@@ -61,6 +98,19 @@ async def code_analysis_enrich(
6198
"summary": clone_result.summary,
6299
}
63100

101+
if evaluation_id:
102+
files_count = len(clone_result.main_files)
103+
event_channel.emit_sync(
104+
evaluation_id,
105+
create_sommelier_event(
106+
evaluation_id=evaluation_id,
107+
sommelier="code_analysis",
108+
event_type="enrichment_complete",
109+
progress_percent=100,
110+
message=f"Code analysis complete ({files_count} files)",
111+
),
112+
)
113+
64114
result: Dict[str, Any] = {
65115
"code_analysis": code_analysis,
66116
"trace_metadata": {
@@ -80,6 +130,17 @@ async def code_analysis_enrich(
80130

81131
except Exception as e:
82132
logger.exception("code_analysis_enrich failed")
133+
if evaluation_id:
134+
event_channel.emit_sync(
135+
evaluation_id,
136+
create_sommelier_event(
137+
evaluation_id=evaluation_id,
138+
sommelier="code_analysis",
139+
event_type="enrichment_error",
140+
progress_percent=100,
141+
message=f"Code analysis failed: {e}",
142+
),
143+
)
83144
return {
84145
"code_analysis": {
85146
"status": "error",

backend/app/graph/nodes/rag_enrich.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from app.core.config import settings
99
from app.graph.state import EvaluationState
10+
from app.services.event_channel import create_sommelier_event, get_event_channel
1011

1112
logger = logging.getLogger(__name__)
1213

@@ -101,14 +102,50 @@ async def rag_enrich(
101102
state: EvaluationState, config: Optional[RunnableConfig] = None
102103
) -> Dict[str, Any]:
103104
started_at = datetime.now(timezone.utc).isoformat()
105+
evaluation_id = state.get("evaluation_id")
106+
event_channel = get_event_channel()
107+
108+
if evaluation_id:
109+
event_channel.emit_sync(
110+
evaluation_id,
111+
create_sommelier_event(
112+
evaluation_id=evaluation_id,
113+
sommelier="rag",
114+
event_type="enrichment_start",
115+
progress_percent=0,
116+
message="RAG context enrichment starting...",
117+
),
118+
)
104119

105120
if existing := state.get("rag_context"):
121+
if evaluation_id:
122+
event_channel.emit_sync(
123+
evaluation_id,
124+
create_sommelier_event(
125+
evaluation_id=evaluation_id,
126+
sommelier="rag",
127+
event_type="enrichment_complete",
128+
progress_percent=100,
129+
message="RAG context enrichment complete (cached)",
130+
),
131+
)
106132
return {"rag_context": existing}
107133

108134
repo_context = state.get("repo_context", {})
109135
query = _create_query(state)
110136

111137
if not settings.VERTEX_API_KEY:
138+
if evaluation_id:
139+
event_channel.emit_sync(
140+
evaluation_id,
141+
create_sommelier_event(
142+
evaluation_id=evaluation_id,
143+
sommelier="rag",
144+
event_type="enrichment_complete",
145+
progress_percent=100,
146+
message="RAG enrichment skipped (no API key)",
147+
),
148+
)
112149
return {
113150
"rag_context": {
114151
"query": query,
@@ -145,6 +182,18 @@ async def rag_enrich(
145182
min(settings.RAG_TOP_K, len(docs)),
146183
)
147184

185+
if evaluation_id:
186+
event_channel.emit_sync(
187+
evaluation_id,
188+
create_sommelier_event(
189+
evaluation_id=evaluation_id,
190+
sommelier="rag",
191+
event_type="enrichment_complete",
192+
progress_percent=100,
193+
message=f"RAG enrichment complete ({len(chunks)} chunks)",
194+
),
195+
)
196+
148197
return {
149198
"rag_context": {"query": query, "chunks": chunks, "error": None},
150199
"trace_metadata": {
@@ -157,6 +206,17 @@ async def rag_enrich(
157206

158207
except Exception as e:
159208
logger.warning(f"RAG embedding failed: {e}")
209+
if evaluation_id:
210+
event_channel.emit_sync(
211+
evaluation_id,
212+
create_sommelier_event(
213+
evaluation_id=evaluation_id,
214+
sommelier="rag",
215+
event_type="enrichment_error",
216+
progress_percent=100,
217+
message=f"RAG enrichment failed: {e}",
218+
),
219+
)
160220
return {
161221
"rag_context": {"query": query, "chunks": [], "error": str(e)},
162222
"errors": [f"rag_enrich failed: {e!s}"],

backend/app/graph/nodes/web_search_enrich.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from app.core.config import settings
88
from app.graph.state import EvaluationState
9+
from app.services.event_channel import create_sommelier_event, get_event_channel
910

1011
logger = logging.getLogger(__name__)
1112

@@ -23,11 +24,47 @@ async def web_search_enrich(
2324
state: EvaluationState, config: Optional[RunnableConfig] = None
2425
) -> Dict[str, Any]:
2526
started_at = datetime.now(timezone.utc).isoformat()
27+
evaluation_id = state.get("evaluation_id")
28+
event_channel = get_event_channel()
29+
30+
if evaluation_id:
31+
event_channel.emit_sync(
32+
evaluation_id,
33+
create_sommelier_event(
34+
evaluation_id=evaluation_id,
35+
sommelier="web_search",
36+
event_type="enrichment_start",
37+
progress_percent=0,
38+
message="Web search enrichment starting...",
39+
),
40+
)
2641

2742
if existing := state.get("web_search_context"):
43+
if evaluation_id:
44+
event_channel.emit_sync(
45+
evaluation_id,
46+
create_sommelier_event(
47+
evaluation_id=evaluation_id,
48+
sommelier="web_search",
49+
event_type="enrichment_complete",
50+
progress_percent=100,
51+
message="Web search enrichment complete (cached)",
52+
),
53+
)
2854
return {"web_search_context": existing}
2955

3056
if not settings.VERTEX_API_KEY:
57+
if evaluation_id:
58+
event_channel.emit_sync(
59+
evaluation_id,
60+
create_sommelier_event(
61+
evaluation_id=evaluation_id,
62+
sommelier="web_search",
63+
event_type="enrichment_complete",
64+
progress_percent=100,
65+
message="Web search skipped (no API key)",
66+
),
67+
)
3168
return {
3269
"web_search_context": {
3370
"query": "",
@@ -85,6 +122,18 @@ async def web_search_enrich(
85122
}
86123
)
87124

125+
if evaluation_id:
126+
event_channel.emit_sync(
127+
evaluation_id,
128+
create_sommelier_event(
129+
evaluation_id=evaluation_id,
130+
sommelier="web_search",
131+
event_type="enrichment_complete",
132+
progress_percent=100,
133+
message=f"Web search complete ({len(sources)} sources)",
134+
),
135+
)
136+
88137
return {
89138
"web_search_context": {
90139
"query": query,
@@ -103,6 +152,17 @@ async def web_search_enrich(
103152

104153
except Exception as e:
105154
logger.warning(f"Web search grounding failed: {e}")
155+
if evaluation_id:
156+
event_channel.emit_sync(
157+
evaluation_id,
158+
create_sommelier_event(
159+
evaluation_id=evaluation_id,
160+
sommelier="web_search",
161+
event_type="enrichment_error",
162+
progress_percent=100,
163+
message=f"Web search failed: {e}",
164+
),
165+
)
106166
return {
107167
"web_search_context": {
108168
"query": query,

backend/app/services/event_channel.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ class EventType(str, Enum):
6666
DEEP_SYNTHESIS_COMPLETE = "deep_synthesis_complete"
6767
QUALITY_GATE_COMPLETE = "quality_gate_complete"
6868
METRICS_UPDATE = "metrics_update"
69+
# Enrichment phase events (RAG, Web Search, Code Analysis)
70+
ENRICHMENT_START = "enrichment_start"
71+
ENRICHMENT_COMPLETE = "enrichment_complete"
72+
ENRICHMENT_ERROR = "enrichment_error"
6973

7074

7175
@dataclass

0 commit comments

Comments
 (0)