Skip to content

Commit 2ab7a0b

Browse files
authored
chore: integrate coder/quartz (#175)
Refactors existing usage of time.Sleep etc. to use github.com/coder/quartz.
1 parent 47eec24 commit 2ab7a0b

File tree

7 files changed

+180
-80
lines changed

7 files changed

+180
-80
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
88
github.com/charmbracelet/bubbletea v1.3.4
99
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
10+
github.com/coder/quartz v0.1.2
1011
github.com/danielgtaylor/huma/v2 v2.32.0
1112
github.com/go-chi/chi/v5 v5.2.2
1213
github.com/go-chi/cors v1.2.1
@@ -193,7 +194,6 @@ require (
193194
go-simpler.org/sloglint v0.11.1 // indirect
194195
go.augendre.info/arangolint v0.2.0 // indirect
195196
go.augendre.info/fatcontext v0.8.1 // indirect
196-
go.uber.org/atomic v1.9.0 // indirect
197197
go.uber.org/automaxprocs v1.6.0 // indirect
198198
go.uber.org/multierr v1.10.0 // indirect
199199
go.uber.org/zap v1.27.0 // indirect

go.sum

Lines changed: 50 additions & 35 deletions
Large diffs are not rendered by default.

lib/httpapi/server.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
mf "github.com/coder/agentapi/lib/msgfmt"
2525
st "github.com/coder/agentapi/lib/screentracker"
2626
"github.com/coder/agentapi/lib/termexec"
27+
"github.com/coder/quartz"
2728
"github.com/danielgtaylor/huma/v2"
2829
"github.com/danielgtaylor/huma/v2/adapters/humachi"
2930
"github.com/danielgtaylor/huma/v2/sse"
@@ -46,6 +47,7 @@ type Server struct {
4647
emitter *EventEmitter
4748
chatBasePath string
4849
tempDir string
50+
clock quartz.Clock
4951
}
5052

5153
func (s *Server) NormalizeSchema(schema any) any {
@@ -102,6 +104,7 @@ type ServerConfig struct {
102104
AllowedHosts []string
103105
AllowedOrigins []string
104106
InitialPrompt string
107+
Clock quartz.Clock
105108
}
106109

107110
// Validate allowed hosts don't contain whitespace, commas, schemes, or ports.
@@ -194,6 +197,10 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
194197

195198
logger := logctx.From(ctx)
196199

200+
if config.Clock == nil {
201+
config.Clock = quartz.NewReal()
202+
}
203+
197204
allowedHosts, err := parseAllowedHosts(config.AllowedHosts)
198205
if err != nil {
199206
return nil, xerrors.Errorf("failed to parse allowed hosts: %w", err)
@@ -238,11 +245,9 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
238245
}
239246

240247
conversation := st.NewConversation(ctx, st.ConversationConfig{
241-
AgentType: config.AgentType,
242-
AgentIO: config.Process,
243-
GetTime: func() time.Time {
244-
return time.Now()
245-
},
248+
AgentType: config.AgentType,
249+
AgentIO: config.Process,
250+
Clock: config.Clock,
246251
SnapshotInterval: snapshotInterval,
247252
ScreenStabilityLength: 2 * time.Second,
248253
FormatMessage: formatMessage,
@@ -270,6 +275,7 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
270275
emitter: emitter,
271276
chatBasePath: strings.TrimSuffix(config.ChatBasePath, "/"),
272277
tempDir: tempDir,
278+
clock: config.Clock,
273279
}
274280

275281
// Register API routes
@@ -333,12 +339,13 @@ func sseMiddleware(ctx huma.Context, next func(huma.Context)) {
333339
func (s *Server) StartSnapshotLoop(ctx context.Context) {
334340
s.conversation.StartSnapshotLoop(ctx)
335341
go func() {
342+
ticker := s.clock.NewTicker(snapshotInterval)
343+
defer ticker.Stop()
336344
for {
337345
currentStatus := s.conversation.Status()
338346

339347
// Send initial prompt when agent becomes stable for the first time
340348
if !s.conversation.InitialPromptSent && convertStatus(currentStatus) == AgentStatusStable {
341-
342349
if err := s.conversation.SendMessage(FormatMessage(s.agentType, s.conversation.InitialPrompt)...); err != nil {
343350
s.logger.Error("Failed to send initial prompt", "error", err)
344351
} else {
@@ -351,7 +358,12 @@ func (s *Server) StartSnapshotLoop(ctx context.Context) {
351358
s.emitter.UpdateStatusAndEmitChanges(currentStatus, s.agentType)
352359
s.emitter.UpdateMessagesAndEmitChanges(s.conversation.Messages())
353360
s.emitter.UpdateScreenAndEmitChanges(s.conversation.Screen())
354-
time.Sleep(snapshotInterval)
361+
362+
select {
363+
case <-ctx.Done():
364+
return
365+
case <-ticker.C:
366+
}
355367
}
356368
}()
357369
}

lib/screentracker/conversation.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/coder/agentapi/lib/msgfmt"
1212
"github.com/coder/agentapi/lib/util"
13+
"github.com/coder/quartz"
1314
"github.com/danielgtaylor/huma/v2"
1415
"golang.org/x/xerrors"
1516
)
@@ -27,8 +28,8 @@ type AgentIO interface {
2728
type ConversationConfig struct {
2829
AgentType msgfmt.AgentType
2930
AgentIO AgentIO
30-
// GetTime returns the current time
31-
GetTime func() time.Time
31+
// Clock provides time operations for the conversation
32+
Clock quartz.Clock
3233
// How often to take a snapshot for the stability check
3334
SnapshotInterval time.Duration
3435
// How long the screen should not change to be considered stable
@@ -109,6 +110,9 @@ func getStableSnapshotsThreshold(cfg ConversationConfig) int {
109110
}
110111

111112
func NewConversation(ctx context.Context, cfg ConversationConfig, initialPrompt string) *Conversation {
113+
if cfg.Clock == nil {
114+
cfg.Clock = quartz.NewReal()
115+
}
112116
threshold := getStableSnapshotsThreshold(cfg)
113117
c := &Conversation{
114118
cfg: cfg,
@@ -118,7 +122,7 @@ func NewConversation(ctx context.Context, cfg ConversationConfig, initialPrompt
118122
{
119123
Message: "",
120124
Role: ConversationRoleAgent,
121-
Time: cfg.GetTime(),
125+
Time: cfg.Clock.Now(),
122126
},
123127
},
124128
InitialPrompt: initialPrompt,
@@ -130,11 +134,13 @@ func NewConversation(ctx context.Context, cfg ConversationConfig, initialPrompt
130134

131135
func (c *Conversation) StartSnapshotLoop(ctx context.Context) {
132136
go func() {
137+
ticker := c.cfg.Clock.NewTicker(c.cfg.SnapshotInterval)
138+
defer ticker.Stop()
133139
for {
134140
select {
135141
case <-ctx.Done():
136142
return
137-
case <-time.After(c.cfg.SnapshotInterval):
143+
case <-ticker.C:
138144
// It's important that we hold the lock while reading the screen.
139145
// There's a race condition that occurs without it:
140146
// 1. The screen is read
@@ -250,7 +256,7 @@ func (c *Conversation) updateLastAgentMessage(screen string, timestamp time.Time
250256
// assumes the caller holds the lock
251257
func (c *Conversation) addSnapshotInner(screen string) {
252258
snapshot := screenSnapshot{
253-
timestamp: c.cfg.GetTime(),
259+
timestamp: c.cfg.Clock.Now(),
254260
screen: screen,
255261
}
256262
c.snapshotBuffer.Add(snapshot)
@@ -320,10 +326,11 @@ func (c *Conversation) writeMessageWithConfirmation(ctx context.Context, message
320326
Timeout: 15 * time.Second,
321327
MinInterval: 50 * time.Millisecond,
322328
InitialWait: true,
329+
Clock: c.cfg.Clock,
323330
}, func() (bool, error) {
324331
screen := c.cfg.AgentIO.ReadScreen()
325332
if screen != screenBeforeMessage {
326-
time.Sleep(1 * time.Second)
333+
<-util.After(c.cfg.Clock, time.Second)
327334
newScreen := c.cfg.AgentIO.ReadScreen()
328335
return newScreen == screen, nil
329336
}
@@ -338,17 +345,18 @@ func (c *Conversation) writeMessageWithConfirmation(ctx context.Context, message
338345
if err := util.WaitFor(ctx, util.WaitTimeout{
339346
Timeout: 15 * time.Second,
340347
MinInterval: 25 * time.Millisecond,
348+
Clock: c.cfg.Clock,
341349
}, func() (bool, error) {
342350
// we don't want to spam additional carriage returns because the agent may process them
343351
// (aider does this), but we do want to retry sending one if nothing's
344352
// happening for a while
345-
if time.Since(lastCarriageReturnTime) >= 3*time.Second {
346-
lastCarriageReturnTime = time.Now()
353+
if c.cfg.Clock.Since(lastCarriageReturnTime) >= 3*time.Second {
354+
lastCarriageReturnTime = c.cfg.Clock.Now()
347355
if _, err := c.cfg.AgentIO.Write([]byte("\r")); err != nil {
348356
return false, xerrors.Errorf("failed to write carriage return: %w", err)
349357
}
350358
}
351-
time.Sleep(25 * time.Millisecond)
359+
<-util.After(c.cfg.Clock, 25*time.Millisecond)
352360
screen := c.cfg.AgentIO.ReadScreen()
353361

354362
return screen != screenBeforeCarriageReturn, nil
@@ -359,9 +367,11 @@ func (c *Conversation) writeMessageWithConfirmation(ctx context.Context, message
359367
return nil
360368
}
361369

362-
var MessageValidationErrorWhitespace = xerrors.New("message must be trimmed of leading and trailing whitespace")
363-
var MessageValidationErrorEmpty = xerrors.New("message must not be empty")
364-
var MessageValidationErrorChanging = xerrors.New("message can only be sent when the agent is waiting for user input")
370+
var (
371+
MessageValidationErrorWhitespace = xerrors.New("message must be trimmed of leading and trailing whitespace")
372+
MessageValidationErrorEmpty = xerrors.New("message must not be empty")
373+
MessageValidationErrorChanging = xerrors.New("message can only be sent when the agent is waiting for user input")
374+
)
365375

366376
func (c *Conversation) SendMessage(messageParts ...MessagePart) error {
367377
c.lock.Lock()
@@ -382,7 +392,7 @@ func (c *Conversation) SendMessage(messageParts ...MessagePart) error {
382392
}
383393

384394
screenBeforeMessage := c.cfg.AgentIO.ReadScreen()
385-
now := c.cfg.GetTime()
395+
now := c.cfg.Clock.Now()
386396
c.updateLastAgentMessage(screenBeforeMessage, now)
387397

388398
if err := c.writeMessageWithConfirmation(context.Background(), messageParts...); err != nil {

lib/screentracker/conversation_test.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/coder/agentapi/lib/msgfmt"
12+
"github.com/coder/quartz"
1213
"github.com/stretchr/testify/assert"
1314

1415
st "github.com/coder/agentapi/lib/screentracker"
@@ -39,8 +40,8 @@ func (a *testAgent) Write(data []byte) (int, error) {
3940
func statusTest(t *testing.T, params statusTestParams) {
4041
ctx := context.Background()
4142
t.Run(fmt.Sprintf("interval-%s,stability_length-%s", params.cfg.SnapshotInterval, params.cfg.ScreenStabilityLength), func(t *testing.T) {
42-
if params.cfg.GetTime == nil {
43-
params.cfg.GetTime = func() time.Time { return time.Now() }
43+
if params.cfg.Clock == nil {
44+
params.cfg.Clock = quartz.NewReal()
4445
}
4546
c := st.NewConversation(ctx, params.cfg, "")
4647
assert.Equal(t, st.ConversationStatusInitializing, c.Status())
@@ -137,8 +138,10 @@ func TestMessages(t *testing.T) {
137138
return c.SendMessage(st.MessagePartText{Content: msg})
138139
}
139140
newConversation := func(opts ...func(*st.ConversationConfig)) *st.Conversation {
141+
mClock := quartz.NewMock(t)
142+
mClock.Set(now)
140143
cfg := st.ConversationConfig{
141-
GetTime: func() time.Time { return now },
144+
Clock: mClock,
142145
SnapshotInterval: 1 * time.Second,
143146
ScreenStabilityLength: 2 * time.Second,
144147
SkipWritingMessage: true,
@@ -173,21 +176,18 @@ func TestMessages(t *testing.T) {
173176
})
174177

175178
t.Run("no-change-no-message-update", func(t *testing.T) {
176-
nowWrapper := struct {
177-
time.Time
178-
}{
179-
Time: now,
180-
}
179+
mClock := quartz.NewMock(t)
180+
mClock.Set(now)
181181
c := newConversation(func(cfg *st.ConversationConfig) {
182-
cfg.GetTime = func() time.Time { return nowWrapper.Time }
182+
cfg.Clock = mClock
183183
})
184184

185185
c.AddSnapshot("1")
186186
msgs := c.Messages()
187187
assert.Equal(t, []st.ConversationMessage{
188188
agentMsg(0, "1"),
189189
}, msgs)
190-
nowWrapper.Time = nowWrapper.Add(1 * time.Second)
190+
mClock.Set(now.Add(1 * time.Second))
191191
c.AddSnapshot("1")
192192
assert.Equal(t, msgs, c.Messages())
193193
})
@@ -411,8 +411,10 @@ func TestInitialPromptReadiness(t *testing.T) {
411411
now := time.Now()
412412

413413
t.Run("agent not ready - status remains changing", func(t *testing.T) {
414+
mClock := quartz.NewMock(t)
415+
mClock.Set(now)
414416
cfg := st.ConversationConfig{
415-
GetTime: func() time.Time { return now },
417+
Clock: mClock,
416418
SnapshotInterval: 1 * time.Second,
417419
ScreenStabilityLength: 0,
418420
AgentIO: &testAgent{screen: "loading..."},
@@ -432,8 +434,10 @@ func TestInitialPromptReadiness(t *testing.T) {
432434
})
433435

434436
t.Run("agent becomes ready - status changes to stable", func(t *testing.T) {
437+
mClock := quartz.NewMock(t)
438+
mClock.Set(now)
435439
cfg := st.ConversationConfig{
436-
GetTime: func() time.Time { return now },
440+
Clock: mClock,
437441
SnapshotInterval: 1 * time.Second,
438442
ScreenStabilityLength: 0,
439443
AgentIO: &testAgent{screen: "loading..."},
@@ -455,9 +459,11 @@ func TestInitialPromptReadiness(t *testing.T) {
455459
})
456460

457461
t.Run("ready for initial prompt lifecycle: false -> true -> false", func(t *testing.T) {
462+
mClock := quartz.NewMock(t)
463+
mClock.Set(now)
458464
agent := &testAgent{screen: "loading..."}
459465
cfg := st.ConversationConfig{
460-
GetTime: func() time.Time { return now },
466+
Clock: mClock,
461467
SnapshotInterval: 1 * time.Second,
462468
ScreenStabilityLength: 0,
463469
AgentIO: agent,
@@ -496,8 +502,10 @@ func TestInitialPromptReadiness(t *testing.T) {
496502
})
497503

498504
t.Run("no initial prompt - normal status logic applies", func(t *testing.T) {
505+
mClock := quartz.NewMock(t)
506+
mClock.Set(now)
499507
cfg := st.ConversationConfig{
500-
GetTime: func() time.Time { return now },
508+
Clock: mClock,
501509
SnapshotInterval: 1 * time.Second,
502510
ScreenStabilityLength: 0,
503511
AgentIO: &testAgent{screen: "loading..."},
@@ -517,9 +525,11 @@ func TestInitialPromptReadiness(t *testing.T) {
517525
})
518526

519527
t.Run("initial prompt sent - normal status logic applies", func(t *testing.T) {
528+
mClock := quartz.NewMock(t)
529+
mClock.Set(now)
520530
agent := &testAgent{screen: "ready"}
521531
cfg := st.ConversationConfig{
522-
GetTime: func() time.Time { return now },
532+
Clock: mClock,
523533
SnapshotInterval: 1 * time.Second,
524534
ScreenStabilityLength: 0,
525535
AgentIO: agent,

0 commit comments

Comments
 (0)