Skip to content

Commit df0c0ac

Browse files
authored
Adds graceful shutdown to concurrent core
1 parent 66c32dc commit df0c0ac

10 files changed

Lines changed: 174 additions & 56 deletions

core/concurrent_cache/backend_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ func TestUpsertBackend_Metrics(t *testing.T) {
3131
for _, tt := range tests {
3232
t.Run(tt.name, func(t *testing.T) {
3333
mockCtrl := gomock.NewController(t)
34-
defer mockCtrl.Finish()
3534

3635
// Reset metrics before each test
3736
metrics.BackendsGauge.Reset()
@@ -132,7 +131,6 @@ func TestDeleteBackend_Metrics(t *testing.T) {
132131
for _, tt := range tests {
133132
t.Run(tt.name, func(t *testing.T) {
134133
mockCtrl := gomock.NewController(t)
135-
defer mockCtrl.Finish()
136134

137135
// Reset metrics before each test
138136
metrics.BackendsGauge.Reset()
@@ -294,7 +292,6 @@ func TestReadBackend(t *testing.T) {
294292
for _, tt := range tests {
295293
t.Run(tt.name, func(t *testing.T) {
296294
mockCtrl := gomock.NewController(t)
297-
defer mockCtrl.Finish()
298295

299296
// Set up initial state
300297
backends.lock()
@@ -354,7 +351,6 @@ func TestInconsistentReadBackend(t *testing.T) {
354351
for _, tt := range tests {
355352
t.Run(tt.name, func(t *testing.T) {
356353
mockCtrl := gomock.NewController(t)
357-
defer mockCtrl.Finish()
358354

359355
// Set up initial state
360356
backends.lock()
@@ -417,7 +413,6 @@ func TestReadBackendByName(t *testing.T) {
417413
for _, tt := range tests {
418414
t.Run(tt.name, func(t *testing.T) {
419415
mockCtrl := gomock.NewController(t)
420-
defer mockCtrl.Finish()
421416

422417
// Set up initial state
423418
backends.lock()
@@ -485,7 +480,6 @@ func TestInconsistentReadBackendByName(t *testing.T) {
485480
for _, tt := range tests {
486481
t.Run(tt.name, func(t *testing.T) {
487482
mockCtrl := gomock.NewController(t)
488-
defer mockCtrl.Finish()
489483

490484
// Set up initial state
491485
backends.lock()

core/concurrent_cache/concurrent_cache.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ func init() {
2828

2929
// Initialize Need this as public for unit tests
3030
func Initialize() {
31+
rootCache = cache{
32+
data: make(map[string]SmartCopier),
33+
resourceLocks: locks.NewGCNamedMutex(),
34+
}
3135
nodes = cache{
3236
data: make(map[string]SmartCopier),
3337
resourceLocks: locks.NewGCNamedMutex(),
@@ -49,7 +53,6 @@ func Initialize() {
4953
volumes = cache{
5054
data: make(map[string]SmartCopier),
5155
resourceLocks: locks.NewGCNamedMutex(),
52-
5356
// volume internal name
5457
key: &uniqueKey{
5558
data: make(map[string]string),
@@ -84,6 +87,8 @@ func Initialize() {
8487
for r := range schema {
8588
resourceRanks[r] = rankForResource(r)
8689
}
90+
91+
rootCache.data["."] = SmartCopier(nil)
8792
}
8893

8994
// SmartCopier is an interface for objects that can be copied for the cache. If the object implements interior
@@ -143,6 +148,7 @@ func (c *cache) runlock() {
143148
}
144149

145150
var (
151+
rootCache cache
146152
nodes cache
147153
storageClasses cache
148154
backends cache
@@ -153,6 +159,7 @@ var (
153159
autogrowPolicies cache
154160

155161
caches = map[resource]*cache{
162+
root: &rootCache,
156163
node: &nodes,
157164
storageClass: &storageClasses,
158165
backend: &backends,
@@ -244,6 +251,7 @@ func assembleQueries(queries [][]Subquery, roots [][]int, cachesPresent map[reso
244251
}
245252
queries[i], roots[i] = buildTrees(ri, q)
246253
}
254+
247255
return nil
248256
}
249257

@@ -363,7 +371,7 @@ func dedupe(query []Subquery) (map[resource]int, error) {
363371
return resourceIndices, nil
364372
}
365373

366-
// buildTrees takes subqueries and finds the roots, and fills in any missing parents
374+
// buildTrees takes subqueries and finds the roots, and fills in any missing dependencies
367375
func buildTrees(resourceIndices map[resource]int, query []Subquery) ([]Subquery, []int) {
368376
roots := make([]int, 0)
369377
l := len(query)
@@ -377,11 +385,11 @@ func buildTrees(resourceIndices map[resource]int, query []Subquery) ([]Subquery,
377385
continue
378386
}
379387

380-
// if the current subquery has ownable resources in the query it is not a root
388+
// if the current subquery has dependent resources in the query it is not a root
381389
root := true
382-
ownables := inverseSchema[query[i].res]
383-
for j := 0; j < len(ownables) && root; j++ {
384-
if _, ok := resourceIndices[ownables[j]]; ok {
390+
dependents := inverseSchema[query[i].res]
391+
for j := 0; j < len(dependents) && root; j++ {
392+
if _, ok := resourceIndices[dependents[j]]; ok {
385393
root = false
386394
}
387395
}
@@ -479,11 +487,13 @@ func fillInIDs(root int, query []Subquery) error {
479487
}
480488

481489
func mergeQueries(queries [][]Subquery) []Subquery {
482-
length := 0
490+
length := 1
483491
for _, q := range queries {
484492
length += len(q)
485493
}
486494
merged := make([]Subquery, 0, length)
495+
// always add implied read root, if write root is present it will take precedence
496+
merged = append(merged, Subquery{res: root, op: read, id: "."})
487497
for i, q := range queries {
488498
for j := range q {
489499
q[j].result = i
@@ -505,9 +515,9 @@ func mergeQueries(queries [][]Subquery) []Subquery {
505515
}
506516

507517
switch {
508-
case resourceRanks[i.res] > resourceRanks[j.res]:
509-
return -1
510518
case resourceRanks[i.res] < resourceRanks[j.res]:
519+
return -1
520+
case resourceRanks[i.res] > resourceRanks[j.res]:
511521
return 1
512522
}
513523

@@ -688,7 +698,7 @@ func checkDependency(s []Subquery, i int, r resource) error {
688698

689699
func rankForResource(r resource) int {
690700
rank := 0
691-
for _, c := range inverseSchema[r] {
701+
for _, c := range schema[r] {
692702
cRank := 1 + rankForResource(c)
693703
if cRank > rank {
694704
rank = cRank
@@ -786,7 +796,8 @@ var resourceNames = map[resource]string{
786796
}
787797

788798
const (
789-
node = resource(iota)
799+
root = resource(iota)
800+
node
790801
storageClass
791802
backend
792803
volume
@@ -825,6 +836,7 @@ const (
825836
// schema is the authoritative source for relationships between resources. For example,
826837
// a snapshot depends on a volume, and a volume depends on a backend.
827838
var schema = map[resource][]resource{
839+
root: nil, // root is an implied dependency for all resources
828840
node: nil,
829841
storageClass: nil,
830842
backend: nil,

core/concurrent_cache/concurrent_cache_test.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ func TestFillInIDs(t *testing.T) {
164164
for _, tt := range tests {
165165
t.Run(tt.name, func(t *testing.T) {
166166
initCaches()
167-
defer cleanCaches()
168167
m, err := dedupe(tt.query)
169168
assert.NoError(t, err)
170169
query, roots := buildTrees(m, tt.query)
@@ -214,7 +213,6 @@ func TestMergeQueries(t *testing.T) {
214213
t.Run(tt.name, func(t *testing.T) {
215214
initCaches()
216215
nodes.data["node2"] = &models.Node{Name: "node2"}
217-
defer cleanCaches()
218216

219217
queries := make([][]Subquery, 0, len(tt.queries))
220218
for _, q := range tt.queries {
@@ -229,7 +227,7 @@ func TestMergeQueries(t *testing.T) {
229227
}
230228

231229
merged := mergeQueries(queries)
232-
assert.Len(t, merged, 13)
230+
assert.Len(t, merged, 14)
233231
})
234232
}
235233
}
@@ -238,7 +236,6 @@ func TestLock(t *testing.T) {
238236
// start two goroutines that try to update the same resources using multiple queries
239237
// passes if there is no deadlock
240238
initCaches()
241-
defer cleanCaches()
242239
makeQueries := func() [][]Subquery {
243240
return [][]Subquery{
244241
{
@@ -295,14 +292,11 @@ func TestLock(t *testing.T) {
295292
}
296293

297294
func initCaches() {
298-
nodes.data = make(map[string]SmartCopier)
299-
backends.data = make(map[string]SmartCopier)
300-
volumes.data = make(map[string]SmartCopier)
301-
volumePublications.data = make(map[string]SmartCopier)
302-
snapshots.data = make(map[string]SmartCopier)
303-
autogrowPolicies.data = make(map[string]SmartCopier)
295+
Initialize()
304296

305-
nodes.data["node1"] = &models.Node{}
297+
nodes.data["node1"] = &models.Node{
298+
Name: "node1",
299+
}
306300
backends.data["backend1"] = storage.NewTestStorageBackend()
307301
volumes.data["volume1"] = &storage.Volume{
308302
BackendUUID: "backend1",
@@ -323,15 +317,6 @@ func initCaches() {
323317
autogrowPolicies.data["policy1"] = storage.NewAutogrowPolicy("policy1", "80", "20", "1000Gi", storage.AutogrowPolicyStateSuccess)
324318
}
325319

326-
func cleanCaches() {
327-
nodes.data = make(map[string]SmartCopier)
328-
backends.data = make(map[string]SmartCopier)
329-
volumes.data = make(map[string]SmartCopier)
330-
volumePublications.data = make(map[string]SmartCopier)
331-
snapshots.data = make(map[string]SmartCopier)
332-
autogrowPolicies.data = make(map[string]SmartCopier)
333-
}
334-
335320
func createQueryGenerators(numIds int) [][]queryGenerator {
336321
generators := make([][]queryGenerator, 0, numIds)
337322

core/concurrent_cache/metrics_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func TestAddAndDeleteBackendMetrics(t *testing.T) {
9191
metrics.TridentBackendInfo.Reset()
9292

9393
mockCtrl := gomock.NewController(t)
94-
defer mockCtrl.Finish()
9594

9695
// Create mock backend
9796
mockBackend := getMockBackendWithMap(mockCtrl, map[string]string{
@@ -150,7 +149,6 @@ func TestAddAndDeleteBackendMetrics_MultipleBackends(t *testing.T) {
150149
// Add all backends to metrics
151150
for i, b := range backends {
152151
mockCtrl := gomock.NewController(t)
153-
defer mockCtrl.Finish()
154152

155153
mockBackend := getMockBackendWithMap(mockCtrl, map[string]string{
156154
"driverName": b.driverName,
@@ -243,7 +241,6 @@ func TestAddAndDeleteVolumeMetrics(t *testing.T) {
243241
metrics.VolumeAllocatedBytesGauge.Reset()
244242

245243
mockCtrl := gomock.NewController(t)
246-
defer mockCtrl.Finish()
247244

248245
// Create mock backend
249246
mockBackend := getMockBackendWithMap(mockCtrl, map[string]string{
@@ -348,7 +345,6 @@ func TestAddAndDeleteVolumeMetrics_InvalidSize(t *testing.T) {
348345
metrics.VolumeAllocatedBytesGauge.Reset()
349346

350347
mockCtrl := gomock.NewController(t)
351-
defer mockCtrl.Finish()
352348

353349
// Create mock backend
354350
mockBackend := getMockBackendWithMap(mockCtrl, map[string]string{
@@ -457,7 +453,6 @@ func TestAddAndDeleteSnapshotMetrics(t *testing.T) {
457453
metrics.SnapshotAllocatedBytesGauge.Reset()
458454

459455
mockCtrl := gomock.NewController(t)
460-
defer mockCtrl.Finish()
461456

462457
// Create mock backend
463458
mockBackend := getMockBackendWithMap(mockCtrl, map[string]string{

core/concurrent_cache/root.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package concurrent_cache
2+
3+
// LockCache takes a write lock on the cache root, which blocks all other operations.
4+
func LockCache() Subquery {
5+
return Subquery{
6+
res: root,
7+
op: upsert,
8+
id: ".",
9+
setResults: func(s *Subquery, r *Result) error {
10+
return nil
11+
},
12+
}
13+
}

core/concurrent_cache/root_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package concurrent_cache
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
// TestLockCache tests that a Lock call with the LockCache subquery holds the root resource
13+
// and blocks any other Lock call until it is released.
14+
func TestLockCache(t *testing.T) {
15+
initCaches()
16+
17+
// Goroutine 1: acquire Lock with LockCache() and hold it
18+
lockHeld := make(chan struct{})
19+
releaseRequested := make(chan struct{})
20+
var unlockFirst func()
21+
var firstErr error
22+
var wg sync.WaitGroup
23+
wg.Add(1)
24+
go func() {
25+
defer wg.Done()
26+
_, unlock, err := Lock(context.Background(), Query(LockCache()))
27+
firstErr = err
28+
unlockFirst = unlock
29+
close(lockHeld)
30+
<-releaseRequested
31+
if unlockFirst != nil {
32+
unlockFirst()
33+
}
34+
}()
35+
36+
// Wait until the first Lock has been acquired
37+
<-lockHeld
38+
assert.NoError(t, firstErr, "first Lock(LockCache()) should succeed")
39+
40+
// Goroutine 2: attempt Lock with another query; it must block while the first holds LockCache
41+
secondDone := make(chan struct{})
42+
var secondErr error
43+
go func() {
44+
_, _, secondErr = Lock(context.Background(), Query(ReadBackend("backend1"), ReadVolume("volume1")))
45+
close(secondDone)
46+
}()
47+
48+
// Give the second goroutine a moment to reach the Lock call and block
49+
time.Sleep(50 * time.Millisecond)
50+
select {
51+
case <-secondDone:
52+
assert.FailNow(t, "second Lock completed while first still holds LockCache; LockCache should block other Lock calls")
53+
default:
54+
// Second Lock is still blocked, as expected
55+
}
56+
57+
// Release the first lock; second Lock should now complete
58+
close(releaseRequested)
59+
wg.Wait()
60+
61+
// Wait for second Lock to finish (with a timeout)
62+
select {
63+
case <-secondDone:
64+
assert.NoError(t, secondErr, "second Lock should succeed after first releases")
65+
case <-time.After(2 * time.Second):
66+
assert.FailNow(t, "second Lock did not complete after first released LockCache")
67+
}
68+
}

core/concurrent_cache/snapshot_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ func TestUpsertSnapshot_Metrics(t *testing.T) {
5050
metrics.SnapshotAllocatedBytesGauge.Reset()
5151

5252
mockCtrl := gomock.NewController(t)
53-
defer mockCtrl.Finish()
5453

5554
// Create mock backend and volume
5655
mockBackend := getMockBackendWithMap(mockCtrl, map[string]string{
@@ -203,7 +202,6 @@ func TestDeleteSnapshot_Metrics(t *testing.T) {
203202
metrics.SnapshotAllocatedBytesGauge.Reset()
204203

205204
mockCtrl := gomock.NewController(t)
206-
defer mockCtrl.Finish()
207205

208206
// Create mock backend
209207
mockBackend := getMockBackendWithMap(mockCtrl, map[string]string{

0 commit comments

Comments (0)