Skip to content

Commit dda3717

Browse files
committed
fix: do not corrupt user message if late chunk arrives after error
1 parent 19b3a83 commit dda3717

2 files changed

Lines changed: 49 additions & 2 deletions

File tree

x/acpio/acp_conversation.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,18 @@ func (c *ACPConversation) Text() string {
179179
// handleChunk is called for each streaming chunk from the agent.
180180
func (c *ACPConversation) handleChunk(chunk string) {
181181
c.mu.Lock()
182+
// Log and discard chunks that arrive after the prompt has completed or errored.
183+
// This should not happen under normal operation — if it does, it indicates a
184+
// bug in the ACP SDK or a race in the connection teardown.
185+
if !c.prompting {
186+
c.mu.Unlock()
187+
c.logger.Error("received chunk while not prompting (late/unexpected chunk discarded)",
188+
"chunkLen", len(chunk))
189+
return
190+
}
182191
c.streamingResponse.WriteString(chunk)
183-
// Update the last message (the streaming agent response)
184-
if len(c.messages) > 0 {
192+
// Only update the last message if it's the agent placeholder (defense-in-depth)
193+
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
185194
c.messages[len(c.messages)-1].Message = c.streamingResponse.String()
186195
}
187196
messages := slices.Clone(c.messages)

x/acpio/acp_conversation_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,3 +504,41 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) {
504504
assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role)
505505
assert.Equal(t, "test", messages[0].Message)
506506
}
507+
508+
func Test_LateChunkAfterError_DoesNotCorruptUserMessage(t *testing.T) {
509+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
510+
defer cancel()
511+
512+
mClock := quartz.NewMock(t)
513+
mock := newMockAgentIO()
514+
emitter := newMockEmitter()
515+
started, done := mock.BlockWrite()
516+
517+
conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock)
518+
conv.Start(ctx)
519+
520+
// Given: a send that fails with an error, removing the agent placeholder
521+
err := conv.Send(screentracker.MessagePartText{Content: "hello"})
522+
require.NoError(t, err)
523+
<-started
524+
525+
mock.mu.Lock()
526+
mock.writeErr = assert.AnError
527+
mock.mu.Unlock()
528+
close(done)
529+
530+
emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable)
531+
532+
messages := conv.Messages()
533+
require.Len(t, messages, 1, "agent placeholder should be removed after error")
534+
assert.Equal(t, "hello", messages[0].Message)
535+
536+
// When: a late chunk arrives after the prompt has already errored
537+
mock.SimulateChunks("late response data")
538+
539+
// Then: the user message is not corrupted
540+
messages = conv.Messages()
541+
require.Len(t, messages, 1, "no new messages should appear from a late chunk")
542+
assert.Equal(t, "hello", messages[0].Message)
543+
assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role)
544+
}

0 commit comments

Comments
 (0)