Skip to content

Commit 25cbbcb

Browse files
authored
chore: Replace OnSnapshot callback with Emitter interface (#185)
1 parent c171c14 commit 25cbbcb

File tree

6 files changed

+102
-71
lines changed

6 files changed

+102
-71
lines changed

lib/httpapi/events.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type EventEmitter struct {
6464
agentType mf.AgentType
6565
chans map[int]chan Event
6666
chanIdx int
67-
subscriptionBufSize int
67+
subscriptionBufSize uint
6868
screen string
6969
}
7070

@@ -81,20 +81,37 @@ func convertStatus(status st.ConversationStatus) AgentStatus {
8181
}
8282
}
8383

84-
// subscriptionBufSize is the size of the buffer for each subscription.
85-
// Once the buffer is full, the channel will be closed.
86-
// Listeners must actively drain the channel, so it's important to
87-
// set this to a value that is large enough to handle the expected
88-
// number of events.
89-
func NewEventEmitter(subscriptionBufSize int) *EventEmitter {
90-
return &EventEmitter{
91-
mu: sync.Mutex{},
84+
const defaultSubscriptionBufSize uint = 1024
85+
86+
type EventEmitterOption func(*EventEmitter)
87+
88+
func WithSubscriptionBufSize(size uint) EventEmitterOption {
89+
return func(e *EventEmitter) {
90+
if size == 0 {
91+
e.subscriptionBufSize = defaultSubscriptionBufSize
92+
} else {
93+
e.subscriptionBufSize = size
94+
}
95+
}
96+
}
97+
98+
func WithAgentType(agentType mf.AgentType) EventEmitterOption {
99+
return func(e *EventEmitter) {
100+
e.agentType = agentType
101+
}
102+
}
103+
104+
func NewEventEmitter(opts ...EventEmitterOption) *EventEmitter {
105+
e := &EventEmitter{
92106
messages: make([]st.ConversationMessage, 0),
93107
status: AgentStatusRunning,
94108
chans: make(map[int]chan Event),
95-
chanIdx: 0,
96-
subscriptionBufSize: subscriptionBufSize,
109+
subscriptionBufSize: defaultSubscriptionBufSize,
110+
}
111+
for _, opt := range opts {
112+
opt(e)
97113
}
114+
return e
98115
}
99116

100117
// Assumes the caller holds the lock.
@@ -122,7 +139,7 @@ func (e *EventEmitter) notifyChannels(eventType EventType, payload any) {
122139

123140
// Assumes that only the last message can change or new messages can be added.
124141
// If a new message is injected between existing messages (identified by Id), the behavior is undefined.
125-
func (e *EventEmitter) UpdateMessagesAndEmitChanges(newMessages []st.ConversationMessage) {
142+
func (e *EventEmitter) EmitMessages(newMessages []st.ConversationMessage) {
126143
e.mu.Lock()
127144
defer e.mu.Unlock()
128145

@@ -137,6 +154,9 @@ func (e *EventEmitter) UpdateMessagesAndEmitChanges(newMessages []st.Conversatio
137154
newMsg = newMessages[i]
138155
}
139156
if oldMsg != newMsg {
157+
if i >= len(newMessages) {
158+
continue
159+
}
140160
e.notifyChannels(EventTypeMessageUpdate, MessageUpdateBody{
141161
Id: newMessages[i].Id,
142162
Role: newMessages[i].Role,
@@ -149,7 +169,7 @@ func (e *EventEmitter) UpdateMessagesAndEmitChanges(newMessages []st.Conversatio
149169
e.messages = newMessages
150170
}
151171

152-
func (e *EventEmitter) UpdateStatusAndEmitChanges(newStatus st.ConversationStatus, agentType mf.AgentType) {
172+
func (e *EventEmitter) EmitStatus(newStatus st.ConversationStatus) {
153173
e.mu.Lock()
154174
defer e.mu.Unlock()
155175

@@ -158,12 +178,11 @@ func (e *EventEmitter) UpdateStatusAndEmitChanges(newStatus st.ConversationStatu
158178
return
159179
}
160180

161-
e.notifyChannels(EventTypeStatusChange, StatusChangeBody{Status: newAgentStatus, AgentType: agentType})
181+
e.notifyChannels(EventTypeStatusChange, StatusChangeBody{Status: newAgentStatus, AgentType: e.agentType})
162182
e.status = newAgentStatus
163-
e.agentType = agentType
164183
}
165184

166-
func (e *EventEmitter) UpdateScreenAndEmitChanges(newScreen string) {
185+
func (e *EventEmitter) EmitScreen(newScreen string) {
167186
e.mu.Lock()
168187
defer e.mu.Unlock()
169188

lib/httpapi/events_test.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@ import (
55
"testing"
66
"time"
77

8-
mf "github.com/coder/agentapi/lib/msgfmt"
98
st "github.com/coder/agentapi/lib/screentracker"
109
"github.com/stretchr/testify/assert"
1110
)
1211

1312
func TestEventEmitter(t *testing.T) {
1413
t.Run("single-subscription", func(t *testing.T) {
15-
emitter := NewEventEmitter(10)
14+
emitter := NewEventEmitter(WithSubscriptionBufSize(10))
1615
_, ch, stateEvents := emitter.Subscribe()
1716
assert.Empty(t, ch)
1817
assert.Equal(t, []Event{
@@ -27,7 +26,7 @@ func TestEventEmitter(t *testing.T) {
2726
}, stateEvents)
2827

2928
now := time.Now()
30-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
29+
emitter.EmitMessages([]st.ConversationMessage{
3130
{Id: 1, Message: "Hello, world!", Role: st.ConversationRoleUser, Time: now},
3231
})
3332
newEvent := <-ch
@@ -36,7 +35,7 @@ func TestEventEmitter(t *testing.T) {
3635
Payload: MessageUpdateBody{Id: 1, Message: "Hello, world!", Role: st.ConversationRoleUser, Time: now},
3736
}, newEvent)
3837

39-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
38+
emitter.EmitMessages([]st.ConversationMessage{
4039
{Id: 1, Message: "Hello, world! (updated)", Role: st.ConversationRoleUser, Time: now},
4140
{Id: 2, Message: "What's up?", Role: st.ConversationRoleAgent, Time: now},
4241
})
@@ -52,24 +51,24 @@ func TestEventEmitter(t *testing.T) {
5251
Payload: MessageUpdateBody{Id: 2, Message: "What's up?", Role: st.ConversationRoleAgent, Time: now},
5352
}, newEvent)
5453

55-
emitter.UpdateStatusAndEmitChanges(st.ConversationStatusStable, mf.AgentTypeAider)
54+
emitter.EmitStatus(st.ConversationStatusStable)
5655
newEvent = <-ch
5756
assert.Equal(t, Event{
5857
Type: EventTypeStatusChange,
59-
Payload: StatusChangeBody{Status: AgentStatusStable, AgentType: mf.AgentTypeAider},
58+
Payload: StatusChangeBody{Status: AgentStatusStable, AgentType: ""},
6059
}, newEvent)
6160
})
6261

6362
t.Run("multiple-subscriptions", func(t *testing.T) {
64-
emitter := NewEventEmitter(10)
63+
emitter := NewEventEmitter(WithSubscriptionBufSize(10))
6564
channels := make([]<-chan Event, 0, 10)
6665
for i := 0; i < 10; i++ {
6766
_, ch, _ := emitter.Subscribe()
6867
channels = append(channels, ch)
6968
}
7069
now := time.Now()
7170

72-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
71+
emitter.EmitMessages([]st.ConversationMessage{
7372
{Id: 1, Message: "Hello, world!", Role: st.ConversationRoleUser, Time: now},
7473
})
7574
for _, ch := range channels {
@@ -82,10 +81,10 @@ func TestEventEmitter(t *testing.T) {
8281
})
8382

8483
t.Run("close-channel", func(t *testing.T) {
85-
emitter := NewEventEmitter(1)
84+
emitter := NewEventEmitter(WithSubscriptionBufSize(1))
8685
_, ch, _ := emitter.Subscribe()
8786
for i := range 5 {
88-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
87+
emitter.EmitMessages([]st.ConversationMessage{
8988
{Id: i, Message: fmt.Sprintf("Hello, world! %d", i), Role: st.ConversationRoleUser, Time: time.Now()},
9089
})
9190
}

lib/httpapi/server.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
244244
return mf.FormatToolCall(config.AgentType, message)
245245
}
246246

247-
emitter := NewEventEmitter(1024)
247+
emitter := NewEventEmitter(WithAgentType(config.AgentType))
248248

249249
// Format initial prompt into message parts if provided
250250
var initialPrompt []st.MessagePart
@@ -262,16 +262,8 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
262262
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
263263
FormatToolCall: formatToolCall,
264264
InitialPrompt: initialPrompt,
265-
// OnSnapshot uses a callback rather than passing the emitter directly
266-
// to keep the screentracker package decoupled from httpapi concerns.
267-
// This preserves clean package boundaries and avoids import cycles.
268-
OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) {
269-
emitter.UpdateStatusAndEmitChanges(status, config.AgentType)
270-
emitter.UpdateMessagesAndEmitChanges(messages)
271-
emitter.UpdateScreenAndEmitChanges(screen)
272-
},
273-
Logger: logger,
274-
})
265+
Logger: logger,
266+
}, emitter)
275267

276268
// Create temporary directory for uploads
277269
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")

lib/screentracker/conversation.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ type Conversation interface {
6565
Text() string
6666
}
6767

68+
// Emitter receives conversation state updates.
69+
type Emitter interface {
70+
EmitMessages([]ConversationMessage)
71+
EmitStatus(ConversationStatus)
72+
EmitScreen(string)
73+
}
74+
6875
type ConversationMessage struct {
6976
Id int
7077
Message string

lib/screentracker/pty_conversation.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ type PTYConversationConfig struct {
6868
FormatToolCall func(message string) (string, []string)
6969
// InitialPrompt is the initial prompt to send to the agent once ready
7070
InitialPrompt []MessagePart
71-
// OnSnapshot is called after each snapshot with current status, messages, and screen content
72-
OnSnapshot func(status ConversationStatus, messages []ConversationMessage, screen string)
73-
Logger *slog.Logger
71+
Logger *slog.Logger
7472
}
7573

7674
func (cfg PTYConversationConfig) getStableSnapshotsThreshold() int {
@@ -86,7 +84,8 @@ func (cfg PTYConversationConfig) getStableSnapshotsThreshold() int {
8684
// PTYConversation is a conversation that uses a pseudo-terminal (PTY) for communication.
8785
// It uses a combination of polling and diffs to detect changes in the screen.
8886
type PTYConversation struct {
89-
cfg PTYConversationConfig
87+
cfg PTYConversationConfig
88+
emitter Emitter
9089
// How many stable snapshots are required to consider the screen stable
9190
stableSnapshotsThreshold int
9291
snapshotBuffer *RingBuffer[screenSnapshot]
@@ -115,13 +114,23 @@ type PTYConversation struct {
115114

116115
var _ Conversation = &PTYConversation{}
117116

118-
func NewPTY(ctx context.Context, cfg PTYConversationConfig) *PTYConversation {
117+
type noopEmitter struct{}
118+
119+
func (noopEmitter) EmitMessages([]ConversationMessage) {}
120+
func (noopEmitter) EmitStatus(ConversationStatus) {}
121+
func (noopEmitter) EmitScreen(string) {}
122+
123+
func NewPTY(ctx context.Context, cfg PTYConversationConfig, emitter Emitter) *PTYConversation {
119124
if cfg.Clock == nil {
120125
cfg.Clock = quartz.NewReal()
121126
}
127+
if emitter == nil {
128+
emitter = noopEmitter{}
129+
}
122130
threshold := cfg.getStableSnapshotsThreshold()
123131
c := &PTYConversation{
124132
cfg: cfg,
133+
emitter: emitter,
125134
stableSnapshotsThreshold: threshold,
126135
snapshotBuffer: NewRingBuffer[screenSnapshot](threshold),
127136
messages: []ConversationMessage{
@@ -139,9 +148,6 @@ func NewPTY(ctx context.Context, cfg PTYConversationConfig) *PTYConversation {
139148
if len(cfg.InitialPrompt) > 0 {
140149
c.outboundQueue <- outboundMessage{parts: cfg.InitialPrompt, errCh: nil}
141150
}
142-
if c.cfg.OnSnapshot == nil {
143-
c.cfg.OnSnapshot = func(ConversationStatus, []ConversationMessage, string) {}
144-
}
145151
if c.cfg.ReadyForInitialPrompt == nil {
146152
c.cfg.ReadyForInitialPrompt = func(string) bool { return true }
147153
}
@@ -173,7 +179,9 @@ func (c *PTYConversation) Start(ctx context.Context) {
173179
}
174180
c.lock.Unlock()
175181

176-
c.cfg.OnSnapshot(status, messages, screen)
182+
c.emitter.EmitStatus(status)
183+
c.emitter.EmitMessages(messages)
184+
c.emitter.EmitScreen(screen)
177185
return nil
178186
}, "snapshot")
179187

0 commit comments

Comments
 (0)