Skip to content

Commit 9ee5404

Browse files
committed
more cleanup
1 parent 49c1d60 commit 9ee5404

2 files changed

Lines changed: 26 additions & 262 deletions

File tree

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManager.java

Lines changed: 16 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import java.time.Duration;
2525
import java.time.Instant;
2626
import java.util.ArrayList;
27-
import java.util.HashSet;
2827
import java.util.List;
2928
import java.util.Map;
3029
import java.util.Set;
3130
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.RejectedExecutionException;
3232
import java.util.concurrent.ScheduledExecutorService;
3333
import java.util.concurrent.ScheduledFuture;
3434
import java.util.concurrent.TimeUnit;
@@ -104,20 +104,6 @@ static final class EndpointState {
104104
private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
105105
private final Set<String> transientFailureEvictedAddresses = ConcurrentHashMap.newKeySet();
106106

107-
/**
108-
* Active addresses reported by each ChannelFinder, keyed by database id.
109-
*
110-
* <p>ChannelFinder instances are held via SoftReference in KeyAwareChannel, so this map uses a
111-
* stable database-id key instead of a strong ChannelFinder reference. KeyAwareChannel unregisters
112-
* stale entries when a finder is cleared.
113-
*
114-
* <p>All reads and writes to this map, and all updates to {@link
115-
* #transientFailureEvictedAddresses}, are synchronized on {@link #activeAddressLock}.
116-
*/
117-
private final Map<String, Set<String>> activeAddressesPerFinder = new ConcurrentHashMap<>();
118-
119-
private final Object activeAddressLock = new Object();
120-
121107
private final ScheduledExecutorService scheduler;
122108
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
123109
private final long probeIntervalSeconds;
@@ -164,52 +150,23 @@ static final class EndpointState {
164150
TimeUnit.SECONDS);
165151
}
166152

167-
/**
168-
* Ensures an endpoint state exists for the given address.
169-
*
170-
* <p>This is only called from {@link #updateActiveAddresses} under {@link #activeAddressLock} to
171-
* guarantee that newly created endpoints are registered as active before any stale-eviction check
172-
* can see them. Background creation tasks are scheduled by the caller after {@code
173-
* computeIfAbsent} returns, so the entry is visible in the map before the scheduler thread checks
174-
* it.
175-
*
176-
* @return true if a new endpoint state was created (caller should schedule background creation)
177-
*/
178-
private boolean ensureEndpointExists(String address) {
179-
if (isShutdown.get() || address == null || address.isEmpty()) {
180-
return false;
181-
}
182-
// Don't manage the default endpoint.
183-
if (defaultEndpointAddress.equals(address)) {
184-
return false;
185-
}
186-
187-
boolean[] created = {false};
188-
endpoints.computeIfAbsent(
189-
address,
190-
addr -> {
191-
logger.log(Level.FINE, "Creating endpoint state for address: {0}", addr);
192-
created[0] = true;
193-
return new EndpointState(addr, clock.instant());
194-
});
195-
return created[0];
196-
}
197-
198-
private void retainTransientFailureEvictionMarkers(Set<String> activeAddresses) {
199-
synchronized (activeAddressLock) {
200-
transientFailureEvictedAddresses.retainAll(activeAddresses);
201-
}
202-
}
203-
204153
private void markTransientFailureEvicted(String address) {
205-
synchronized (activeAddressLock) {
206-
transientFailureEvictedAddresses.add(address);
207-
}
154+
transientFailureEvictedAddresses.add(address);
208155
}
209156

210157
private void clearTransientFailureEvictionMarker(String address) {
211-
synchronized (activeAddressLock) {
212-
transientFailureEvictedAddresses.remove(address);
158+
transientFailureEvictedAddresses.remove(address);
159+
}
160+
161+
private void submitLifecycleTask(String description, Runnable task) {
162+
try {
163+
scheduler.submit(task);
164+
} catch (RejectedExecutionException e) {
165+
logger.log(
166+
Level.FINE,
167+
String.format(
168+
"Skipping lifecycle task '%s': lifecycle manager is shutting down", description),
169+
e);
213170
}
214171
}
215172

@@ -234,102 +191,7 @@ void recordRealTraffic(String address) {
234191
});
235192
state.lastRealTrafficAt = now;
236193
if (created[0]) {
237-
scheduler.submit(() -> startProbing(address));
238-
}
239-
}
240-
241-
/**
242-
* Atomically ensures endpoints exist for all active addresses and evicts any managed endpoints
243-
* that are no longer referenced by any finder. This handles the case where a tablet's server
244-
* address changes (e.g. from server1:15000 to server2:15000) — the old endpoint is shut down
245-
* promptly instead of lingering until idle eviction.
246-
*
247-
* <p>Both endpoint creation and stale-eviction are performed under the same lock to prevent a
248-
* race condition where a newly created endpoint could be evicted by a concurrent call from
249-
* another finder before it is registered as active.
250-
*
251-
* @param finderKey stable identifier of the ChannelFinder reporting its active addresses
252-
* @param activeAddresses server addresses currently referenced by tablets in this finder
253-
*/
254-
void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
255-
if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
256-
return;
257-
}
258-
List<String> newlyCreated = new ArrayList<>();
259-
synchronized (activeAddressLock) {
260-
// Ensure endpoints exist for all active addresses while holding the lock.
261-
// This guarantees the addresses are in the endpoints map before we compute stale entries.
262-
for (String address : activeAddresses) {
263-
if (ensureEndpointExists(address)) {
264-
newlyCreated.add(address);
265-
}
266-
}
267-
268-
activeAddressesPerFinder.put(finderKey, activeAddresses);
269-
270-
// Compute the union of all active addresses across all finders.
271-
Set<String> allActive = new HashSet<>();
272-
for (Set<String> addresses : activeAddressesPerFinder.values()) {
273-
allActive.addAll(addresses);
274-
}
275-
retainTransientFailureEvictionMarkers(allActive);
276-
277-
// Evict managed endpoints not referenced by any finder.
278-
List<String> stale = new ArrayList<>();
279-
for (String address : endpoints.keySet()) {
280-
if (!allActive.contains(address)) {
281-
stale.add(address);
282-
}
283-
}
284-
285-
for (String address : stale) {
286-
logger.log(
287-
Level.FINE, "Evicting stale endpoint {0}: no longer referenced by any tablet", address);
288-
evictEndpoint(address);
289-
}
290-
}
291-
292-
// Schedule background creation tasks AFTER computeIfAbsent has returned and the entries
293-
// are visible to other threads. Submitting from inside computeIfAbsent creates a race
294-
// where the scheduler thread can run before the entry is published in the map.
295-
for (String address : newlyCreated) {
296-
scheduler.submit(() -> createAndStartProbing(address));
297-
}
298-
}
299-
300-
/**
301-
* Unregisters a finder and evicts any managed endpoints that are no longer referenced by the
302-
* remaining finders.
303-
*/
304-
void unregisterFinder(String finderKey) {
305-
if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
306-
return;
307-
}
308-
synchronized (activeAddressLock) {
309-
if (activeAddressesPerFinder.remove(finderKey) == null) {
310-
return;
311-
}
312-
313-
Set<String> allActive = new HashSet<>();
314-
for (Set<String> addresses : activeAddressesPerFinder.values()) {
315-
allActive.addAll(addresses);
316-
}
317-
retainTransientFailureEvictionMarkers(allActive);
318-
319-
List<String> stale = new ArrayList<>();
320-
for (String address : endpoints.keySet()) {
321-
if (!allActive.contains(address)) {
322-
stale.add(address);
323-
}
324-
}
325-
326-
for (String address : stale) {
327-
logger.log(
328-
Level.FINE,
329-
"Evicting stale endpoint {0}: finder {1} was unregistered",
330-
new Object[] {address, finderKey});
331-
evictEndpoint(address);
332-
}
194+
submitLifecycleTask("startProbing", () -> startProbing(address));
333195
}
334196
}
335197

@@ -567,7 +429,7 @@ void requestEndpointRecreation(String address) {
567429
EndpointState state = new EndpointState(address, clock.instant());
568430
if (endpoints.putIfAbsent(address, state) == null) {
569431
// Schedule after putIfAbsent returns so the entry is visible to the scheduler thread.
570-
scheduler.submit(() -> createAndStartProbing(address));
432+
submitLifecycleTask("createAndStartProbing", () -> createAndStartProbing(address));
571433
}
572434
}
573435

java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/EndpointLifecycleManagerTest.java

Lines changed: 10 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,7 @@
2626
import java.time.Duration;
2727
import java.time.Instant;
2828
import java.time.ZoneId;
29-
import java.util.Collections;
30-
import java.util.HashSet;
31-
import java.util.Set;
3229
import java.util.concurrent.TimeUnit;
33-
import java.util.concurrent.atomic.AtomicLong;
3430
import java.util.concurrent.locks.LockSupport;
3531
import java.util.function.BooleanSupplier;
3632
import org.junit.After;
@@ -43,26 +39,17 @@ public class EndpointLifecycleManagerTest {
4339

4440
private EndpointLifecycleManager manager;
4541

46-
/** Counter for generating unique finder keys in tests. */
47-
private static final AtomicLong TEST_FINDER_ID = new AtomicLong(1000);
48-
4942
@After
5043
public void tearDown() {
5144
if (manager != null) {
5245
manager.shutdown();
5346
}
5447
}
5548

56-
/**
57-
* Registers addresses with the lifecycle manager via updateActiveAddresses, which atomically
58-
* creates endpoints and registers them as active. This mirrors how ChannelFinder.update() works.
59-
*/
60-
private static String registerAddresses(EndpointLifecycleManager mgr, String... addresses) {
61-
String finderId = "finder-" + TEST_FINDER_ID.incrementAndGet();
62-
Set<String> addressSet = new HashSet<>();
63-
Collections.addAll(addressSet, addresses);
64-
mgr.updateActiveAddresses(finderId, addressSet);
65-
return finderId;
49+
private static void registerAddresses(EndpointLifecycleManager mgr, String... addresses) {
50+
for (String address : addresses) {
51+
mgr.requestEndpointRecreation(address);
52+
}
6653
}
6754

6855
private static void awaitCondition(String message, BooleanSupplier condition) {
@@ -103,10 +90,9 @@ public void duplicateRegistrationIsNoop() throws Exception {
10390
new EndpointLifecycleManager(
10491
cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
10592

106-
String finderId = registerAddresses(manager, "server1");
107-
// Re-register with the same finder ID — should not create duplicate state.
108-
manager.updateActiveAddresses(finderId, Collections.singleton("server1"));
109-
manager.updateActiveAddresses(finderId, Collections.singleton("server1"));
93+
registerAddresses(manager, "server1");
94+
manager.requestEndpointRecreation("server1");
95+
manager.requestEndpointRecreation("server1");
11096

11197
assertEquals(1, manager.managedEndpointCount());
11298
}
@@ -118,7 +104,7 @@ public void defaultEndpointIsNotManaged() {
118104
new EndpointLifecycleManager(
119105
cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
120106

121-
registerAddresses(manager, "default");
107+
manager.requestEndpointRecreation("default");
122108

123109
assertFalse(manager.isManaged("default"));
124110
assertEquals(0, manager.managedEndpointCount());
@@ -155,8 +141,6 @@ public void realRoutedTrafficUpdatesLastRealTrafficAt() throws Exception {
155141
new EndpointLifecycleManager(
156142
cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), clock);
157143

158-
registerAddresses(manager, "server1");
159-
160144
Instant before = clock.instant();
161145
clock.advance(Duration.ofMinutes(5));
162146
manager.recordRealTraffic("server1");
@@ -267,28 +251,6 @@ public void transientFailureEvictionTrackedUntilEndpointReadyAgain() throws Exce
267251
() -> !manager.wasRecentlyEvictedTransientFailure("server1"));
268252
}
269253

270-
@Test
271-
public void transientFailureEvictionMarkerRemovedWhenAddressNoLongerActive() throws Exception {
272-
KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache();
273-
manager =
274-
new EndpointLifecycleManager(
275-
cache, /* probeIntervalSeconds= */ 1, Duration.ofMinutes(30), Clock.systemUTC());
276-
277-
String finder = registerAddresses(manager, "server1");
278-
awaitCondition(
279-
"endpoint should be created in background", () -> cache.getIfPresent("server1") != null);
280-
281-
cache.setState("server1", KeyRangeCacheTest.EndpointHealthState.TRANSIENT_FAILURE);
282-
awaitCondition(
283-
"endpoint should be evicted after repeated transient failures",
284-
() ->
285-
!manager.isManaged("server1") && manager.wasRecentlyEvictedTransientFailure("server1"));
286-
287-
manager.updateActiveAddresses(finder, Collections.emptySet());
288-
289-
assertFalse(manager.wasRecentlyEvictedTransientFailure("server1"));
290-
}
291-
292254
@Test
293255
public void shutdownStopsAllProbing() throws Exception {
294256
KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache();
@@ -312,8 +274,8 @@ public void emptyOrNullAddressIsIgnored() {
312274
new EndpointLifecycleManager(
313275
cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
314276

315-
manager.updateActiveAddresses("finder-1", Collections.singleton(""));
316-
manager.updateActiveAddresses("finder-2", Collections.emptySet());
277+
manager.requestEndpointRecreation("");
278+
manager.requestEndpointRecreation(null);
317279

318280
assertEquals(0, manager.managedEndpointCount());
319281
}
@@ -331,66 +293,6 @@ public void recordRealTrafficForDefaultEndpointIsIgnored() {
331293
assertEquals(0, manager.managedEndpointCount());
332294
}
333295

334-
@Test
335-
public void staleEndpointEvictedWhenNoLongerActive() throws Exception {
336-
KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache();
337-
manager =
338-
new EndpointLifecycleManager(
339-
cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
340-
341-
// Finder 1 reports server1 and server2.
342-
String finder1 = registerAddresses(manager, "server1", "server2");
343-
assertEquals(2, manager.managedEndpointCount());
344-
345-
// Finder 1 updates: server1 is gone, only server2 remains.
346-
manager.updateActiveAddresses(finder1, Collections.singleton("server2"));
347-
348-
// server1 should be evicted since no finder references it.
349-
assertFalse(manager.isManaged("server1"));
350-
assertTrue(manager.isManaged("server2"));
351-
assertEquals(1, manager.managedEndpointCount());
352-
}
353-
354-
@Test
355-
public void endpointKeptIfReferencedByAnotherFinder() throws Exception {
356-
KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache();
357-
manager =
358-
new EndpointLifecycleManager(
359-
cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
360-
361-
// Finder 1 reports server1.
362-
String finder1 = registerAddresses(manager, "server1");
363-
// Finder 2 also reports server1.
364-
registerAddresses(manager, "server1");
365-
366-
// Finder 1 drops server1, but finder 2 still references it.
367-
manager.updateActiveAddresses(finder1, Collections.emptySet());
368-
369-
assertTrue(manager.isManaged("server1"));
370-
assertEquals(1, manager.managedEndpointCount());
371-
}
372-
373-
@Test
374-
public void unregisterFinderEvictsEndpointsNoLongerReferenced() throws Exception {
375-
KeyRangeCacheTest.FakeEndpointCache cache = new KeyRangeCacheTest.FakeEndpointCache();
376-
manager =
377-
new EndpointLifecycleManager(
378-
cache, /* probeIntervalSeconds= */ 60, Duration.ofMinutes(30), Clock.systemUTC());
379-
380-
String finder1 = registerAddresses(manager, "server1");
381-
String finder2 = registerAddresses(manager, "server2");
382-
383-
manager.unregisterFinder(finder1);
384-
385-
assertFalse(manager.isManaged("server1"));
386-
assertTrue(manager.isManaged("server2"));
387-
assertEquals(1, manager.managedEndpointCount());
388-
389-
manager.unregisterFinder(finder2);
390-
391-
assertEquals(0, manager.managedEndpointCount());
392-
}
393-
394296
/** Test clock that can be advanced manually. */
395297
private static final class TestClock extends Clock {
396298
private Instant now;

0 commit comments

Comments
 (0)