Skip to content

Commit 0ff03dc

Browse files
committed
feat: add /x/acpio package
1 parent 25cbbcb commit 0ff03dc

5 files changed

Lines changed: 963 additions & 0 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/ActiveState/termtest/xpty v0.6.0
77
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
88
github.com/charmbracelet/bubbletea v1.3.4
9+
github.com/coder/acp-go-sdk v0.6.3
910
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
1011
github.com/coder/quartz v0.1.2
1112
github.com/danielgtaylor/huma/v2 v2.32.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ github.com/ckaznocha/intrange v0.3.1 h1:j1onQyXvHUsPWujDH6WIjhyH26gkRt/txNlV7Lsp
163163
github.com/ckaznocha/intrange v0.3.1/go.mod h1:QVepyz1AkUoFQkpEqksSYpNpUo3c5W7nWh/s6SHIJJk=
164164
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
165165
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
166+
github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ=
167+
github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko=
166168
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8=
167169
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4=
168170
github.com/coder/paralleltestctx v0.0.1 h1:eauyehej1XYTGwgzGWMTjeRIVgOpU6XLPNVb2oi6kDs=

x/acpio/acp_conversation.go

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package acpio
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"slices"
7+
"strings"
8+
"sync"
9+
10+
st "github.com/coder/agentapi/lib/screentracker"
11+
"github.com/coder/quartz"
12+
)
13+
14+
// Compile-time assertion that ACPConversation implements st.Conversation
15+
var _ st.Conversation = (*ACPConversation)(nil)
16+
17+
// ChunkableAgentIO extends AgentIO with chunk callback support for streaming responses.
18+
// This interface is what ACPConversation needs from its AgentIO implementation.
19+
type ChunkableAgentIO interface {
20+
st.AgentIO
21+
SetOnChunk(fn func(chunk string))
22+
}
23+
24+
// ACPConversation tracks conversations with ACP-based agents.
25+
// Unlike PTY-based Conversation, ACP has blocking writes where the
26+
// response is complete when Write() returns.
27+
type ACPConversation struct {
28+
mu sync.Mutex
29+
ctx context.Context
30+
cancel context.CancelFunc
31+
agentIO ChunkableAgentIO
32+
messages []st.ConversationMessage
33+
prompting bool // true while agent is processing
34+
streamingResponse strings.Builder
35+
logger *slog.Logger
36+
emitter st.Emitter
37+
initialPrompt []st.MessagePart
38+
clock quartz.Clock
39+
}
40+
41+
// noopEmitter is a no-op implementation of Emitter for when no emitter is provided.
42+
type noopEmitter struct{}
43+
44+
func (noopEmitter) EmitMessages([]st.ConversationMessage) {}
45+
func (noopEmitter) EmitStatus(st.ConversationStatus) {}
46+
func (noopEmitter) EmitScreen(string) {}
47+
48+
// NewACPConversation creates a new ACPConversation.
49+
// If emitter is provided, it will receive events when messages/status/screen change.
50+
// If clock is nil, a real clock will be used.
51+
func NewACPConversation(ctx context.Context, agentIO ChunkableAgentIO, logger *slog.Logger, initialPrompt []st.MessagePart, emitter st.Emitter, clock quartz.Clock) *ACPConversation {
52+
if logger == nil {
53+
logger = slog.Default()
54+
}
55+
if clock == nil {
56+
clock = quartz.NewReal()
57+
}
58+
if emitter == nil {
59+
emitter = noopEmitter{}
60+
}
61+
ctx, cancel := context.WithCancel(ctx)
62+
c := &ACPConversation{
63+
ctx: ctx,
64+
cancel: cancel,
65+
agentIO: agentIO,
66+
logger: logger,
67+
initialPrompt: initialPrompt,
68+
emitter: emitter,
69+
clock: clock,
70+
}
71+
return c
72+
}
73+
74+
// Messages returns the conversation history.
75+
func (c *ACPConversation) Messages() []st.ConversationMessage {
76+
c.mu.Lock()
77+
defer c.mu.Unlock()
78+
return slices.Clone(c.messages)
79+
}
80+
81+
// Send sends a message to the agent asynchronously.
82+
// It returns immediately after recording the user message and starts
83+
// the agent request in a background goroutine. Returns an error if
84+
// a message is already being processed.
85+
func (c *ACPConversation) Send(messageParts ...st.MessagePart) error {
86+
message := ""
87+
for _, part := range messageParts {
88+
message += part.String()
89+
}
90+
91+
// Validate whitespace BEFORE trimming (match PTY behavior)
92+
if message != strings.TrimSpace(message) {
93+
return st.ErrMessageValidationWhitespace
94+
}
95+
96+
if message == "" {
97+
return st.ErrMessageValidationEmpty
98+
}
99+
100+
// Check if already prompting and set state atomically
101+
c.mu.Lock()
102+
if c.prompting {
103+
c.mu.Unlock()
104+
return st.ErrMessageValidationChanging
105+
}
106+
c.messages = append(c.messages, st.ConversationMessage{
107+
Id: len(c.messages),
108+
Role: st.ConversationRoleUser,
109+
Message: message,
110+
Time: c.clock.Now(),
111+
})
112+
// Add placeholder for streaming agent response
113+
c.messages = append(c.messages, st.ConversationMessage{
114+
Id: len(c.messages),
115+
Role: st.ConversationRoleAgent,
116+
Message: "",
117+
Time: c.clock.Now(),
118+
})
119+
c.streamingResponse.Reset()
120+
c.prompting = true
121+
status := c.statusLocked()
122+
c.mu.Unlock()
123+
124+
// Emit status change to "running" before starting the prompt
125+
c.emitter.EmitStatus(status)
126+
127+
c.logger.Debug("ACPConversation sending message", "message", message)
128+
129+
// Run the blocking write in a goroutine so HTTP returns immediately
130+
go c.executePrompt(messageParts)
131+
132+
return nil
133+
}
134+
135+
// Start sets up chunk handling and sends the initial prompt if provided.
136+
func (c *ACPConversation) Start(ctx context.Context) {
137+
// Wire up the chunk callback for streaming
138+
c.agentIO.SetOnChunk(c.handleChunk)
139+
140+
// Send initial prompt if provided
141+
if len(c.initialPrompt) > 0 {
142+
err := c.Send(c.initialPrompt...)
143+
if err != nil {
144+
c.logger.Error("ACPConversation failed to send initial prompt", "error", err)
145+
}
146+
} else {
147+
// No initial prompt means we start in stable state
148+
c.emitter.EmitStatus(c.Status())
149+
}
150+
}
151+
152+
// Status returns the current conversation status.
153+
func (c *ACPConversation) Status() st.ConversationStatus {
154+
c.mu.Lock()
155+
defer c.mu.Unlock()
156+
return c.statusLocked()
157+
}
158+
159+
// statusLocked returns the status without acquiring the lock (caller must hold lock).
160+
func (c *ACPConversation) statusLocked() st.ConversationStatus {
161+
if c.prompting {
162+
return st.ConversationStatusChanging // agent is processing
163+
}
164+
return st.ConversationStatusStable
165+
}
166+
167+
// Stop cancels any in-progress operations.
168+
func (c *ACPConversation) Stop() {
169+
c.cancel()
170+
}
171+
172+
// Text returns the current streaming response text.
173+
func (c *ACPConversation) Text() string {
174+
c.mu.Lock()
175+
defer c.mu.Unlock()
176+
return c.streamingResponse.String()
177+
}
178+
179+
// handleChunk is called for each streaming chunk from the agent.
180+
func (c *ACPConversation) handleChunk(chunk string) {
181+
c.mu.Lock()
182+
c.streamingResponse.WriteString(chunk)
183+
// Update the last message (the streaming agent response)
184+
if len(c.messages) > 0 {
185+
c.messages[len(c.messages)-1].Message = c.streamingResponse.String()
186+
}
187+
messages := slices.Clone(c.messages)
188+
status := c.statusLocked()
189+
screen := c.streamingResponse.String()
190+
c.mu.Unlock()
191+
192+
c.emitter.EmitMessages(messages)
193+
c.emitter.EmitStatus(status)
194+
c.emitter.EmitScreen(screen)
195+
}
196+
197+
// executePrompt runs the actual agent request in background
198+
func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) {
199+
var err error
200+
for _, part := range messageParts {
201+
if c.ctx.Err() != nil {
202+
err = c.ctx.Err()
203+
break
204+
}
205+
if partErr := part.Do(c.agentIO); partErr != nil {
206+
err = partErr
207+
break
208+
}
209+
}
210+
211+
c.mu.Lock()
212+
c.prompting = false
213+
214+
if err != nil {
215+
c.logger.Error("ACPConversation message failed", "error", err)
216+
// Remove the agent's streaming message on error (may be empty or partial)
217+
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
218+
c.messages = c.messages[:len(c.messages)-1]
219+
}
220+
messages := slices.Clone(c.messages)
221+
status := c.statusLocked()
222+
screen := c.streamingResponse.String()
223+
c.mu.Unlock()
224+
225+
c.emitter.EmitMessages(messages)
226+
c.emitter.EmitStatus(status)
227+
c.emitter.EmitScreen(screen)
228+
return
229+
}
230+
231+
// Final response should already be in the last message via streaming
232+
// but ensure it's finalized
233+
response := c.streamingResponse.String()
234+
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
235+
c.messages[len(c.messages)-1].Message = strings.TrimSpace(response)
236+
}
237+
messages := slices.Clone(c.messages)
238+
status := c.statusLocked()
239+
screen := c.streamingResponse.String()
240+
c.mu.Unlock()
241+
242+
c.emitter.EmitMessages(messages)
243+
c.emitter.EmitStatus(status)
244+
c.emitter.EmitScreen(screen)
245+
246+
c.logger.Debug("ACPConversation message complete", "responseLen", len(response))
247+
}

0 commit comments

Comments
 (0)