Skip to content

Commit ca5d5d4

Browse files
committed
Merge branch 'event-emitter-7z1e' into cj/refactor/event-emitter
2 parents 0fa554d + a116d63 commit ca5d5d4

3 files changed

Lines changed: 40 additions & 24 deletions

File tree

lib/httpapi/events.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type EventEmitter struct {
6464
agentType mf.AgentType
6565
chans map[int]chan Event
6666
chanIdx int
67-
subscriptionBufSize int
67+
subscriptionBufSize uint
6868
screen string
6969
}
7070

@@ -81,13 +81,17 @@ func convertStatus(status st.ConversationStatus) AgentStatus {
8181
}
8282
}
8383

84-
const defaultSubscriptionBufSize = 1024
84+
const defaultSubscriptionBufSize uint = 1024
8585

8686
type EventEmitterOption func(*EventEmitter)
8787

88-
func WithSubscriptionBufSize(size int) EventEmitterOption {
88+
func WithSubscriptionBufSize(size uint) EventEmitterOption {
8989
return func(e *EventEmitter) {
90-
e.subscriptionBufSize = size
90+
if size == 0 {
91+
e.subscriptionBufSize = defaultSubscriptionBufSize
92+
} else {
93+
e.subscriptionBufSize = size
94+
}
9195
}
9296
}
9397

@@ -150,6 +154,9 @@ func (e *EventEmitter) EmitMessages(newMessages []st.ConversationMessage) {
150154
newMsg = newMessages[i]
151155
}
152156
if oldMsg != newMsg {
157+
if i >= len(newMessages) {
158+
continue
159+
}
153160
e.notifyChannels(EventTypeMessageUpdate, MessageUpdateBody{
154161
Id: newMessages[i].Id,
155162
Role: newMessages[i].Role,

lib/screentracker/pty_conversation.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type PTYConversationConfig struct {
6868
FormatToolCall func(message string) (string, []string)
6969
// InitialPrompt is the initial prompt to send to the agent once ready
7070
InitialPrompt []MessagePart
71-
Logger *slog.Logger
71+
Logger *slog.Logger
7272
}
7373

7474
func (cfg PTYConversationConfig) getStableSnapshotsThreshold() int {
@@ -114,10 +114,19 @@ type PTYConversation struct {
114114

115115
var _ Conversation = &PTYConversation{}
116116

117+
type noopEmitter struct{}
118+
119+
func (noopEmitter) EmitMessages([]ConversationMessage) {}
120+
func (noopEmitter) EmitStatus(ConversationStatus) {}
121+
func (noopEmitter) EmitScreen(string) {}
122+
117123
func NewPTY(ctx context.Context, cfg PTYConversationConfig, emitter Emitter) *PTYConversation {
118124
if cfg.Clock == nil {
119125
cfg.Clock = quartz.NewReal()
120126
}
127+
if emitter == nil {
128+
emitter = noopEmitter{}
129+
}
121130
threshold := cfg.getStableSnapshotsThreshold()
122131
c := &PTYConversation{
123132
cfg: cfg,

lib/screentracker/pty_conversation_test.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ const testTimeout = 10 * time.Second
2020

2121
// testAgent is a goroutine-safe mock implementation of AgentIO.
2222
type testAgent struct {
23-
mu sync.Mutex
24-
screen string
23+
mu sync.Mutex
24+
screen string
2525
// onWrite is called during Write to simulate the agent reacting to
2626
// terminal input (e.g., changing the screen), which unblocks
2727
// writeStabilize's polling loops.
@@ -53,7 +53,7 @@ type testEmitter struct{}
5353

5454
func (testEmitter) EmitMessages([]st.ConversationMessage) {}
5555
func (testEmitter) EmitStatus(st.ConversationStatus) {}
56-
func (testEmitter) EmitScreen(string) {}
56+
func (testEmitter) EmitScreen(string) {}
5757

5858
// advanceFor is a shorthand for advanceUntil with a time-based condition.
5959
func advanceFor(ctx context.Context, t *testing.T, mClock *quartz.Mock, total time.Duration) {
@@ -226,11 +226,11 @@ func TestMessages(t *testing.T) {
226226
mClock := quartz.NewMock(t)
227227
mClock.Set(now)
228228
cfg := st.PTYConversationConfig{
229-
Clock: mClock,
230-
AgentIO: agent,
231-
SnapshotInterval: 100 * time.Millisecond,
232-
ScreenStabilityLength: 200 * time.Millisecond,
233-
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
229+
Clock: mClock,
230+
AgentIO: agent,
231+
SnapshotInterval: 100 * time.Millisecond,
232+
ScreenStabilityLength: 200 * time.Millisecond,
233+
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
234234
}
235235
for _, opt := range opts {
236236
opt(&cfg)
@@ -519,15 +519,15 @@ func TestInitialPromptReadiness(t *testing.T) {
519519
agent.screen = fmt.Sprintf("__write_%d", writeCounter)
520520
}
521521
cfg := st.PTYConversationConfig{
522-
Clock: mClock,
523-
SnapshotInterval: 1 * time.Second,
524-
ScreenStabilityLength: 0,
525-
AgentIO: agent,
522+
Clock: mClock,
523+
SnapshotInterval: 1 * time.Second,
524+
ScreenStabilityLength: 0,
525+
AgentIO: agent,
526526
ReadyForInitialPrompt: func(message string) bool {
527527
return message == "ready"
528528
},
529-
InitialPrompt: []st.MessagePart{st.MessagePartText{Content: "initial prompt here"}},
530-
Logger: discardLogger,
529+
InitialPrompt: []st.MessagePart{st.MessagePartText{Content: "initial prompt here"}},
530+
Logger: discardLogger,
531531
}
532532

533533
c := st.NewPTY(ctx, cfg, &testEmitter{})
@@ -585,11 +585,11 @@ func TestInitialPromptReadiness(t *testing.T) {
585585
mClock := quartz.NewMock(t)
586586
agent := &testAgent{screen: "ready"}
587587
cfg := st.PTYConversationConfig{
588-
Clock: mClock,
589-
SnapshotInterval: 1 * time.Second,
590-
ScreenStabilityLength: 2 * time.Second, // threshold = 3
591-
AgentIO: agent,
592-
Logger: discardLogger,
588+
Clock: mClock,
589+
SnapshotInterval: 1 * time.Second,
590+
ScreenStabilityLength: 2 * time.Second, // threshold = 3
591+
AgentIO: agent,
592+
Logger: discardLogger,
593593
}
594594

595595
c := st.NewPTY(ctx, cfg, &testEmitter{})

0 commit comments

Comments
 (0)