chore(spanner): optimize lock contention and skipped tablet reporting#12719
chore(spanner): optimize lock contention and skipped tablet reporting#12719
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces asynchronous cache updates in ChannelFinder and KeyAwareChannel to improve performance, and enhances EndpointLifecycleManager to track and report transient failure evictions. Additionally, it refactors KeyRangeCache to use ReentrantReadWriteLock for better concurrency. My review highlights critical issues: the addRanges method in KeyRangeCache is not exception-safe and risks reference count leaks, the readLock release in fillRoutingHint creates a race condition that requires CachedGroup to be fully synchronized, and the use of Executors.newCachedThreadPool in ChannelFinder poses a risk of resource exhaustion.
| public void addRanges(CacheUpdate cacheUpdate) { | ||
| List<CachedGroup> newGroups = new ArrayList<>(); | ||
| synchronized (lock) { | ||
| List<CachedGroup> touchedGroups = new ArrayList<>(); | ||
| writeLock.lock(); | ||
| try { | ||
| for (Group groupIn : cacheUpdate.getGroupList()) { | ||
| newGroups.add(findOrInsertGroup(groupIn)); | ||
| touchedGroups.add(findOrInsertGroup(groupIn.getGroupUid())); | ||
| } | ||
| } finally { | ||
| writeLock.unlock(); | ||
| } | ||
|
|
||
| for (int i = 0; i < cacheUpdate.getGroupCount(); i++) { | ||
| touchedGroups.get(i).update(cacheUpdate.getGroup(i)); | ||
| } | ||
|
|
||
| writeLock.lock(); | ||
| try { | ||
| for (Range rangeIn : cacheUpdate.getRangeList()) { | ||
| replaceRangeIfNewer(rangeIn); | ||
| } | ||
| for (CachedGroup group : newGroups) { | ||
| for (CachedGroup group : touchedGroups) { | ||
| unref(group); | ||
| } | ||
| } finally { | ||
| writeLock.unlock(); | ||
| } | ||
| } |
There was a problem hiding this comment.
The addRanges implementation is not exception-safe. If an exception occurs during update() or replaceRangeIfNewer(), the unref() calls will be skipped, leading to a reference count leak. Following repository guidelines for resource management, the unref() logic should be placed in a finally block and executed in reverse order (LIFO) to ensure all resources are released even if exceptions occur.
public void addRanges(CacheUpdate cacheUpdate) {
List<CachedGroup> touchedGroups = new ArrayList<>();
writeLock.lock();
try {
for (Group groupIn : cacheUpdate.getGroupList()) {
touchedGroups.add(findOrInsertGroup(groupIn.getGroupUid()));
}
} finally {
writeLock.unlock();
}
try {
for (int i = 0; i < cacheUpdate.getGroupCount(); i++) {
touchedGroups.get(i).update(cacheUpdate.getGroup(i));
}
writeLock.lock();
try {
for (Range rangeIn : cacheUpdate.getRangeList()) {
replaceRangeIfNewer(rangeIn);
}
} finally {
writeLock.unlock();
}
} finally {
writeLock.lock();
try {
for (int i = touchedGroups.size() - 1; i >= 0; i--) {
unref(touchedGroups.get(i));
}
} finally {
writeLock.unlock();
}
}
}References
- Ensure resources are released in reverse order of creation (LIFO) and that the implementation is exception-safe using finally blocks to prevent leaks.
| readLock.lock(); | ||
| try { | ||
| targetRange = findRangeLocked(key, hintBuilder.getLimitKey(), rangeMode); | ||
| } finally { | ||
| readLock.unlock(); |
There was a problem hiding this comment.
Releasing the readLock before calling targetRange.group.fillRoutingHint() introduces a race condition because addRanges now performs group updates outside the global lock. Since CachedGroup relies on monitor-based synchronization (as seen in getActiveAddresses at line 180), both CachedGroup.update and CachedGroup.fillRoutingHint must be synchronized to ensure thread safety. Without this, concurrent updates and reads of the group's internal state (like tablets and leaderIndex) could lead to inconsistent behavior or data corruption.
| private static final ExecutorService CACHE_UPDATE_POOL = | ||
| Executors.newCachedThreadPool( | ||
| r -> { | ||
| Thread t = new Thread(r, "spanner-cache-update"); | ||
| t.setDaemon(true); | ||
| return t; | ||
| }); |
There was a problem hiding this comment.
Using Executors.newCachedThreadPool() for a static shared pool can lead to unbounded thread creation if many ChannelFinder instances (e.g., in a multi-database environment) receive updates simultaneously. Consider using a ThreadPoolExecutor with a defined maximum pool size to prevent potential resource exhaustion.
Summary
synchronizedwithReadWriteLockinKeyRangeCacheto reduce lock contention on the hot path. Read operations (findServer,getActiveAddresses,size,debugString) now take a read lock, allowing concurrent lookups. Group updates are performed outside the write lock to minimize critical section duration.ChannelFinderviaupdateAsync(), using a sequential executor backed by a shared daemon thread pool. This preventsCacheUpdateprocessing from blocking the gRPC response listener thread.EndpointLifecycleManagertracks addresses evicted for TRANSIENT_FAILURE andKeyRangeCacheincludes their tablet UIDs inskipped_tablets, giving the server better routing visibility.CachedRange.lastAccessasvolatilefor safe cross-thread reads.