Skip to content

Commit 49e3494

Browse files
committed
fix(execd): flush resume buffer tail after SSE attach
1 parent 441e3b5 commit 49e3494

3 files changed

Lines changed: 88 additions & 1 deletion

File tree

components/execd/pkg/web/controller/command.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,12 @@ func (c *CodeInterpretingController) ResumeCommandStream() {
165165
}
166166

167167
c.setupSSEResponse()
168+
lastReplayMaxEid := afterEid
168169
for _, ev := range events {
169170
c.writeSingleEvent("ResumeBuffer", ev.Payload, false, fmt.Sprintf("buffer eid=%d", ev.EID), 0)
171+
if ev.EID > lastReplayMaxEid {
172+
lastReplayMaxEid = ev.EID
173+
}
170174
}
171175

172176
st2, _ := codeRunner.GetCommandStatus(commandID)
@@ -187,6 +191,9 @@ func (c *CodeInterpretingController) ResumeCommandStream() {
187191
return
188192
}
189193

194+
// Catch up events appended while the snapshot slice was replayed (holder still nil); same mutex as writeFrame.
195+
h.flushResumeTail(commandID, lastReplayMaxEid)
196+
190197
select {
191198
case <-h.waitDone():
192199
case <-c.ctx.Request.Context().Done():

components/execd/pkg/web/controller/command_stream.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 Alibaba Group Holding Ltd.
1+
// Copyright 2026 Alibaba Group Holding Ltd.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -167,6 +167,37 @@ func (r *streamRegistry) writeSSE(id string, data []byte, bufEid int64, handler,
167167
h.writeFrame(data, bufEid, handler, summary)
168168
}
169169

170+
// flushResumeTail writes all buffered events with EID > afterEid to the current holder while holding h.mu.
171+
// Live writeFrame calls block on the same mutex, so chunks appended only to the ring during the initial
172+
// snapshot replay cannot be missed on this connection (see ResumeCommandStream).
173+
func (h *streamHub) flushResumeTail(commandID string, afterEid int64) {
174+
if h == nil || h.holder == nil {
175+
return
176+
}
177+
h.mu.Lock()
178+
defer h.mu.Unlock()
179+
180+
tail, ok := resumeBuffer.EventsAfter(commandID, afterEid)
181+
if !ok || len(tail) == 0 {
182+
return
183+
}
184+
writer := h.holder.writer
185+
for _, ev := range tail {
186+
payload := append(append([]byte(nil), ev.Payload...), '\n', '\n')
187+
n, err := writer.Write(payload)
188+
if err == nil && n != len(payload) {
189+
err = io.ErrShortWrite
190+
}
191+
if err != nil {
192+
log.Error("flushResumeTail: write eid=%d: %v", ev.EID, err)
193+
return
194+
}
195+
if flusher, ok := writer.(http.Flusher); ok {
196+
flusher.Flush()
197+
}
198+
}
199+
}
200+
170201
func (h *streamHub) writeFrame(data []byte, bufEid int64, handler, summary string) {
171202
h.mu.Lock()
172203
defer h.mu.Unlock()
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package controller
16+
17+
import (
18+
"context"
19+
"net/http/httptest"
20+
"testing"
21+
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
func TestFlushResumeTail_NilHubNoPanic(t *testing.T) {
26+
var h *streamHub
27+
h.flushResumeTail("any", 0)
28+
}
29+
30+
func TestFlushResumeTail_WritesBufferedEvents(t *testing.T) {
31+
cmdID := "flush-resume-test-cmd"
32+
payload := []byte(`{"type":"stdout","eid":1}`)
33+
require.NoError(t, resumeBuffer.Append(cmdID, 1, payload))
34+
35+
w := httptest.NewRecorder()
36+
ctx, cancel := context.WithCancel(context.Background())
37+
defer cancel()
38+
39+
h := &streamHub{
40+
streamID: cmdID,
41+
done: make(chan struct{}),
42+
holder: &streamHolder{writer: w, ctx: ctx},
43+
}
44+
h.flushResumeTail(cmdID, 0)
45+
46+
body := w.Body.String()
47+
require.Contains(t, body, "stdout")
48+
require.Contains(t, body, `"eid":1`)
49+
}

0 commit comments

Comments
 (0)