@@ -94,24 +94,23 @@ type PTYConversation struct {
9494 screenBeforeLastUserMessage string
9595 lock sync.Mutex
9696
97- // outboundQueue holds messages waiting to be sent to the agent
97+ // outboundQueue holds messages waiting to be sent to the agent.
98+ // Buffer size is 1. Callers are expected to be serialized (the HTTP
99+ // layer holds s.mu, and Send blocks until the message is processed),
100+ // so ordering is preserved.
98101 outboundQueue chan outboundMessage
99102 // stableSignal is used by the snapshot loop to signal the send loop
100103 // when the agent is stable and there are items in the outbound queue.
101104 stableSignal chan struct {}
102105 // toolCallMessageSet keeps track of the tool calls that have been detected & logged in the current agent message
103106 toolCallMessageSet map [string ]bool
104107 // initialPromptReady is closed when ReadyForInitialPrompt returns true.
105- // This is checked by a separate goroutine to avoid calling ReadyForInitialPrompt on every tick.
108+ // Checked inline in the snapshot loop on each tick.
106109 initialPromptReady chan struct {}
107110}
108111
109112var _ Conversation = & PTYConversation {}
110113
111- // errInitialPromptReady is a sentinel used to stop the readiness TickerFunc
112- // after ReadyForInitialPrompt returns true.
113- var errInitialPromptReady = xerrors .New ("initial prompt ready" )
114-
115114func NewPTY (ctx context.Context , cfg PTYConversationConfig ) * PTYConversation {
116115 if cfg .Clock == nil {
117116 cfg .Clock = quartz .NewReal ()
@@ -147,18 +146,6 @@ func NewPTY(ctx context.Context, cfg PTYConversationConfig) *PTYConversation {
147146}
148147
149148func (c * PTYConversation ) Start (ctx context.Context ) {
150- // Initial prompt readiness loop - polls ReadyForInitialPrompt until it returns true,
151- // then closes initialPromptReady and exits. This avoids calling ReadyForInitialPrompt
152- // on every snapshot tick.
153- c .cfg .Clock .TickerFunc (ctx , 100 * time .Millisecond , func () error {
154- screen := c .cfg .AgentIO .ReadScreen ()
155- if c .cfg .ReadyForInitialPrompt (screen ) {
156- close (c .initialPromptReady )
157- return errInitialPromptReady
158- }
159- return nil
160- }, "readiness" )
161-
162149 // Snapshot loop
163150 c .cfg .Clock .TickerFunc (ctx , c .cfg .SnapshotInterval , func () error {
164151 c .lock .Lock ()
@@ -175,6 +162,10 @@ func (c *PTYConversation) Start(ctx context.Context) {
175162 case <- c .initialPromptReady :
176163 isReady = true
177164 default :
165+ if c .cfg .ReadyForInitialPrompt (screen ) {
166+ close (c .initialPromptReady )
167+ isReady = true
168+ }
178169 }
179170 if isReady && len (c .outboundQueue ) > 0 && c .isScreenStableLocked () {
180171 select {
@@ -191,6 +182,20 @@ func (c *PTYConversation) Start(ctx context.Context) {
191182
192183 // Send loop - primary call site for sendLocked() in production
193184 go func () {
185+ defer func () {
186+ // Drain outbound queue so Send() callers don't block forever.
187+ for {
188+ select {
189+ case msg := <- c .outboundQueue :
190+ if msg .errCh != nil {
191+ msg .errCh <- ctx .Err ()
192+ close (msg .errCh )
193+ }
194+ default :
195+ return
196+ }
197+ }
198+ }()
194199 for {
195200 select {
196201 case <- ctx .Done ():
@@ -203,6 +208,7 @@ func (c *PTYConversation) Start(ctx context.Context) {
203208 err := c .sendMessage (ctx , msg .parts ... )
204209 if msg .errCh != nil {
205210 msg .errCh <- err
211+ close (msg .errCh )
206212 }
207213 default :
208214 c .cfg .Logger .Error ("received stable signal but outbound queue is empty" )
0 commit comments