@@ -102,10 +102,22 @@ class ControlConnection implements Connection.Owner {
102102 // from here on out.
103103 private volatile boolean isPeersV2 = true ;
104104
105+ private final SystemColumnProjection projection = new SystemColumnProjection ();
106+
105107 public ControlConnection (Cluster .Manager manager ) {
106108 this .cluster = manager ;
107109 }
108110
111+ /**
112+ * Resets the projected-column caches so that the next query to each system table sends {@code
113+ * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra
114+ * primes between driver operations.
115+ */
116+ @ VisibleForTesting
117+ void resetColumnCaches () {
118+ projection .reset ();
119+ }
120+
109121 // Only for the initial connection. Does not schedule retries if it fails
110122 void connect () throws UnsupportedProtocolVersionException {
111123 if (isShutdown ) return ;
@@ -326,6 +338,12 @@ private Connection tryConnect(Host host, boolean isInitialConnection)
326338 ProtocolEvent .Type .SCHEMA_CHANGE );
327339 connection .write (new Requests .Register (evs ));
328340
341+ // Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover
342+ // which columns this server exposes, rather than a projected query built for a
343+ // previous connection's server. The caches are populated during the queries below
344+ // and remain warm for the lifetime of this connection.
345+ projection .reset ();
346+
329347 // We need to refresh the node list first so we know about the cassandra version of
330348 // the node we're connecting to.
331349 // This will create the token map for the first time, but it will be incomplete
@@ -453,6 +471,11 @@ void refreshNodeListAndTokenMap() {
453471 } catch (ExecutionException e ) {
454472 // If we're being shutdown during refresh, this can happen. That's fine so don't scare the
455473 // user.
474+ if (e .getCause () instanceof InvalidQueryException ) {
475+ // A projected query referenced a column the server no longer exposes; reset caches so
476+ // the next connection re-discovers columns via SELECT *.
477+ projection .reset ();
478+ }
456479 if (!isShutdown )
457480 logger .error (
458481 "[Control connection] Unexpected error while refreshing node list and token map" , e );
@@ -489,28 +512,46 @@ private Row fetchNodeInfo(Host host, Connection c)
489512 InterruptedException {
490513 boolean isConnectedHost = c .endPoint .equals (host .getEndPoint ());
491514 if (isConnectedHost || host .getBroadcastSocketAddress () != null ) {
515+ SystemColumnProjection .SystemTable table =
516+ isConnectedHost
517+ ? SystemColumnProjection .SystemTable .LOCAL
518+ : (isPeersV2
519+ ? SystemColumnProjection .SystemTable .PEERS_V2
520+ : SystemColumnProjection .SystemTable .PEERS );
492521 String query ;
493522 if (isConnectedHost ) {
494- query = SELECT_LOCAL ;
523+ query = projection . query ( table ) ;
495524 } else {
496525 InetSocketAddress broadcastAddress = host .getBroadcastSocketAddress ();
497- query =
498- isPeersV2
499- ? SELECT_PEERS_V2
500- + " WHERE peer='"
501- + broadcastAddress .getAddress ().getHostAddress ()
502- + "' AND peer_port="
503- + broadcastAddress .getPort ()
504- : SELECT_PEERS
505- + " WHERE peer='"
506- + broadcastAddress .getAddress ().getHostAddress ()
507- + "'" ;
526+ // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for
527+ // full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and
528+ // every node has the projected full-scan prime registered. For WHERE lookups the control
529+ // connection may query a node that was never restarted (and therefore still carries only
530+ // the original SELECT * prime from init time), so projecting here risks a cache miss.
531+ if (isPeersV2 ) {
532+ String whereClause =
533+ "peer='"
534+ + broadcastAddress .getAddress ().getHostAddress ()
535+ + "' AND peer_port="
536+ + broadcastAddress .getPort ();
537+ query = SELECT_PEERS_V2 + " WHERE " + whereClause ;
538+ } else {
539+ String whereClause = "peer='" + broadcastAddress .getAddress ().getHostAddress () + "'" ;
540+ query = SELECT_PEERS + " WHERE " + whereClause ;
541+ }
508542 }
509543 DefaultResultSetFuture future =
510544 new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (query ));
511545 c .write (future );
512- Row row = future .get ().one ();
546+ ResultSet rs = future .get ();
547+ Row row = rs .one ();
513548 if (row != null ) {
549+ // Populate the column cache only when we got a real row. WHERE-clause lookups may return
550+ // zero rows (e.g. broadcast address changed), in which case the ColumnDefinitions still
551+ // exist in the result metadata but there is nothing useful to learn — we must not warm
552+ // the cache from an empty result, or subsequent full-table scans will send a projected
553+ // query that the server may not recognise.
554+ projection .populate (table , rs );
514555 return row ;
515556 } else {
516557 InetSocketAddress address = host .getBroadcastSocketAddress ();
@@ -582,6 +623,11 @@ boolean refreshNodeInfo(Host host) {
582623 } catch (ExecutionException e ) {
583624 // If we're being shutdown during refresh, this can happen. That's fine so don't scare the
584625 // user.
626+ if (e .getCause () instanceof InvalidQueryException ) {
627+ // A projected query referenced a column the server no longer exposes; reset caches so
628+ // the next connection re-discovers columns via SELECT *.
629+ projection .reset ();
630+ }
585631 if (!isShutdown )
586632 logger .debug ("[Control connection] Unexpected error while refreshing node info" , e );
587633 signalError ();
@@ -719,7 +765,9 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio
719765 if (isPeersV2 ) {
720766 DefaultResultSetFuture peersV2Future =
721767 new DefaultResultSetFuture (
722- null , cluster .protocolVersion (), new Requests .Query (SELECT_PEERS_V2 ));
768+ null ,
769+ cluster .protocolVersion (),
770+ new Requests .Query (projection .query (SystemColumnProjection .SystemTable .PEERS_V2 )));
723771 connection .write (peersV2Future );
724772 final SettableFuture <ResultSet > peersFuture = SettableFuture .create ();
725773 // if peers v2 query fails, query peers table instead.
@@ -729,6 +777,7 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio
729777
730778 @ Override
731779 public void onSuccess (ResultSet result ) {
780+ projection .populate (SystemColumnProjection .SystemTable .PEERS_V2 , result );
732781 peersFuture .set (result );
733782 }
734783
@@ -742,6 +791,9 @@ public void onFailure(Throwable t) {
742791 || (t instanceof ServerError
743792 && t .getMessage ().contains ("Unknown keyspace/cf pair (system.peers_v2)" ))) {
744793 isPeersV2 = false ;
794+ // Reset all caches: peersV2Columns is now stale, and peers cache should be cleared
795+ // so the first system.peers query re-discovers columns via SELECT *.
796+ projection .reset ();
745797 MoreFutures .propagateFuture (peersFuture , selectPeersFuture (connection ));
746798 } else {
747799 peersFuture .setException (t );
@@ -751,11 +803,13 @@ public void onFailure(Throwable t) {
751803 MoreExecutors .directExecutor ());
752804 return peersFuture ;
753805 } else {
754- DefaultResultSetFuture peersFuture =
806+ DefaultResultSetFuture rawFuture =
755807 new DefaultResultSetFuture (
756- null , cluster .protocolVersion (), new Requests .Query (SELECT_PEERS ));
757- connection .write (peersFuture );
758- return peersFuture ;
808+ null ,
809+ cluster .protocolVersion (),
810+ new Requests .Query (projection .query (SystemColumnProjection .SystemTable .PEERS )));
811+ connection .write (rawFuture );
812+ return projection .hook (SystemColumnProjection .SystemTable .PEERS , rawFuture );
759813 }
760814 }
761815
@@ -774,7 +828,9 @@ private void refreshNodeListAndTokenMap(
774828
775829 DefaultResultSetFuture localFuture =
776830 new DefaultResultSetFuture (
777- null , cluster .protocolVersion (), new Requests .Query (SELECT_LOCAL ));
831+ null ,
832+ cluster .protocolVersion (),
833+ new Requests .Query (projection .query (SystemColumnProjection .SystemTable .LOCAL )));
778834 ListenableFuture <ResultSet > peersFuture = selectPeersFuture (connection );
779835 connection .write (localFuture );
780836
@@ -783,7 +839,9 @@ private void refreshNodeListAndTokenMap(
783839 Map <Host , Set <Token >> tokenMap = new HashMap <Host , Set <Token >>();
784840
785841 // Update cluster name, DC and rack for the one node we are connected to
786- Row localRow = localFuture .get ().one ();
842+ ResultSet localRs = localFuture .get ();
843+ projection .populate (SystemColumnProjection .SystemTable .LOCAL , localRs );
844+ Row localRow = localRs .one ();
787845 if (localRow == null ) {
788846 throw new IllegalStateException (
789847 String .format (
0 commit comments