Skip to content

Commit a4f7d77

Browse files
authored
Merge pull request #273 from Two-Weeks-Team/fix/grand-tasting-sse-events
fix: add SSE event emission to grand_tasting mode
2 parents f9d9211 + c1f2f1d commit a4f7d77

2 files changed

Lines changed: 136 additions & 31 deletions

File tree

backend/app/graph/nodes/tasting_notes/base.py

Lines changed: 113 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from app.graph.schemas import TastingNoteOutput, TechniqueResult
1111
from app.providers.llm import build_llm, extract_text_content
1212
from app.providers.llm_policy import invoke_with_policy, RetryConfig
13+
from app.services.event_channel import create_sommelier_event, get_event_channel
1314
from app.services.llm_context import render_repo_context, get_context_budget
1415
from app.techniques.mappings import (
1516
TastingNote,
@@ -98,6 +99,25 @@ async def evaluate(
9899
self, state: EvaluationState, config: Optional[RunnableConfig] = None
99100
) -> Dict[str, Any]:
100101
started_at = datetime.now(timezone.utc).isoformat()
102+
evaluation_id = state.get("evaluation_id", "")
103+
category_id = self.category.value
104+
progress_config = TASTING_NOTE_PROGRESS.get(
105+
category_id, {"start": 0, "complete": 100}
106+
)
107+
event_channel = get_event_channel()
108+
109+
if evaluation_id:
110+
event_channel.emit_sync(
111+
evaluation_id,
112+
create_sommelier_event(
113+
evaluation_id=evaluation_id,
114+
sommelier=category_id,
115+
event_type="sommelier_start",
116+
progress_percent=progress_config["start"],
117+
message=f"{category_id} analysis starting...",
118+
),
119+
)
120+
101121
configurable = (config or {}).get("configurable", {})
102122
provider = configurable.get("provider", "vertex")
103123
api_key = configurable.get("api_key")
@@ -117,15 +137,26 @@ async def evaluate(
117137
techniques = self.get_techniques()[:3]
118138

119139
if not techniques:
120-
logger.warning(f"{self.category.value}: no techniques available")
140+
logger.warning(f"{category_id}: no techniques available")
141+
if evaluation_id:
142+
event_channel.emit_sync(
143+
evaluation_id,
144+
create_sommelier_event(
145+
evaluation_id=evaluation_id,
146+
sommelier=category_id,
147+
event_type="sommelier_error",
148+
progress_percent=progress_config["start"],
149+
message=f"{category_id}: no techniques configured",
150+
),
151+
)
121152
return {
122-
"errors": [f"{self.category.value}: no techniques configured"],
123-
f"{self.category.value}_result": None,
124-
"completed_sommeliers": [self.category.value],
125-
"token_usage": {self.category.value: {}},
126-
"cost_usage": {self.category.value: None},
153+
"errors": [f"{category_id}: no techniques configured"],
154+
f"{category_id}_result": None,
155+
"completed_sommeliers": [category_id],
156+
"token_usage": {category_id: {}},
157+
"cost_usage": {category_id: None},
127158
"trace_metadata": {
128-
self.category.value: {
159+
category_id: {
129160
"started_at": started_at,
130161
"completed_at": datetime.now(timezone.utc).isoformat(),
131162
}
@@ -194,11 +225,11 @@ async def evaluate(
194225
rendered_context += code_section
195226

196227
observability = {
197-
"completed_sommeliers": [self.category.value],
198-
"token_usage": {self.category.value: {}},
199-
"cost_usage": {self.category.value: None},
228+
"completed_sommeliers": [category_id],
229+
"token_usage": {category_id: {}},
230+
"cost_usage": {category_id: None},
200231
"trace_metadata": {
201-
self.category.value: {
232+
category_id: {
202233
"started_at": started_at,
203234
"completed_at": None,
204235
"model": model or "default",
@@ -214,7 +245,18 @@ async def evaluate(
214245
messages = prompt.format_messages(repo_context=rendered_context)
215246

216247
def on_retry(attempt: int, delay: float, msg: str) -> None:
217-
logger.info(f"{self.category.value}: {msg}")
248+
logger.info(f"{category_id}: {msg}")
249+
if evaluation_id:
250+
event_channel.emit_sync(
251+
evaluation_id,
252+
create_sommelier_event(
253+
evaluation_id=evaluation_id,
254+
sommelier=category_id,
255+
event_type="sommelier_retry",
256+
progress_percent=progress_config["start"],
257+
message=f"{category_id} retrying ({attempt}/3)...",
258+
),
259+
)
218260

219261
invocation_result = await invoke_with_policy(
220262
llm=llm,
@@ -225,21 +267,21 @@ def on_retry(attempt: int, delay: float, msg: str) -> None:
225267
on_retry=on_retry,
226268
)
227269

228-
observability["trace_metadata"][self.category.value]["completed_at"] = (
229-
datetime.now(timezone.utc).isoformat()
230-
)
231-
observability["trace_metadata"][self.category.value]["attempts"] = (
270+
observability["trace_metadata"][category_id]["completed_at"] = datetime.now(
271+
timezone.utc
272+
).isoformat()
273+
observability["trace_metadata"][category_id]["attempts"] = (
232274
invocation_result.attempts
233275
)
234-
observability["trace_metadata"][self.category.value]["total_wait_seconds"] = (
276+
observability["trace_metadata"][category_id]["total_wait_seconds"] = (
235277
invocation_result.total_wait_seconds
236278
)
237279

238280
if invocation_result.success:
239281
response = invocation_result.response
240282
usage = getattr(response, "usage_metadata", {}) or {}
241283
observability["token_usage"] = {
242-
self.category.value: {
284+
category_id: {
243285
"input_tokens": usage.get("input_tokens"),
244286
"output_tokens": usage.get("output_tokens"),
245287
"total_tokens": usage.get("total_tokens"),
@@ -250,23 +292,56 @@ def on_retry(attempt: int, delay: float, msg: str) -> None:
250292
text_content = extract_text_content(response.content)
251293
result = self.parser.parse(text_content)
252294
except Exception as parse_error:
253-
logger.error(
254-
f"{self.category.value} failed to parse response: {parse_error!s}"
255-
)
295+
logger.error(f"{category_id} failed to parse response: {parse_error!s}")
296+
if evaluation_id:
297+
event_channel.emit_sync(
298+
evaluation_id,
299+
create_sommelier_event(
300+
evaluation_id=evaluation_id,
301+
sommelier=category_id,
302+
event_type="sommelier_error",
303+
progress_percent=progress_config["start"],
304+
message=f"{category_id} analysis failed (parse error)",
305+
),
306+
)
256307
return {
257-
"errors": [f"{self.category.value} parse error: {parse_error!s}"],
258-
f"{self.category.value}_result": None,
308+
"errors": [f"{category_id} parse error: {parse_error!s}"],
309+
f"{category_id}_result": None,
259310
**observability,
260311
}
261312

313+
if evaluation_id:
314+
event_channel.emit_sync(
315+
evaluation_id,
316+
create_sommelier_event(
317+
evaluation_id=evaluation_id,
318+
sommelier=category_id,
319+
event_type="sommelier_complete",
320+
progress_percent=progress_config["complete"],
321+
message=f"{category_id} analysis complete",
322+
tokens_used=usage.get("total_tokens", 0),
323+
),
324+
)
325+
if category_id == "cellar":
326+
event_channel.emit_sync(
327+
evaluation_id,
328+
create_sommelier_event(
329+
evaluation_id=evaluation_id,
330+
sommelier="system",
331+
event_type="evaluation_complete",
332+
progress_percent=100,
333+
message="Grand tasting evaluation complete",
334+
),
335+
)
336+
262337
return {
263-
f"{self.category.value}_result": result.model_dump(),
338+
f"{category_id}_result": result.model_dump(),
264339
**observability,
265340
}
266341

267342
error_category = invocation_result.error_category
268343
error_msg = (
269-
f"{self.category.value} evaluation failed after "
344+
f"{category_id} evaluation failed after "
270345
f"{invocation_result.attempts} attempts"
271346
)
272347
if error_category:
@@ -276,8 +351,20 @@ def on_retry(attempt: int, delay: float, msg: str) -> None:
276351

277352
logger.error(error_msg)
278353

354+
if evaluation_id:
355+
event_channel.emit_sync(
356+
evaluation_id,
357+
create_sommelier_event(
358+
evaluation_id=evaluation_id,
359+
sommelier=category_id,
360+
event_type="sommelier_error",
361+
progress_percent=progress_config["start"],
362+
message=f"{category_id} analysis failed",
363+
),
364+
)
365+
279366
return {
280367
"errors": [error_msg],
281-
f"{self.category.value}_result": None,
368+
f"{category_id}_result": None,
282369
**observability,
283370
}

frontend/src/hooks/useEvaluationStream.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,18 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
8383
setCurrentSommelier(event.sommelier);
8484
}
8585
setStatus('processing');
86-
setCompletedSommeliers((prev) => {
87-
updateProgressFromCompleted(prev.length, true);
88-
return prev;
89-
});
86+
if (event.progress_percent != null && event.progress_percent >= 0) {
87+
const newProgress = event.progress_percent;
88+
if (newProgress > progressRef.current) {
89+
progressRef.current = newProgress;
90+
setProgress(newProgress);
91+
}
92+
} else {
93+
setCompletedSommeliers((prev) => {
94+
updateProgressFromCompleted(prev.length, true);
95+
return prev;
96+
});
97+
}
9098
break;
9199

92100
case 'sommelier_complete':
@@ -108,7 +116,15 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
108116
feedback: event.message || `${sommelierInfo.name} analysis complete`,
109117
},
110118
];
111-
updateProgressFromCompleted(newList.length, false);
119+
if (event.progress_percent != null && event.progress_percent >= 0) {
120+
const newProgress = event.progress_percent;
121+
if (newProgress > progressRef.current) {
122+
progressRef.current = newProgress;
123+
setProgress(newProgress);
124+
}
125+
} else {
126+
updateProgressFromCompleted(newList.length, false);
127+
}
112128
return newList;
113129
});
114130
setCurrentSommelier(null);
@@ -128,6 +144,7 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
128144
break;
129145

130146
case 'evaluation_complete':
147+
isCompleteRef.current = true;
131148
setIsComplete(true);
132149
setStatus('completed');
133150
progressRef.current = 100;
@@ -136,6 +153,7 @@ export const useEvaluationStream = (evaluationId: string): UseEvaluationStreamRe
136153
break;
137154

138155
case 'evaluation_error':
156+
isCompleteRef.current = true;
139157
setIsComplete(true);
140158
setStatus('failed');
141159
setCurrentSommelier(null);

0 commit comments

Comments
 (0)