Skip to content

Commit e2e4ff4

Browse files
committed
fix(execd): serialize streamHub holder access under h.mu
1 parent 49e3494 commit e2e4ff4

1 file changed

Lines changed: 18 additions & 8 deletions

File tree

components/execd/pkg/web/controller/command_stream.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,25 @@ func (r *streamRegistry) registerPrimary(id string, w http.ResponseWriter, ctx c
7979
streamID: id,
8080
done: make(chan struct{}),
8181
}
82-
h.holder = &streamHolder{writer: w, ctx: ctx}
8382
r.m[id] = h
83+
h.mu.Lock()
84+
h.holder = &streamHolder{writer: w, ctx: ctx}
85+
h.mu.Unlock()
8486
r.mu.Unlock()
8587

86-
r.watchHolderRelease(id, h, ctx)
88+
watchHolderRelease(h, ctx)
8789
}
8890

89-
func (r *streamRegistry) watchHolderRelease(id string, h *streamHub, ctx context.Context) {
91+
// watchHolderRelease clears h.holder when ctx is cancelled. All holder mutations use h.mu only
92+
// (see tryAttachResume, registerPrimary, writeFrame) so r.mu and h.mu are not split across h.holder.
93+
func watchHolderRelease(h *streamHub, ctx context.Context) {
9094
go func() {
9195
<-ctx.Done()
92-
r.mu.Lock()
93-
if cur, ok := r.m[id]; ok && cur == h && h.holder != nil && h.holder.ctx == ctx {
96+
h.mu.Lock()
97+
defer h.mu.Unlock()
98+
if h.holder != nil && h.holder.ctx == ctx {
9499
h.holder = nil
95100
}
96-
r.mu.Unlock()
97101
}()
98102
}
99103

@@ -143,14 +147,17 @@ func (r *streamRegistry) tryAttachResume(id string, w http.ResponseWriter, ctx c
143147
r.mu.Unlock()
144148
return nil, errLiveHubNotFound
145149
}
150+
h.mu.Lock()
146151
if h.holder != nil && h.holder.ctx.Err() == nil {
152+
h.mu.Unlock()
147153
r.mu.Unlock()
148154
return nil, errLiveStreamPrimaryActive
149155
}
150156
h.holder = &streamHolder{writer: w, ctx: ctx}
157+
h.mu.Unlock()
151158
r.mu.Unlock()
152159

153-
r.watchHolderRelease(id, h, ctx)
160+
watchHolderRelease(h, ctx)
154161
return h, nil
155162
}
156163

@@ -171,11 +178,14 @@ func (r *streamRegistry) writeSSE(id string, data []byte, bufEid int64, handler,
171178
// Live writeFrame calls block on the same mutex, so chunks appended only to the ring during the initial
172179
// snapshot replay cannot be missed on this connection (see ResumeCommandStream).
173180
func (h *streamHub) flushResumeTail(commandID string, afterEid int64) {
174-
if h == nil || h.holder == nil {
181+
if h == nil {
175182
return
176183
}
177184
h.mu.Lock()
178185
defer h.mu.Unlock()
186+
if h.holder == nil {
187+
return
188+
}
179189

180190
tail, ok := resumeBuffer.EventsAfter(commandID, afterEid)
181191
if !ok || len(tail) == 0 {

0 commit comments

Comments
 (0)