Skip to content

Commit a33d95e

Browse files
committed
fix race in ACPConversation: wait for chunk after Prompt() returns
1 parent 7ee60ac commit a33d95e

1 file changed

Lines changed: 23 additions & 1 deletion

File tree

x/acpio/acp_conversation.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"slices"
77
"strings"
88
"sync"
9+
"time"
910

1011
st "github.com/coder/agentapi/lib/screentracker"
1112
"github.com/coder/quartz"
@@ -31,7 +32,8 @@ type ACPConversation struct {
3132
agentIO ChunkableAgentIO
3233
messages []st.ConversationMessage
3334
nextID int // monotonically increasing message ID
34-
prompting bool // true while agent is processing
35+
prompting bool // true while agent is processing
36+
chunkReceived chan struct{} // signals that handleChunk has accumulated a chunk
3537
streamingResponse strings.Builder
3638
logger *slog.Logger
3739
emitter st.Emitter
@@ -68,6 +70,7 @@ func NewACPConversation(ctx context.Context, agentIO ChunkableAgentIO, logger *s
6870
initialPrompt: initialPrompt,
6971
emitter: emitter,
7072
clock: clock,
73+
chunkReceived: make(chan struct{}, 1),
7174
}
7275
return c
7376
}
@@ -202,13 +205,25 @@ func (c *ACPConversation) handleChunk(chunk string) {
202205
screen := c.streamingResponse.String()
203206
c.mu.Unlock()
204207

208+
// Signal that a chunk has been received (non-blocking; a pending signal is sufficient).
209+
select {
210+
case c.chunkReceived <- struct{}{}:
211+
default:
212+
}
213+
205214
c.emitter.EmitMessages(messages)
206215
c.emitter.EmitStatus(status)
207216
c.emitter.EmitScreen(screen)
208217
}
209218

210219
// executePrompt runs the actual agent request and returns any error.
211220
func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error {
221+
// Drain any stale signal before sending the prompt.
222+
select {
223+
case <-c.chunkReceived:
224+
default:
225+
}
226+
212227
var err error
213228
for _, part := range messageParts {
214229
if c.ctx.Err() != nil {
@@ -221,6 +236,13 @@ func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error {
221236
}
222237
}
223238

239+
// The ACP SDK dispatches SessionUpdate notifications as goroutines, so
240+
// the chunk may arrive after conn.Prompt() returns. Wait up to 100ms.
241+
select {
242+
case <-c.chunkReceived:
243+
case <-time.After(100 * time.Millisecond):
244+
}
245+
224246
c.mu.Lock()
225247
c.prompting = false
226248

0 commit comments

Comments
 (0)