Skip to content

Commit 7ac7afd

Browse files
committed
fix(execd): serialize streamHub holder access under h.mu
1 parent 49e3494 commit 7ac7afd

2 files changed

Lines changed: 37 additions & 15 deletions

File tree

components/execd/pkg/web/controller/command.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ func (c *CodeInterpretingController) ResumeCommandStream() {
175175

176176
st2, _ := codeRunner.GetCommandStatus(commandID)
177177
if st2 == nil || !st2.Running {
178+
if len(events) > 0 {
179+
log.Info("resume stream: command_id=%s after_eid=%d snapshot_events=%d (replay only)",
180+
commandID, afterEid, len(events))
181+
}
178182
return
179183
}
180184

@@ -192,7 +196,9 @@ func (c *CodeInterpretingController) ResumeCommandStream() {
192196
}
193197

194198
// Catch up events appended while the snapshot slice was replayed (holder still nil); same mutex as writeFrame.
195-
h.flushResumeTail(commandID, lastReplayMaxEid)
199+
tailN := h.flushResumeTail(commandID, lastReplayMaxEid)
200+
log.Info("resume stream: command_id=%s after_eid=%d snapshot_events=%d post_attach_tail=%d (live)",
201+
commandID, afterEid, len(events), tailN)
196202

197203
select {
198204
case <-h.waitDone():

components/execd/pkg/web/controller/command_stream.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func deferResumeCleanup(c *CodeInterpretingController) {
5050
}
5151
commandStreams.closeAndRemove(id)
5252
resumeBuffer.Delete(id)
53+
log.Info("command stream: hub and resume buffer cleaned up id=%s", id)
5354
}
5455

5556
// --- live SSE routing (mutually exclusive main vs resume) ---
@@ -79,21 +80,26 @@ func (r *streamRegistry) registerPrimary(id string, w http.ResponseWriter, ctx c
7980
streamID: id,
8081
done: make(chan struct{}),
8182
}
82-
h.holder = &streamHolder{writer: w, ctx: ctx}
8383
r.m[id] = h
84+
h.mu.Lock()
85+
h.holder = &streamHolder{writer: w, ctx: ctx}
86+
h.mu.Unlock()
8487
r.mu.Unlock()
8588

86-
r.watchHolderRelease(id, h, ctx)
89+
log.Info("command stream: primary hub registered id=%s", id)
90+
watchHolderRelease(h, ctx)
8791
}
8892

89-
func (r *streamRegistry) watchHolderRelease(id string, h *streamHub, ctx context.Context) {
93+
// watchHolderRelease clears h.holder when ctx is cancelled. All holder mutations use h.mu only
94+
// (see tryAttachResume, registerPrimary, writeFrame) so r.mu and h.mu are not split across h.holder.
95+
func watchHolderRelease(h *streamHub, ctx context.Context) {
9096
go func() {
9197
<-ctx.Done()
92-
r.mu.Lock()
93-
if cur, ok := r.m[id]; ok && cur == h && h.holder != nil && h.holder.ctx == ctx {
98+
h.mu.Lock()
99+
defer h.mu.Unlock()
100+
if h.holder != nil && h.holder.ctx == ctx {
94101
h.holder = nil
95102
}
96-
r.mu.Unlock()
97103
}()
98104
}
99105

@@ -143,14 +149,17 @@ func (r *streamRegistry) tryAttachResume(id string, w http.ResponseWriter, ctx c
143149
r.mu.Unlock()
144150
return nil, errLiveHubNotFound
145151
}
152+
h.mu.Lock()
146153
if h.holder != nil && h.holder.ctx.Err() == nil {
154+
h.mu.Unlock()
147155
r.mu.Unlock()
148156
return nil, errLiveStreamPrimaryActive
149157
}
150158
h.holder = &streamHolder{writer: w, ctx: ctx}
159+
h.mu.Unlock()
151160
r.mu.Unlock()
152161

153-
r.watchHolderRelease(id, h, ctx)
162+
watchHolderRelease(h, ctx)
154163
return h, nil
155164
}
156165

@@ -170,32 +179,39 @@ func (r *streamRegistry) writeSSE(id string, data []byte, bufEid int64, handler,
170179
// flushResumeTail writes all buffered events with EID > afterEid to the current holder while holding h.mu.
171180
// Live writeFrame calls block on the same mutex, so chunks appended only to the ring during the initial
172181
// snapshot replay cannot be missed on this connection (see ResumeCommandStream).
173-
func (h *streamHub) flushResumeTail(commandID string, afterEid int64) {
174-
if h == nil || h.holder == nil {
175-
return
182+
// Returns how many extra events were written after the initial snapshot replay.
183+
func (h *streamHub) flushResumeTail(commandID string, afterEid int64) int {
184+
if h == nil {
185+
return 0
176186
}
177187
h.mu.Lock()
178188
defer h.mu.Unlock()
189+
if h.holder == nil {
190+
return 0
191+
}
179192

180193
tail, ok := resumeBuffer.EventsAfter(commandID, afterEid)
181194
if !ok || len(tail) == 0 {
182-
return
195+
return 0
183196
}
184197
writer := h.holder.writer
198+
written := 0
185199
for _, ev := range tail {
186200
payload := append(append([]byte(nil), ev.Payload...), '\n', '\n')
187-
n, err := writer.Write(payload)
188-
if err == nil && n != len(payload) {
201+
nw, err := writer.Write(payload)
202+
if err == nil && nw != len(payload) {
189203
err = io.ErrShortWrite
190204
}
191205
if err != nil {
192206
log.Error("flushResumeTail: write eid=%d: %v", ev.EID, err)
193-
return
207+
return written
194208
}
195209
if flusher, ok := writer.(http.Flusher); ok {
196210
flusher.Flush()
197211
}
212+
written++
198213
}
214+
return written
199215
}
200216

201217
func (h *streamHub) writeFrame(data []byte, bufEid int64, handler, summary string) {

0 commit comments

Comments
 (0)