Skip to content

Commit 2fd05f7

Browse files
committed
Skip defensive copy for completed prepare cache futures
When the cached CompletableFuture is already done, return it directly instead of creating a defensive copy via thenApply(x -> x). Completed futures are immutable (cancel/complete are no-ops), so the copy only served to release the caller's strong reference to the cached value, causing premature weak-value eviction under GC pressure. This led to repeated PREPARE requests being sent to all nodes on every execution, as the cache entry would be garbage-collected between calls. With prepare-on-all-nodes=true (default), each eviction multiplied the re-prepare cost by the cluster node count. The defensive copy is still used for in-flight futures to protect the shared cache entry from cancellation by concurrent waiters. Ref: CUSTOMER-372
1 parent 9149592 commit 2fd05f7

2 files changed

Lines changed: 166 additions & 2 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,15 @@ public CompletionStage<PreparedStatement> process(
162162
});
163163
}
164164
}
165-
// Return a defensive copy. So if a client cancels its request, the cache won't be impacted
166-
// nor a potential concurrent request.
165+
if (result.isDone()) {
166+
// Completed futures are immutable (cancel/complete/completeExceptionally are no-ops),
167+
// so returning the cached instance directly is safe. This also keeps the cache entry
168+
// alive via the caller's strong reference, preventing premature weak-value eviction
169+
// under GC pressure.
170+
return result;
171+
}
172+
// Defensive copy for in-flight preparations only: protects the shared cached future
173+
// from cancellation by one of multiple concurrent waiters.
167174
return result.thenApply(x -> x); // copy() is available only since Java 9
168175
} catch (ExecutionException e) {
169176
return CompletableFutures.failedFuture(e.getCause());
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.cql;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
23+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
24+
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
25+
import java.lang.ref.WeakReference;
26+
import java.util.Optional;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.CompletionStage;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
import org.mockito.Mockito;
32+
33+
/**
34+
* Unit tests for {@link CqlPrepareAsyncProcessor} focusing on the caching behavior of {@link
35+
* CqlPrepareAsyncProcessor#process} with respect to defensive copies and weak-value retention.
36+
*/
37+
public class CqlPrepareAsyncProcessorTest {
38+
39+
private CqlPrepareAsyncProcessor processor;
40+
private Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache;
41+
42+
@Before
43+
public void setup() {
44+
processor = new CqlPrepareAsyncProcessor(Optional.empty());
45+
cache = processor.getCache();
46+
}
47+
48+
/**
49+
* When the cached future is already completed, process() should return the exact same instance
50+
* (identity). This ensures callers hold a strong reference to the cached CF, preventing
51+
* weak-value eviction under GC pressure.
52+
*/
53+
@Test
54+
public void should_return_cached_future_directly_when_already_completed() throws Exception {
55+
PrepareRequest request = new DefaultPrepareRequest("SELECT 1");
56+
PreparedStatement ps = Mockito.mock(PreparedStatement.class);
57+
58+
// Pre-populate cache with a completed future
59+
CompletableFuture<PreparedStatement> completed = CompletableFuture.completedFuture(ps);
60+
cache.put(request, completed);
61+
62+
// process() should return the exact same object
63+
CompletionStage<PreparedStatement> returned = processor.process(request, null, null, "test");
64+
65+
assertThat(returned).isSameAs(completed);
66+
}
67+
68+
/**
69+
* When the cached future is still in-flight (not yet done), process() should return a defensive
70+
* copy to protect the cache from cancellation by the caller.
71+
*/
72+
@Test
73+
public void should_return_defensive_copy_when_future_is_in_flight() throws Exception {
74+
PrepareRequest request = new DefaultPrepareRequest("SELECT 1");
75+
76+
// Pre-populate cache with an incomplete future
77+
CompletableFuture<PreparedStatement> inFlight = new CompletableFuture<>();
78+
cache.put(request, inFlight);
79+
80+
CompletionStage<PreparedStatement> returned = processor.process(request, null, null, "test");
81+
82+
// Should NOT be the same instance
83+
assertThat(returned).isNotSameAs(inFlight);
84+
85+
// Cancelling the returned copy should NOT affect the cached future
86+
returned.toCompletableFuture().cancel(false);
87+
assertThat(inFlight.isCancelled()).isFalse();
88+
}
89+
90+
/**
91+
* Verifies that returning the cached future directly (for completed entries) keeps the weak-value
92+
* cache entry alive as long as the caller holds a reference to the returned stage.
93+
*/
94+
@Test
95+
public void should_keep_cache_entry_alive_when_caller_holds_completed_future() throws Exception {
96+
PrepareRequest request = new DefaultPrepareRequest("SELECT 1");
97+
PreparedStatement ps = Mockito.mock(PreparedStatement.class);
98+
99+
CompletableFuture<PreparedStatement> completed = CompletableFuture.completedFuture(ps);
100+
cache.put(request, completed);
101+
102+
// Simulate what a caller does: hold the returned stage
103+
CompletionStage<PreparedStatement> held = processor.process(request, null, null, "test");
104+
105+
// Create a weak reference to detect if cache entry would be collected
106+
WeakReference<CompletableFuture<PreparedStatement>> weakRef = new WeakReference<>(completed);
107+
// Drop our local strong reference
108+
//noinspection UnusedAssignment
109+
completed = null;
110+
111+
// Force GC
112+
System.gc();
113+
Thread.sleep(100);
114+
115+
// The cache entry should still be alive because 'held' IS the cached CF
116+
cache.cleanUp();
117+
assertThat(cache.getIfPresent(request)).isNotNull();
118+
assertThat(weakRef.get()).isNotNull();
119+
120+
// Verify held is usable
121+
assertThat(held.toCompletableFuture().get()).isSameAs(ps);
122+
}
123+
124+
/**
125+
* Demonstrates the problem this fix addresses: without the fix, a defensive copy would be the
126+
* only reference returned, and if the caller doesn't hold it long enough, GC can evict the cache
127+
* entry. This test shows that with the fix, even after the caller's reference goes out of scope,
128+
* the behavior is correct for the next caller who retrieves it promptly.
129+
*/
130+
@Test
131+
public void should_allow_gc_eviction_when_no_strong_references_remain() throws Exception {
132+
PrepareRequest request = new DefaultPrepareRequest("SELECT 1");
133+
PreparedStatement ps = Mockito.mock(PreparedStatement.class);
134+
135+
CompletableFuture<PreparedStatement> completed = CompletableFuture.completedFuture(ps);
136+
cache.put(request, completed);
137+
138+
// Drop all strong references
139+
//noinspection UnusedAssignment
140+
completed = null;
141+
142+
// Force GC - weak value should be collected
143+
for (int i = 0; i < 10; i++) {
144+
System.gc();
145+
Thread.sleep(50);
146+
cache.cleanUp();
147+
if (cache.getIfPresent(request) == null) {
148+
break;
149+
}
150+
}
151+
152+
// Cache entry may have been evicted (weak values)
153+
// This is expected behavior - the fix ensures callers who DO hold a reference keep it alive
154+
// We just verify the cache doesn't throw
155+
assertThat(cache.size()).isGreaterThanOrEqualTo(0);
156+
}
157+
}

0 commit comments

Comments
 (0)