Skip to content

Commit a967615

Browse files
authored
feat: add /x/acpio package (#188)
Relates to coder/internal#1333 Adds /x/acpio package
1 parent 25cbbcb commit a967615

File tree

7 files changed

+1327
-0
lines changed

7 files changed

+1327
-0
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/ActiveState/termtest/xpty v0.6.0
77
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
88
github.com/charmbracelet/bubbletea v1.3.4
9+
github.com/coder/acp-go-sdk v0.6.3
910
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
1011
github.com/coder/quartz v0.1.2
1112
github.com/danielgtaylor/huma/v2 v2.32.0
@@ -15,6 +16,7 @@ require (
1516
github.com/spf13/viper v1.20.1
1617
github.com/stretchr/testify v1.11.1
1718
github.com/tmaxmax/go-sse v0.10.0
19+
go.uber.org/goleak v1.3.0
1820
golang.org/x/term v0.30.0
1921
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da
2022
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ github.com/ckaznocha/intrange v0.3.1 h1:j1onQyXvHUsPWujDH6WIjhyH26gkRt/txNlV7Lsp
163163
github.com/ckaznocha/intrange v0.3.1/go.mod h1:QVepyz1AkUoFQkpEqksSYpNpUo3c5W7nWh/s6SHIJJk=
164164
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
165165
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
166+
github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ=
167+
github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko=
166168
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8=
167169
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4=
168170
github.com/coder/paralleltestctx v0.0.1 h1:eauyehej1XYTGwgzGWMTjeRIVgOpU6XLPNVb2oi6kDs=

x/acpio/acp_conversation.go

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
package acpio
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"slices"
7+
"strings"
8+
"sync"
9+
10+
st "github.com/coder/agentapi/lib/screentracker"
11+
"github.com/coder/quartz"
12+
)
13+
14+
// Compile-time assertion that ACPConversation implements st.Conversation
15+
var _ st.Conversation = (*ACPConversation)(nil)
16+
17+
// ChunkableAgentIO extends AgentIO with chunk callback support for streaming responses.
18+
// This interface is what ACPConversation needs from its AgentIO implementation.
19+
type ChunkableAgentIO interface {
20+
st.AgentIO
21+
SetOnChunk(fn func(chunk string))
22+
}
23+
24+
// ACPConversation tracks conversations with ACP-based agents.
25+
// Unlike PTY-based Conversation, ACP has blocking writes where the
26+
// response is complete when Write() returns.
27+
type ACPConversation struct {
28+
mu sync.Mutex
29+
ctx context.Context
30+
cancel context.CancelFunc
31+
agentIO ChunkableAgentIO
32+
messages []st.ConversationMessage
33+
nextID int // monotonically increasing message ID
34+
prompting bool // true while agent is processing
35+
streamingResponse strings.Builder
36+
logger *slog.Logger
37+
emitter st.Emitter
38+
initialPrompt []st.MessagePart
39+
clock quartz.Clock
40+
}
41+
42+
// noopEmitter is a no-op implementation of Emitter for when no emitter is provided.
43+
type noopEmitter struct{}
44+
45+
func (noopEmitter) EmitMessages([]st.ConversationMessage) {}
46+
func (noopEmitter) EmitStatus(st.ConversationStatus) {}
47+
func (noopEmitter) EmitScreen(string) {}
48+
49+
// NewACPConversation creates a new ACPConversation.
50+
// If emitter is provided, it will receive events when messages/status/screen change.
51+
// If clock is nil, a real clock will be used.
52+
func NewACPConversation(ctx context.Context, agentIO ChunkableAgentIO, logger *slog.Logger, initialPrompt []st.MessagePart, emitter st.Emitter, clock quartz.Clock) *ACPConversation {
53+
if logger == nil {
54+
logger = slog.Default()
55+
}
56+
if clock == nil {
57+
clock = quartz.NewReal()
58+
}
59+
if emitter == nil {
60+
emitter = noopEmitter{}
61+
}
62+
ctx, cancel := context.WithCancel(ctx)
63+
c := &ACPConversation{
64+
ctx: ctx,
65+
cancel: cancel,
66+
agentIO: agentIO,
67+
logger: logger,
68+
initialPrompt: initialPrompt,
69+
emitter: emitter,
70+
clock: clock,
71+
}
72+
return c
73+
}
74+
75+
// Messages returns the conversation history.
76+
func (c *ACPConversation) Messages() []st.ConversationMessage {
77+
c.mu.Lock()
78+
defer c.mu.Unlock()
79+
return slices.Clone(c.messages)
80+
}
81+
82+
// Send sends a message to the agent synchronously.
83+
// It blocks until the agent has finished processing and returns any error
84+
// from the underlying write. Returns a validation error immediately if
85+
// the message is invalid or another message is already being processed.
86+
func (c *ACPConversation) Send(messageParts ...st.MessagePart) error {
87+
message := ""
88+
for _, part := range messageParts {
89+
message += part.String()
90+
}
91+
92+
// Validate whitespace BEFORE trimming (match PTY behavior)
93+
if message != strings.TrimSpace(message) {
94+
return st.ErrMessageValidationWhitespace
95+
}
96+
97+
if message == "" {
98+
return st.ErrMessageValidationEmpty
99+
}
100+
101+
// Check if already prompting and set state atomically
102+
c.mu.Lock()
103+
if c.prompting {
104+
c.mu.Unlock()
105+
return st.ErrMessageValidationChanging
106+
}
107+
c.messages = append(c.messages, st.ConversationMessage{
108+
Id: c.nextID,
109+
Role: st.ConversationRoleUser,
110+
Message: message,
111+
Time: c.clock.Now(),
112+
})
113+
c.nextID++
114+
// Add placeholder for streaming agent response
115+
c.messages = append(c.messages, st.ConversationMessage{
116+
Id: c.nextID,
117+
Role: st.ConversationRoleAgent,
118+
Message: "",
119+
Time: c.clock.Now(),
120+
})
121+
c.nextID++
122+
c.streamingResponse.Reset()
123+
c.prompting = true
124+
status := c.statusLocked()
125+
c.mu.Unlock()
126+
127+
// Emit status change to "running" before starting the prompt
128+
c.emitter.EmitStatus(status)
129+
130+
c.logger.Debug("ACPConversation sending message", "message", message)
131+
132+
return c.executePrompt(messageParts)
133+
}
134+
135+
// Start sets up chunk handling and sends the initial prompt if provided.
136+
func (c *ACPConversation) Start(ctx context.Context) {
137+
// Wire up the chunk callback for streaming
138+
c.agentIO.SetOnChunk(c.handleChunk)
139+
140+
// Send initial prompt if provided
141+
if len(c.initialPrompt) > 0 {
142+
// Run in a goroutine because Send blocks until the prompt completes,
143+
// and Start must return immediately per the Conversation interface.
144+
go func() {
145+
err := c.Send(c.initialPrompt...)
146+
if err != nil {
147+
c.logger.Error("ACPConversation failed to send initial prompt", "error", err)
148+
}
149+
}()
150+
} else {
151+
// No initial prompt means we start in stable state
152+
c.emitter.EmitStatus(c.Status())
153+
}
154+
}
155+
156+
// Status returns the current conversation status.
157+
func (c *ACPConversation) Status() st.ConversationStatus {
158+
c.mu.Lock()
159+
defer c.mu.Unlock()
160+
return c.statusLocked()
161+
}
162+
163+
// statusLocked returns the status without acquiring the lock (caller must hold lock).
164+
func (c *ACPConversation) statusLocked() st.ConversationStatus {
165+
if c.prompting {
166+
return st.ConversationStatusChanging // agent is processing
167+
}
168+
return st.ConversationStatusStable
169+
}
170+
171+
// Stop cancels any in-progress operations.
172+
func (c *ACPConversation) Stop() {
173+
c.cancel()
174+
}
175+
176+
// Text returns the current streaming response text.
177+
func (c *ACPConversation) Text() string {
178+
c.mu.Lock()
179+
defer c.mu.Unlock()
180+
return c.streamingResponse.String()
181+
}
182+
183+
// handleChunk is called for each streaming chunk from the agent.
184+
func (c *ACPConversation) handleChunk(chunk string) {
185+
c.mu.Lock()
186+
// Log and discard chunks that arrive after the prompt has completed or errored.
187+
// This should not happen under normal operation — if it does, it indicates a
188+
// bug in the ACP SDK or a race in the connection teardown.
189+
if !c.prompting {
190+
c.mu.Unlock()
191+
c.logger.Error("received chunk while not prompting (late/unexpected chunk discarded)",
192+
"chunkLen", len(chunk))
193+
return
194+
}
195+
c.streamingResponse.WriteString(chunk)
196+
// Only update the last message if it's the agent placeholder (defense-in-depth)
197+
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
198+
c.messages[len(c.messages)-1].Message = c.streamingResponse.String()
199+
}
200+
messages := slices.Clone(c.messages)
201+
status := c.statusLocked()
202+
screen := c.streamingResponse.String()
203+
c.mu.Unlock()
204+
205+
c.emitter.EmitMessages(messages)
206+
c.emitter.EmitStatus(status)
207+
c.emitter.EmitScreen(screen)
208+
}
209+
210+
// executePrompt runs the actual agent request and returns any error.
211+
func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error {
212+
var err error
213+
for _, part := range messageParts {
214+
if c.ctx.Err() != nil {
215+
err = c.ctx.Err()
216+
break
217+
}
218+
if partErr := part.Do(c.agentIO); partErr != nil {
219+
err = partErr
220+
break
221+
}
222+
}
223+
224+
c.mu.Lock()
225+
c.prompting = false
226+
227+
if err != nil {
228+
c.logger.Error("ACPConversation message failed", "error", err)
229+
// Remove the agent's streaming message on error (may be empty or partial)
230+
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
231+
c.messages = c.messages[:len(c.messages)-1]
232+
}
233+
messages := slices.Clone(c.messages)
234+
status := c.statusLocked()
235+
screen := c.streamingResponse.String()
236+
c.mu.Unlock()
237+
238+
c.emitter.EmitMessages(messages)
239+
c.emitter.EmitStatus(status)
240+
c.emitter.EmitScreen(screen)
241+
return err
242+
}
243+
244+
// Final response should already be in the last message via streaming
245+
// but ensure it's finalized
246+
response := c.streamingResponse.String()
247+
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
248+
// Intentionally not trimming space here.
249+
c.messages[len(c.messages)-1].Message = response
250+
}
251+
messages := slices.Clone(c.messages)
252+
status := c.statusLocked()
253+
screen := c.streamingResponse.String()
254+
c.mu.Unlock()
255+
256+
c.emitter.EmitMessages(messages)
257+
c.emitter.EmitStatus(status)
258+
c.emitter.EmitScreen(screen)
259+
260+
c.logger.Debug("ACPConversation message complete", "responseLen", len(response))
261+
return nil
262+
}

0 commit comments

Comments
 (0)