Skip to content

Commit 3530d97

Browse files
authored
Merge pull request #1177 from devoxx/fix/persist-partial-conversation-on-streaming-error
fix: persist conversations to history when a run fails after streaming an answer
2 parents 0933686 + 9aa67e7 commit 3530d97

7 files changed

Lines changed: 358 additions & 23 deletions

File tree

src/main/java/com/devoxx/genie/service/prompt/response/streaming/StreamingResponseHandler.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ public void onCompleteResponse(ChatResponse response) {
267267
public void onError(@NotNull Throwable error) {
268268
log.error("Streaming error for context {}: {}", context.getId(), error.getMessage());
269269

270+
// Persist any answer already streamed before the failure so the run survives in
271+
// conversation history. Must run before the UI teardown below.
272+
persistPartialResponseOnError();
273+
270274
// Deactivate activity handlers BEFORE hiding to prevent race condition
271275
if (conversationViewController != null) {
272276
conversationViewController.deactivateActivityHandlers();
@@ -294,6 +298,39 @@ public void onError(@NotNull Throwable error) {
294298
onErrorCallback.accept(streamingError);
295299
}
296300

301+
/**
302+
* Persists a run that produced visible answer text but then failed. Conversation
303+
* persistence is normally triggered only from {@link #onCompleteResponse} (which
304+
* publishes {@link AppTopics#CONVERSATION_TOPIC} → {@code ChatService.saveConversation}).
305+
* Without this, an agent/streaming run that streamed a visible answer and then hit a
306+
* trailing provider error (e.g. NVIDIA 404 / ConnectException mid tool-loop) would be
307+
* dropped from conversation history entirely, even though the user already saw the
308+
* answer. Mirrors how {@link #stop()} preserves the partial text in the live view.
309+
*/
310+
private void persistPartialResponseOnError() {
311+
if (isStopped || isCompleted) {
312+
// stop() finalizes its own message; a completed run already persisted. Avoid
313+
// re-publishing a fresh conversation for a trailing/duplicate error callback.
314+
return;
315+
}
316+
String partial = getAccumulatedText();
317+
if (partial.isEmpty()) {
318+
// Provider failed before producing any text — there is nothing worth saving.
319+
return;
320+
}
321+
try {
322+
context.setExecutionTimeMs(System.currentTimeMillis() - startTime);
323+
context.setAiMessage(dev.langchain4j.data.message.AiMessage.from(partial));
324+
project.getMessageBus()
325+
.syncPublisher(AppTopics.CONVERSATION_TOPIC)
326+
.onNewConversation(context);
327+
log.debug("Persisted partial response to history after streaming error for context {}", context.getId());
328+
} catch (Exception e) {
329+
// Best-effort: a persistence failure must not mask the original streaming error.
330+
log.warn("Failed to persist partial response after streaming error for context {}", context.getId(), e);
331+
}
332+
}
333+
297334
/**
298335
* Hides the "Thinking..." loading indicator and any agent activity in the WebView.
299336
* Delegates to ConversationViewController.hideLoadingIndicator().

src/main/java/com/devoxx/genie/service/prompt/strategy/AbstractPromptExecutionStrategy.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.devoxx.genie.service.rag.SearchResult;
1616
import com.devoxx.genie.service.rag.SemanticSearchService;
1717
import com.devoxx.genie.ui.panel.PromptOutputPanel;
18+
import com.devoxx.genie.ui.topic.AppTopics;
1819
import com.devoxx.genie.ui.util.NotificationUtil;
1920
import com.intellij.openapi.project.Project;
2021
import dev.langchain4j.data.message.AiMessage;
@@ -305,6 +306,36 @@ protected void handleExecutionError(@NotNull Throwable error,
305306
PromptErrorHandler.handleException(context.getProject(), executionError, context);
306307
resultTask.complete(PromptResult.failure(context, executionError));
307308
}
309+
310+
/**
311+
* Persists a partially-completed run to conversation history after a failure. Strategies
312+
* that display answer text incrementally (streaming, CLI, ACP) otherwise only persist on
313+
* clean completion, so a run that streamed a visible answer and then failed would vanish
314+
* from history entirely. Mirrors the streaming {@code onError} behaviour.
315+
* <p>
316+
* No-op when no text was produced (nothing worth saving). Best-effort: a persistence
317+
* failure is logged and swallowed so it can never mask the original error.
318+
*
319+
* @param context the chat message context (its AI message and execution time are set here)
320+
* @param partialText the answer text streamed before the failure
321+
* @param startTimeMillis the run start time, used to compute the execution duration
322+
*/
323+
protected void persistPartialResponseOnError(@NotNull ChatMessageContext context,
324+
@NotNull String partialText,
325+
long startTimeMillis) {
326+
if (partialText.isEmpty()) {
327+
return;
328+
}
329+
try {
330+
context.setExecutionTimeMs(System.currentTimeMillis() - startTimeMillis);
331+
context.setAiMessage(AiMessage.from(partialText));
332+
project.getMessageBus()
333+
.syncPublisher(AppTopics.CONVERSATION_TOPIC)
334+
.onNewConversation(context);
335+
} catch (Exception e) {
336+
log.warn("Failed to persist partial response after error for context {}", context.getId(), e);
337+
}
338+
}
308339

309340
/**
310341
* Helper method to hide the "Thinking..." loading indicator.

src/main/java/com/devoxx/genie/service/prompt/strategy/AcpPromptStrategy.java

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.devoxx.genie.service.prompt.threading.PromptTask;
1010
import com.devoxx.genie.ui.panel.PromptOutputPanel;
1111
import com.devoxx.genie.ui.settings.DevoxxGenieStateService;
12+
import com.devoxx.genie.ui.topic.AppTopics;
1213
import com.devoxx.genie.ui.compose.ConversationViewController;
1314
import com.intellij.openapi.application.ApplicationManager;
1415
import com.intellij.openapi.project.Project;
@@ -124,35 +125,63 @@ private void runAcpSession(@NotNull ChatMessageContext context,
124125
long elapsed = System.currentTimeMillis() - startTime;
125126
log.info("ACP session completed: elapsed={}ms, responseLength={}", elapsed, accumulatedResponse.length());
126127

127-
ApplicationManager.getApplication().invokeLater(() -> {
128-
String exitMsg = "\n=== ACP session completed (after " + elapsed + "ms) ===\n";
129-
consoleManager.printSystem(exitMsg);
130-
131-
context.setExecutionTimeMs(elapsed);
132-
String finalText = accumulatedResponse.toString();
133-
if (!finalText.isEmpty()) {
134-
context.setAiMessage(AiMessage.from(finalText));
135-
}
136-
137-
if (viewController != null) {
138-
viewController.updateAiMessageContent(context);
139-
viewController.markMCPLogsAsCompleted(context.getId());
140-
}
141-
142-
ChatMemoryManager.getInstance().addAiResponse(context);
143-
resultTask.complete(PromptResult.success(context));
144-
});
128+
ApplicationManager.getApplication().invokeLater(() ->
129+
finalizeAcpSuccess(elapsed, accumulatedResponse, consoleManager, viewController, context, resultTask));
145130

146131
} catch (Exception e) {
147132
activeClient = null;
148133
log.error("ACP session failed: {}", e.getMessage(), e);
149-
ApplicationManager.getApplication().invokeLater(() -> {
150-
consoleManager.printError("[ACP] Error: " + e.getMessage());
151-
resultTask.complete(PromptResult.failure(context, e));
152-
});
134+
ApplicationManager.getApplication().invokeLater(() ->
135+
finalizeAcpError(e, accumulatedResponse, startTime, consoleManager, context, resultTask));
153136
}
154137
}
155138

139+
private void finalizeAcpSuccess(long elapsed,
140+
@NotNull StringBuilder accumulatedResponse,
141+
@NotNull CliConsoleManager consoleManager,
142+
@Nullable ConversationViewController viewController,
143+
@NotNull ChatMessageContext context,
144+
@NotNull PromptTask<PromptResult> resultTask) {
145+
String exitMsg = "\n=== ACP session completed (after " + elapsed + "ms) ===\n";
146+
consoleManager.printSystem(exitMsg);
147+
148+
context.setExecutionTimeMs(elapsed);
149+
String finalText = accumulatedResponse.toString();
150+
if (!finalText.isEmpty()) {
151+
context.setAiMessage(AiMessage.from(finalText));
152+
}
153+
154+
if (viewController != null) {
155+
viewController.updateAiMessageContent(context);
156+
viewController.markMCPLogsAsCompleted(context.getId());
157+
}
158+
159+
ChatMemoryManager.getInstance().addAiResponse(context);
160+
161+
// Persist to conversation history. ACP previously never published this event, so every
162+
// ACP run — success or failure — was absent from history; align it with the
163+
// Streaming/NonStreaming/CLI strategies which all persist here.
164+
project.getMessageBus()
165+
.syncPublisher(AppTopics.CONVERSATION_TOPIC)
166+
.onNewConversation(context);
167+
168+
resultTask.complete(PromptResult.success(context));
169+
}
170+
171+
private void finalizeAcpError(@NotNull Exception e,
172+
@NotNull StringBuilder accumulatedResponse,
173+
long startTime,
174+
@NotNull CliConsoleManager consoleManager,
175+
@NotNull ChatMessageContext context,
176+
@NotNull PromptTask<PromptResult> resultTask) {
177+
consoleManager.printError("[ACP] Error: " + e.getMessage());
178+
179+
// Persist any answer already streamed before the failure so the run survives in history.
180+
persistPartialResponseOnError(context, accumulatedResponse.toString(), startTime);
181+
182+
resultTask.complete(PromptResult.failure(context, e));
183+
}
184+
156185
@Override
157186
public void cancel() {
158187
log.info("Cancelling ACP Runner strategy");

src/main/java/com/devoxx/genie/service/prompt/strategy/CliPromptStrategy.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,8 @@ private void finalizeProcessResult(@NotNull ProcessOutcome outcome,
249249
finalizeSuccess(exitMsg, outcome.startTime(), outcome.accumulatedResponse(), consoleManager,
250250
viewController, context, resultTask);
251251
} else {
252-
finalizeError(outcome.exitCode(), exitMsg, outcome.stderrLines(), consoleManager, context, resultTask);
252+
finalizeError(outcome.exitCode(), exitMsg, outcome.stderrLines(),
253+
outcome.accumulatedResponse(), outcome.startTime(), consoleManager, context, resultTask);
253254
}
254255
});
255256
}
@@ -304,10 +305,18 @@ private void finalizeSuccess(@NotNull String exitMsg,
304305
private void finalizeError(int exitCode,
305306
@NotNull String exitMsg,
306307
@NotNull List<String> stderrLines,
308+
@NotNull StringBuilder accumulatedResponse,
309+
long startTime,
307310
@NotNull CliConsoleManager consoleManager,
308311
@NotNull ChatMessageContext context,
309312
@NotNull PromptTask<PromptResult> resultTask) {
310313
consoleManager.printError(exitMsg);
314+
315+
// A CLI tool can stream a full, visible answer and still exit non-zero (e.g. a trailing
316+
// cleanup error). Persist whatever was already shown so the run survives in conversation
317+
// history instead of vanishing — matching the streaming/onError behaviour.
318+
persistPartialResponseOnError(context, accumulatedResponse.toString(), startTime);
319+
311320
String errorOutput = String.join("\n", stderrLines).trim();
312321
resultTask.complete(PromptResult.failure(context,
313322
new RuntimeException("CLI process exited with code " + exitCode +

src/test/java/com/devoxx/genie/service/prompt/response/streaming/StreamingResponseHandlerTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,4 +672,63 @@ void onError_withNullWebViewController_doesNotThrow() {
672672

673673
assertThat(onErrorCalled.get()).isTrue();
674674
}
675+
676+
// --- Persist partial answer on error (bug: agent/streaming runs that error mid-loop
677+
// were never saved to conversation history, even after streaming a visible answer) ---
678+
679+
@Test
680+
void onError_afterStreamingAnswer_persistsConversationToHistory() {
681+
StreamingResponseHandler handler = createHandler();
682+
683+
// Agent mode: the model streams a visible answer (turn 1)...
684+
handler.onPartialResponse("Here is the answer you asked for.");
685+
// ...then a later request in the tool loop fails (e.g. NVIDIA 404 / ConnectException).
686+
handler.onError(new RuntimeException("{\"status\":404,\"title\":\"Not Found\"}"));
687+
688+
// The run produced a visible answer, so it must be persisted (published on the
689+
// CONVERSATION_TOPIC) — otherwise it silently vanishes from conversation history.
690+
verify(mockProject.getMessageBus().syncPublisher(AppTopics.CONVERSATION_TOPIC))
691+
.onNewConversation(mockContext);
692+
}
693+
694+
@Test
695+
void onError_afterStreamingAnswer_setsAccumulatedTextAsAiMessageBeforePersisting() {
696+
StreamingResponseHandler handler = createHandler();
697+
698+
handler.onPartialResponse("Partial answer ");
699+
handler.onPartialResponse("before the error.");
700+
handler.onError(new RuntimeException("boom"));
701+
702+
// The persisted AI message must carry the full accumulated text, so history shows
703+
// the answer the user actually saw (not an empty AI turn).
704+
ArgumentCaptor<AiMessage> captor = ArgumentCaptor.forClass(AiMessage.class);
705+
verify(mockContext, atLeastOnce()).setAiMessage(captor.capture());
706+
assertThat(captor.getValue().text()).isEqualTo("Partial answer before the error.");
707+
}
708+
709+
@Test
710+
void onError_withoutAnyStreamedText_doesNotPersist() {
711+
StreamingResponseHandler handler = createHandler();
712+
713+
// The provider errored before producing any text — there is nothing worth saving.
714+
handler.onError(new RuntimeException("Immediate connection failure"));
715+
716+
verify(mockProject.getMessageBus().syncPublisher(AppTopics.CONVERSATION_TOPIC), never())
717+
.onNewConversation(any());
718+
}
719+
720+
@Test
721+
void onError_afterStop_doesNotPersist() {
722+
StreamingResponseHandler handler = createHandler();
723+
handler.onPartialResponse("Some partial text");
724+
725+
// A user-initiated stop already finalizes the message; a trailing provider error
726+
// must not re-persist it as a fresh conversation.
727+
handler.stop();
728+
clearInvocations(mockMessageBus);
729+
730+
handler.onError(new RuntimeException("late error after stop"));
731+
732+
verify(mockMessageBus, never()).syncPublisher(AppTopics.CONVERSATION_TOPIC);
733+
}
675734
}

0 commit comments

Comments
 (0)