Skip to content

Commit 0116e0a

Browse files
NiteshNETIZEN-11
authored andcommitted
fix: release lock during DB operation in cachedFetcher.update()
Signed-off-by: Nitesh <nitesh@example.com>
1 parent a75e3ad commit 0116e0a

2 files changed

Lines changed: 119 additions & 3 deletions

File tree

token/services/selector/sherdlock/fetcher.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,28 +219,45 @@ func newCachedFetcher(tokenDB TokenDB, cacheSize int64, freshnessInterval time.D
219219
}
220220
}
221221

222-
// update refreshes the token cache from the database, adding new entries before removing stale ones to prevent race conditions.
222+
// update refreshes the token cache from the database. It releases the lock during the
223+
// potentially slow DB operation to avoid blocking other goroutines, then re-acquires
224+
// the lock to atomically update the cache. A re-check of staleness is performed
225+
// after the DB call completes to avoid overwriting a cache that was refreshed by
226+
// another goroutine while waiting for the database.
223227
func (f *cachedFetcher) update(ctx context.Context) {
224228
f.mu.Lock()
225-
defer f.mu.Unlock()
226229
if !f.isCacheStale() && !f.isCacheOverused() {
227230
logger.DebugfContext(ctx, "Cache renewed in the meantime by another process")
231+
f.mu.Unlock()
228232

229233
return
230234
}
231235
logger.DebugfContext(ctx, "Renew token cache")
236+
237+
// Release lock during slow DB operation to not block other token operations
238+
f.mu.Unlock()
239+
232240
it, err := f.tokenDB.SpendableTokensIteratorBy(ctx, "", "")
233241
if err != nil {
234242
logger.Warnf("Failed to get token iterator: %v", err)
235-
236243
return
237244
}
238245
defer it.Close()
239246

240247
m := f.groupTokensByKey(ctx, it)
248+
249+
f.mu.Lock()
250+
// Re-check: another goroutine may have refreshed while we waited for DB
251+
if !f.isCacheStale() && !f.isCacheOverused() {
252+
logger.DebugfContext(ctx, "Cache renewed in the meantime by another process, skipping")
253+
f.mu.Unlock()
254+
255+
return
256+
}
241257
f.updateCache(ctx, m)
242258
f.lastFetched = time.Now()
243259
atomic.StoreUint32(&f.queriesResponded, 0)
260+
f.mu.Unlock()
244261
}
245262

246263
// groupTokensByKey reads tokens from the iterator and groups them by wallet/currency key.

token/services/selector/sherdlock/fetcher_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package sherdlock
99
import (
1010
"context"
1111
"errors"
12+
"sync"
1213
"sync/atomic"
1314
"testing"
1415
"time"
@@ -980,3 +981,101 @@ func (m *mockStoreServiceManager) StoreServiceByTMSId(tmsID token.TMSID) (*token
980981

981982
return nil, errors.New("not implemented")
982983
}
984+
985+
// TestCachedFetcher_UpdateDoesNotBlockReaders tests that the update() function
986+
// releases the lock during the potentially slow DB operation, allowing concurrent
987+
// readers to access the cache. This is the fix for issue #16.
988+
func TestCachedFetcher_UpdateDoesNotBlockReaders(t *testing.T) {
989+
mockDB := new(mockTokenDB)
990+
// Use long freshness interval so cache won't be stale
991+
fetcher := newCachedFetcher(mockDB, 0, 10*time.Second, 100)
992+
993+
// Pre-populate the cache so readers can hit it
994+
initialTokens := []*token2.UnspentTokenInWallet{
995+
{WalletID: "wallet1", Type: "USD", Quantity: "100"},
996+
}
997+
mockIterator := iterators.Slice(initialTokens)
998+
mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).Return(mockIterator, nil).Once()
999+
1000+
ctx := t.Context()
1001+
fetcher.update(ctx)
1002+
1003+
// Make cache stale so update() will be called
1004+
fetcher.lastFetched = time.Now().Add(-20 * time.Second)
1005+
1006+
// Use a channel to simulate a slow DB operation
1007+
slowDB := make(chan struct{})
1008+
tokensAfterSlowDB := []*token2.UnspentTokenInWallet{
1009+
{WalletID: "wallet1", Type: "USD", Quantity: "200"},
1010+
}
1011+
mockIterator2 := iterators.Slice(tokensAfterSlowDB)
1012+
mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).Return(mockIterator2, nil).Run(func(args mock.Arguments) {
1013+
<-slowDB // Wait before returning to simulate slow DB
1014+
}).Once()
1015+
1016+
// Track whether reader succeeded while update() was blocked on DB
1017+
var readerSuccess atomic.Bool
1018+
var readerWg sync.WaitGroup
1019+
1020+
// Start update in background (it will block on DB call)
1021+
readerWg.Add(1)
1022+
go func() {
1023+
defer readerWg.Done()
1024+
fetcher.update(ctx)
1025+
}()
1026+
1027+
// Small delay to ensure update() has released lock and is waiting on DB
1028+
time.Sleep(10 * time.Millisecond)
1029+
1030+
// Reader should be able to acquire RLock while update() waits on DB
1031+
// This would deadlock before the fix (issue #16)
1032+
fetcher.mu.RLock()
1033+
_, ok := fetcher.cache.Get(tokenKey("wallet1", "USD"))
1034+
fetcher.mu.RUnlock()
1035+
1036+
if ok {
1037+
readerSuccess.Store(true)
1038+
}
1039+
1040+
// Signal DB to complete
1041+
close(slowDB)
1042+
1043+
// Wait for update to complete
1044+
readerWg.Wait()
1045+
1046+
// Verify reader succeeded - the cache should still be accessible during update
1047+
assert.True(t, readerSuccess.Load(), "reader should be able to access cache while update() is blocked on DB")
1048+
mockDB.AssertExpectations(t)
1049+
}
1050+
1051+
// TestCachedFetcher_UpdateReacquiresLockAfterDB tests that after the DB operation
1052+
// completes, update() correctly re-acquires the lock and performs the cache update.
1053+
func TestCachedFetcher_UpdateReacquiresLockAfterDB(t *testing.T) {
1054+
mockDB := new(mockTokenDB)
1055+
fetcher := newCachedFetcher(mockDB, 0, 1*time.Second, 100)
1056+
1057+
// Pre-populate to make cache appear stale
1058+
fetcher.lastFetched = time.Now().Add(-20 * time.Second)
1059+
1060+
tokens := []*token2.UnspentTokenInWallet{
1061+
{WalletID: "wallet1", Type: "USD", Quantity: "300"},
1062+
}
1063+
mockIterator := iterators.Slice(tokens)
1064+
mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).Return(mockIterator, nil).Once()
1065+
1066+
ctx := t.Context()
1067+
fetcher.update(ctx)
1068+
1069+
// After update completes, cache should be refreshed (not stale)
1070+
assert.False(t, fetcher.isCacheStale())
1071+
assert.Equal(t, uint32(0), atomic.LoadUint32(&fetcher.queriesResponded))
1072+
1073+
// Token should be in cache
1074+
fetcher.mu.RLock()
1075+
_, ok := fetcher.cache.Get(tokenKey("wallet1", "USD"))
1076+
fetcher.mu.RUnlock()
1077+
assert.True(t, ok, "token should be in cache after update")
1078+
1079+
mockDB.AssertExpectations(t)
1080+
}
1081+

0 commit comments

Comments
 (0)