1818package com .datastax .oss .driver .core .cql ;
1919
2020import static org .assertj .core .api .Assertions .assertThat ;
21+ import static org .awaitility .Awaitility .await ;
2122
2223import com .codahale .metrics .Gauge ;
2324import com .datastax .oss .driver .api .core .CqlSession ;
2425import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
2526import com .datastax .oss .driver .api .core .config .DriverConfigLoader ;
2627import com .datastax .oss .driver .api .core .context .DriverContext ;
28+ import com .datastax .oss .driver .api .core .cql .PrepareRequest ;
2729import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
2830import com .datastax .oss .driver .api .core .metrics .DefaultSessionMetric ;
2931import com .datastax .oss .driver .api .core .session .ProgrammaticArguments ;
3436import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
3537import com .datastax .oss .driver .categories .IsolatedTests ;
3638import com .datastax .oss .driver .internal .core .context .DefaultDriverContext ;
39+ import com .datastax .oss .driver .internal .core .context .InternalDriverContext ;
3740import com .datastax .oss .driver .internal .core .cql .CqlPrepareAsyncProcessor ;
3841import com .datastax .oss .driver .internal .core .cql .CqlPrepareSyncProcessor ;
3942import com .datastax .oss .driver .internal .core .metadata .schema .events .TypeChangeEvent ;
4043import com .datastax .oss .driver .internal .core .session .BuiltInRequestProcessors ;
44+ import com .datastax .oss .driver .internal .core .session .DefaultSession ;
4145import com .datastax .oss .driver .internal .core .session .RequestProcessor ;
4246import com .datastax .oss .driver .internal .core .session .RequestProcessorRegistry ;
4347import com .datastax .oss .driver .shaded .guava .common .cache .RemovalListener ;
5357import java .util .Optional ;
5458import java .util .Set ;
5559import java .util .concurrent .CompletableFuture ;
60+ import java .util .concurrent .CompletionStage ;
5661import java .util .concurrent .ConcurrentHashMap ;
5762import java .util .concurrent .CountDownLatch ;
5863import java .util .concurrent .TimeUnit ;
@@ -117,6 +122,9 @@ private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcess
117122 private static final Logger LOG =
118123 LoggerFactory .getLogger (PreparedStatementCachingIT .TestCqlPrepareAsyncProcessor .class );
119124
125+ private final Set <CompletableFuture <PreparedStatement >> retainedCacheValues =
126+ ConcurrentHashMap .newKeySet ();
127+
120128 private static RemovalListener <Object , Object > buildCacheRemoveCallback (
121129 @ NonNull Optional <DefaultDriverContext > context ) {
122130 return (evt ) -> {
@@ -133,10 +141,25 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
133141 }
134142
135143 public TestCqlPrepareAsyncProcessor (@ NonNull Optional <DefaultDriverContext > context ) {
136- // Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
137- // to prevent cache entries from unexpectedly disappearing mid-test .
144+ // Default CqlPrepareAsyncProcessor uses weak values. Retain the cached futures so this test
145+ // validates type-change invalidation instead of racing cache value collection .
138146 super (context , builder -> builder .removalListener (buildCacheRemoveCallback (context )));
139147 }
148+
149+ @ Override
150+ public CompletionStage <PreparedStatement > process (
151+ PrepareRequest request ,
152+ DefaultSession session ,
153+ InternalDriverContext context ,
154+ String sessionLogPrefix ) {
155+ CompletionStage <PreparedStatement > stage =
156+ super .process (request , session , context , sessionLogPrefix );
157+ CompletableFuture <PreparedStatement > cachedValue = cache .getIfPresent (request );
158+ if (cachedValue != null ) {
159+ retainedCacheValues .add (cachedValue );
160+ }
161+ return stage ;
162+ }
140163 }
141164
142165 private static class TestDefaultDriverContext extends DefaultDriverContext {
@@ -222,11 +245,11 @@ private void invalidationTestInner(
222245 assertThat (getPreparedCacheSize (session )).isEqualTo (0 );
223246 setupTestSchema .accept (session );
224247
225- session .prepare (preparedStmtQueryType1 );
226- ByteBuffer queryId2 = session .prepare (preparedStmtQueryType2 ).getId ();
248+ PreparedStatement statement1 = session .prepare (preparedStmtQueryType1 );
249+ PreparedStatement statement2 = session .prepare (preparedStmtQueryType2 );
250+ ByteBuffer queryId2 = statement2 .getId ();
227251 assertThat (getPreparedCacheSize (session )).isEqualTo (2 );
228252
229- CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch (1 );
230253 CountDownLatch typeChangeEventLatch = new CountDownLatch (expectedChangedTypes .size ());
231254
232255 DefaultDriverContext ctx = (DefaultDriverContext ) session .getContext ();
@@ -260,7 +283,6 @@ private void invalidationTestInner(
260283 removedQueryEventError .set (
261284 Optional .of ("Unable to set reference for PS removal event" ));
262285 }
263- preparedStmtCacheRemoveLatch .countDown ();
264286 });
265287
266288 // alter test_type_2 to trigger cache invalidation and above events
@@ -270,17 +292,20 @@ private void invalidationTestInner(
270292 assertThat (Uninterruptibles .awaitUninterruptibly (typeChangeEventLatch , 10 , TimeUnit .SECONDS ))
271293 .withFailMessage ("typeChangeEventLatch did not trigger before timeout" )
272294 .isTrue ();
273- assertThat (
274- Uninterruptibles .awaitUninterruptibly (
275- preparedStmtCacheRemoveLatch , 10 , TimeUnit .SECONDS ))
276- .withFailMessage ("preparedStmtCacheRemoveLatch did not trigger before timeout" )
277- .isTrue ();
278295
279- /* Okay, the latch triggered so cache processing should now be done. Let's validate :allthethings: */
280- assertThat (changedTypes .keySet ()).isEqualTo (expectedChangedTypes );
281- assertThat (removedQueryIds .get ()).isNotEmpty ().get ().isEqualTo (queryId2 );
296+ await ()
297+ .atMost (30 , TimeUnit .SECONDS )
298+ .untilAsserted (() -> assertThat (getPreparedCacheSize (session )).isEqualTo (1 ));
299+
300+ assertThat (session .prepare (preparedStmtQueryType1 )).isSameAs (statement1 );
282301 assertThat (getPreparedCacheSize (session )).isEqualTo (1 );
283302
303+ assertThat (session .prepare (preparedStmtQueryType2 )).isNotSameAs (statement2 );
304+ assertThat (getPreparedCacheSize (session )).isEqualTo (2 );
305+
306+ assertThat (changedTypes .keySet ()).isEqualTo (expectedChangedTypes );
307+ removedQueryIds .get ().ifPresent (queryId -> assertThat (queryId ).isEqualTo (queryId2 ));
308+
284309 // check no errors were seen in callback (and report those as fail msgs)
285310 // if something is broken these may still succeed due to timing
286311 // but shouldn't intermittently fail if the code is working properly
0 commit comments