Skip to content

Commit 0c2ae6a

Browse files
committed
refactor(runtime): remove queue shutdown loop
1 parent 8d94ef8 commit 0c2ae6a

8 files changed

Lines changed: 56 additions & 85 deletions

File tree

apps/druid/core/services/runtime_lifecycle.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ func (s *RuntimeSupervisor) DeleteWithPolicy(id string, purgeData bool) error {
1212
delete(s.sessions, id)
1313
s.mu.Unlock()
1414
if session != nil {
15-
session.Shutdown()
15+
session.stopDeploymentQueue()
1616
}
1717

1818
runtimeScroll, err := s.store.GetScroll(id)
@@ -60,6 +60,6 @@ func (s *RuntimeSupervisor) Stop(id string) (*domain.RuntimeScroll, error) {
6060
session.markError(err)
6161
return nil, err
6262
}
63-
session.Shutdown()
63+
session.stopDeploymentQueue()
6464
return s.store.GetScroll(id)
6565
}

apps/druid/core/services/runtime_session.go

Lines changed: 6 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -22,11 +22,6 @@ type RuntimeSession struct {
2222
runtimeBackend ports.RuntimeBackendInterface
2323
procedures ports.ProcedureLauchnerInterface
2424
queue map[string]*runtimeQueueItem
25-
taskChan chan string
26-
taskDoneChan chan struct{}
27-
shutdownChan chan struct{}
28-
shutdownDoneChan chan struct{}
29-
shutdownOnce sync.Once
3025
workWg sync.WaitGroup
3126
notifierChan []chan []string
3227
devWatchPaths []string
@@ -84,19 +79,7 @@ func (s *RuntimeSession) Start() {
8479
}
8580
s.started = true
8681
s.mu.Unlock()
87-
s.startQueue()
88-
}
89-
90-
func (s *RuntimeSession) Shutdown() {
91-
s.shutdownQueueLoop()
92-
s.mu.Lock()
93-
s.started = false
94-
s.mu.Unlock()
95-
}
96-
97-
func (s *RuntimeSession) startQueue() {
98-
s.resetQueueState()
99-
go s.Work()
82+
s.triggerRunQueue()
10083
}
10184

10285
func (s *RuntimeSession) newQueue(scrollService *coreservices.ScrollService, root string, scrollName string) (*coreservices.QueueManager, *coreservices.ProcedureLauncher, error) {
@@ -133,7 +116,11 @@ func (s *RuntimeSession) replaceQueue(start bool) error {
133116
s.mu.Unlock()
134117

135118
if start {
136-
s.startQueue()
119+
s.resetQueueState()
120+
s.mu.Lock()
121+
s.started = true
122+
s.mu.Unlock()
123+
s.triggerRunQueue()
137124
} else {
138125
s.resetQueueState()
139126
}

apps/druid/core/services/runtime_session_cache.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@ func (s *RuntimeSupervisor) startSession(runtimeScroll *domain.RuntimeScroll) (*
5959
s.mu.Lock()
6060
if existing := s.sessions[runtimeScroll.ID]; existing != nil {
6161
s.mu.Unlock()
62-
session.Shutdown()
62+
session.stopDeploymentQueue()
6363
return existing, nil
6464
}
6565
s.sessions[runtimeScroll.ID] = session

apps/druid/core/services/runtime_session_queue.go

Lines changed: 24 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,6 @@ package services
33
import (
44
"context"
55
"errors"
6-
"fmt"
7-
"sync"
86
"time"
97

108
"github.com/highcard-dev/daemon/internal/core/domain"
@@ -76,10 +74,9 @@ func (s *RuntimeSession) addQueueItem(cmd string, options coreservices.AddItemOp
7674
item = &runtimeQueueItem{doneChan: doneChan}
7775
s.queue[cmd] = item
7876
s.setQueueStatusLocked(cmd, item, domain.ScrollLockStatusWaiting, nil)
79-
taskChan := s.taskChan
8077
s.queueMu.Unlock()
8178

82-
taskChan <- cmd
79+
s.triggerRunQueue()
8380

8481
if options.Wait {
8582
<-doneChan
@@ -127,29 +124,14 @@ func (s *RuntimeSession) HydrateFromState(statuses domain.ProcedureStatusMap) er
127124
return nil
128125
}
129126

130-
func (s *RuntimeSession) Work() {
131-
s.queueMu.Lock()
132-
taskChan := s.taskChan
133-
taskDoneChan := s.taskDoneChan
134-
shutdownChan := s.shutdownChan
135-
shutdownDoneChan := s.shutdownDoneChan
136-
s.queueMu.Unlock()
137-
defer close(shutdownDoneChan)
138-
139-
for {
140-
select {
141-
case <-taskChan:
142-
s.startRunQueue()
143-
case <-taskDoneChan:
144-
s.startRunQueue()
145-
case <-shutdownChan:
146-
s.workWg.Wait()
147-
s.queueMu.Lock()
148-
s.queue = make(map[string]*runtimeQueueItem)
149-
s.queueMu.Unlock()
150-
return
151-
}
127+
func (s *RuntimeSession) triggerRunQueue() {
128+
s.mu.Lock()
129+
started := s.started
130+
s.mu.Unlock()
131+
if !started {
132+
return
152133
}
134+
s.startRunQueue()
153135
}
154136

155137
func (s *RuntimeSession) startRunQueue() {
@@ -241,15 +223,7 @@ func (s *RuntimeSession) RunQueue() {
241223
if i.doneChan != nil {
242224
close(i.doneChan)
243225
}
244-
245-
s.queueMu.Lock()
246-
taskDoneChan := s.taskDoneChan
247-
shutdownChan := s.shutdownChan
248-
s.queueMu.Unlock()
249-
select {
250-
case taskDoneChan <- struct{}{}:
251-
case <-shutdownChan:
252-
}
226+
s.triggerRunQueue()
253227
}()
254228

255229
startedAt := time.Now()
@@ -301,7 +275,6 @@ func (s *RuntimeSession) WaitUntilEmptyContext(ctx context.Context) error {
301275

302276
s.queueMu.Lock()
303277
s.notifierChan = append(s.notifierChan, notifier)
304-
shutdownChan := s.shutdownChan
305278
if !s.hasActiveItemsLocked() {
306279
s.removeNotifierLocked(notifier)
307280
s.queueMu.Unlock()
@@ -322,8 +295,6 @@ func (s *RuntimeSession) WaitUntilEmptyContext(ctx context.Context) error {
322295
}
323296
case <-ctx.Done():
324297
return ctx.Err()
325-
case <-shutdownChan:
326-
return fmt.Errorf("queue manager shut down while waiting for commands")
327298
}
328299
}
329300
}
@@ -340,34 +311,32 @@ func (s *RuntimeSession) GetQueue() map[string]domain.ScrollLockStatus {
340311
return queue
341312
}
342313

343-
func (s *RuntimeSession) shutdownQueueLoop() {
344-
s.queueMu.Lock()
345-
shutdownChan := s.shutdownChan
346-
shutdownDoneChan := s.shutdownDoneChan
347-
s.queueMu.Unlock()
314+
func (s *RuntimeSession) stopDeploymentQueue() {
315+
s.mu.Lock()
316+
s.started = false
317+
s.mu.Unlock()
318+
s.drainQueueWork()
319+
s.resetQueueState()
320+
}
348321

349-
s.shutdownOnce.Do(func() {
350-
close(shutdownChan)
351-
s.notify()
352-
})
322+
func (s *RuntimeSession) drainQueueWork() {
323+
done := make(chan struct{})
324+
go func() {
325+
s.workWg.Wait()
326+
close(done)
327+
}()
353328

354329
select {
355-
case <-shutdownDoneChan:
330+
case <-done:
356331
case <-time.After(5 * time.Second):
357-
logger.Log().Warn("Timed out waiting for queue manager shutdown")
332+
logger.Log().Warn("Timed out waiting for queue work to finish")
358333
}
359334
}
360335

361336
func (s *RuntimeSession) resetQueueState() {
362337
s.queueMu.Lock()
363338
defer s.queueMu.Unlock()
364339
s.queue = make(map[string]*runtimeQueueItem)
365-
s.taskChan = make(chan string, 100)
366-
s.taskDoneChan = make(chan struct{}, 1)
367-
s.shutdownChan = make(chan struct{})
368-
s.shutdownDoneChan = make(chan struct{})
369-
s.shutdownOnce = sync.Once{}
370-
s.workWg = sync.WaitGroup{}
371340
s.notifierChan = make([]chan []string, 0)
372341
}
373342

apps/druid/core/services/runtime_session_runtime.go

Lines changed: 21 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -48,14 +48,20 @@ func (s *RuntimeSession) StopRuntime() error {
4848
started := s.started
4949
s.mu.Unlock()
5050
if started {
51-
s.shutdownQueueLoop()
51+
s.mu.Lock()
52+
s.started = false
53+
s.mu.Unlock()
54+
s.drainQueueWork()
5255
}
5356
if err := s.replaceQueue(false); err != nil {
5457
return err
5558
}
5659
if err := s.runtimeBackend.StopRuntime(root); err != nil {
5760
if started {
58-
s.startQueue()
61+
s.mu.Lock()
62+
s.started = true
63+
s.mu.Unlock()
64+
s.triggerRunQueue()
5965
}
6066
return err
6167
}
@@ -74,7 +80,10 @@ func (s *RuntimeSession) StopRuntime() error {
7480
err := s.store.UpdateScroll(s.runtimeScroll)
7581
s.mu.Unlock()
7682
if err == nil && started {
77-
s.startQueue()
83+
s.mu.Lock()
84+
s.started = true
85+
s.mu.Unlock()
86+
s.triggerRunQueue()
7887
}
7988
return err
8089
}
@@ -117,7 +126,10 @@ func (s *RuntimeSession) ApplyRestore(materialized *ports.RuntimeMaterialization
117126
started := s.started
118127
s.mu.Unlock()
119128
if started {
120-
s.shutdownQueueLoop()
129+
s.mu.Lock()
130+
s.started = false
131+
s.mu.Unlock()
132+
s.drainQueueWork()
121133
}
122134

123135
s.mu.Lock()
@@ -142,7 +154,11 @@ func (s *RuntimeSession) ApplyRestore(materialized *ports.RuntimeMaterialization
142154
err = s.store.UpdateScroll(s.runtimeScroll)
143155
s.mu.Unlock()
144156
if err == nil && started {
145-
s.startQueue()
157+
s.resetQueueState()
158+
s.mu.Lock()
159+
s.started = true
160+
s.mu.Unlock()
161+
s.triggerRunQueue()
146162
}
147163
return err
148164
}

apps/druid/core/services/runtime_supervisor_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -324,7 +324,7 @@ func TestRuntimeSupervisorStartHydratesRunningScroll(t *testing.T) {
324324
}
325325
defer func() {
326326
if session := supervisor.sessions["running-scroll"]; session != nil {
327-
session.Shutdown()
327+
session.stopDeploymentQueue()
328328
}
329329
}()
330330

apps/druid/core/services/runtime_update.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ func (s *RuntimeSupervisor) updateExistingScroll(runtimeScroll *domain.RuntimeSc
3232
delete(s.sessions, runtimeScroll.ID)
3333
s.mu.Unlock()
3434
if session != nil {
35-
session.Shutdown()
35+
session.stopDeploymentQueue()
3636
}
3737

3838
if wasRunning {

test/integration/example_test.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -175,7 +175,6 @@ func TestExamples(t *testing.T) {
175175
t.Error(err)
176176
return
177177
}
178-
session.Shutdown()
179178

180179
if config.TestAddress != "" {
181180
err = test_utils.ConnectionTest(config.TestAddress, false)

0 commit comments

Comments
 (0)